From d606d9f4f62053b9061150b90f3d0ee3e29a3f86 Mon Sep 17 00:00:00 2001 From: Andrea Fioraldi Date: Fri, 17 Nov 2023 14:23:51 +0100 Subject: [PATCH] Avoid lagged receiver in TCP manager (#1672) --- libafl/src/events/tcp.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/libafl/src/events/tcp.rs b/libafl/src/events/tcp.rs index ff7e9a25d9..31ff6e7209 100644 --- a/libafl/src/events/tcp.rs +++ b/libafl/src/events/tcp.rs @@ -31,7 +31,7 @@ use libafl_bolts::{shmem::StdShMemProvider, staterestore::StateRestorer}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, - sync::{broadcast, mpsc}, + sync::{broadcast, broadcast::error::RecvError, mpsc}, task::{spawn, JoinHandle}, }; #[cfg(feature = "std")] @@ -111,8 +111,8 @@ where #[tokio::main(flavor = "current_thread")] #[allow(clippy::too_many_lines)] pub async fn broker_loop(&mut self) -> Result<(), Error> { - let (tx_bc, rx) = broadcast::channel(1024); - let (tx, mut rx_mpsc) = mpsc::channel(1024); + let (tx_bc, rx) = broadcast::channel(65536); + let (tx, mut rx_mpsc) = mpsc::channel(65536); let exit_cleanly_after = self.exit_cleanly_after; @@ -224,13 +224,14 @@ where spawn(async move { // In a loop, read data from the socket and write the data back. loop { - let buf: Vec = rx_inner - .lock() - .await - .recv() - .await - .expect("Could not receive"); - // TODO handle full capacity, Lagged https://docs.rs/tokio/latest/tokio/sync/broadcast/error/enum.RecvError.html + let buf: Vec = match rx_inner.lock().await.recv().await { + Ok(buf) => buf, + Err(RecvError::Lagged(num)) => { + log::error!("Receiver lagged, skipping {num} messages"); + continue; + } + _ => panic!("Could not receive"), + }; #[cfg(feature = "tcp_debug")] println!("{buf:?}"); @@ -704,7 +705,7 @@ where if self_id == other_client_id { panic!("Own ID should never have been sent by the broker"); } else { - println!("{self_id:?} (from {other_client_id:?}) Received: {buf:?}"); + log::info!("{self_id:?} (from {other_client_id:?}) Received: {buf:?}"); let event = postcard::from_bytes(&buf[4..])?; self.handle_in_client(fuzzer, executor, state, other_client_id, event)?;