Avoid lagged receiver in TCP manager (#1672)
This commit is contained in:
parent
d7825851e9
commit
d606d9f4f6
@ -31,7 +31,7 @@ use libafl_bolts::{shmem::StdShMemProvider, staterestore::StateRestorer};
|
|||||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{AsyncReadExt, AsyncWriteExt},
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
sync::{broadcast, mpsc},
|
sync::{broadcast, broadcast::error::RecvError, mpsc},
|
||||||
task::{spawn, JoinHandle},
|
task::{spawn, JoinHandle},
|
||||||
};
|
};
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
@ -111,8 +111,8 @@ where
|
|||||||
#[tokio::main(flavor = "current_thread")]
|
#[tokio::main(flavor = "current_thread")]
|
||||||
#[allow(clippy::too_many_lines)]
|
#[allow(clippy::too_many_lines)]
|
||||||
pub async fn broker_loop(&mut self) -> Result<(), Error> {
|
pub async fn broker_loop(&mut self) -> Result<(), Error> {
|
||||||
let (tx_bc, rx) = broadcast::channel(1024);
|
let (tx_bc, rx) = broadcast::channel(65536);
|
||||||
let (tx, mut rx_mpsc) = mpsc::channel(1024);
|
let (tx, mut rx_mpsc) = mpsc::channel(65536);
|
||||||
|
|
||||||
let exit_cleanly_after = self.exit_cleanly_after;
|
let exit_cleanly_after = self.exit_cleanly_after;
|
||||||
|
|
||||||
@ -224,13 +224,14 @@ where
|
|||||||
spawn(async move {
|
spawn(async move {
|
||||||
// In a loop, read data from the socket and write the data back.
|
// In a loop, read data from the socket and write the data back.
|
||||||
loop {
|
loop {
|
||||||
let buf: Vec<u8> = rx_inner
|
let buf: Vec<u8> = match rx_inner.lock().await.recv().await {
|
||||||
.lock()
|
Ok(buf) => buf,
|
||||||
.await
|
Err(RecvError::Lagged(num)) => {
|
||||||
.recv()
|
log::error!("Receiver lagged, skipping {num} messages");
|
||||||
.await
|
continue;
|
||||||
.expect("Could not receive");
|
}
|
||||||
// TODO handle full capacity, Lagged https://docs.rs/tokio/latest/tokio/sync/broadcast/error/enum.RecvError.html
|
_ => panic!("Could not receive"),
|
||||||
|
};
|
||||||
|
|
||||||
#[cfg(feature = "tcp_debug")]
|
#[cfg(feature = "tcp_debug")]
|
||||||
println!("{buf:?}");
|
println!("{buf:?}");
|
||||||
@ -704,7 +705,7 @@ where
|
|||||||
if self_id == other_client_id {
|
if self_id == other_client_id {
|
||||||
panic!("Own ID should never have been sent by the broker");
|
panic!("Own ID should never have been sent by the broker");
|
||||||
} else {
|
} else {
|
||||||
println!("{self_id:?} (from {other_client_id:?}) Received: {buf:?}");
|
log::info!("{self_id:?} (from {other_client_id:?}) Received: {buf:?}");
|
||||||
|
|
||||||
let event = postcard::from_bytes(&buf[4..])?;
|
let event = postcard::from_bytes(&buf[4..])?;
|
||||||
self.handle_in_client(fuzzer, executor, state, other_client_id, event)?;
|
self.handle_in_client(fuzzer, executor, state, other_client_id, event)?;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user