diff --git a/libafl/src/bolts/llmp.rs b/libafl/src/bolts/llmp.rs index b1c25b2d75..5d5124f388 100644 --- a/libafl/src/bolts/llmp.rs +++ b/libafl/src/bolts/llmp.rs @@ -2096,36 +2096,47 @@ where #[cfg(all(feature = "llmp_debug", feature = "std"))] println!("B2B: Starting proxy loop :)"); + let peer_address = stream.peer_addr().unwrap(); + loop { // first, forward all data we have. - while let Some((client_id, tag, flags, payload)) = local_receiver - .recv_buf_with_flags() - .expect("Error reading from local page!") - { - if client_id == b2b_client_id { - println!( - "Ignored message we probably sent earlier (same id), TAG: {:x}", - tag - ); - continue; - } + loop { + match local_receiver.recv_buf_with_flags() { + Ok(None) => break, // no more data to forward + Ok(Some((client_id, tag, flags, payload))) => { + if client_id == b2b_client_id { + println!( + "Ignored message we probably sent earlier (same id), TAG: {:x}", + tag + ); + continue; + } - #[cfg(all(feature = "llmp_debug", feature = "std"))] - println!( - "Fowarding message ({} bytes) via broker2broker connection", - payload.len() - ); - // We got a new message! Forward... - send_tcp_msg( - &mut stream, - &TcpRemoteNewMessage { - client_id, - tag, - flags, - payload: payload.to_vec(), - }, - ) - .expect("Error sending message via broker 2 broker"); + #[cfg(all(feature = "llmp_debug", feature = "std"))] + println!( + "Fowarding message ({} bytes) via broker2broker connection", + payload.len() + ); + // We got a new message! Forward... + if let Err(e) = send_tcp_msg( + &mut stream, + &TcpRemoteNewMessage { + client_id, + tag, + flags, + payload: payload.to_vec(), + }, + ) { + println!("Got error {} while trying to forward a message to broker {}, exiting thread", e, peer_address); + return; + } + } + Err(Error::ShuttingDown) => { + println!("Local broker is shutting down, exiting thread"); + return; + } + Err(e) => panic!("Error reading from local page! {}", e), + } } // Then, see if we can receive something. @@ -2134,25 +2145,42 @@ where // Forwarding happens between each recv, too, as simplification. // We ignore errors completely as they may be timeout, or stream closings. // Instead, we catch stream close when/if we next try to send. - if let Ok(val) = recv_tcp_msg(&mut stream) { - let msg: TcpRemoteNewMessage = val.try_into().expect( - "Illegal message received from broker 2 broker connection - shutting down.", - ); + match recv_tcp_msg(&mut stream) { + Ok(val) => { + let msg: TcpRemoteNewMessage = val.try_into().expect( + "Illegal message received from broker 2 broker connection - shutting down.", + ); - #[cfg(all(feature = "llmp_debug", feature = "std"))] - println!( - "Fowarding incoming message ({} bytes) from broker2broker connection", - msg.payload.len() - ); + #[cfg(all(feature = "llmp_debug", feature = "std"))] + println!( + "Fowarding incoming message ({} bytes) from broker2broker connection", + msg.payload.len() + ); - // TODO: Could probably optimize this somehow to forward all queued messages between locks... oh well. - // Todo: somehow mangle in the other broker id? ClientId? - new_sender - .send_buf_with_flags(msg.tag, msg.flags | LLMP_FLAG_FROM_B2B, &msg.payload) - .expect("B2B: Error forwarding message. Exiting."); - } else { - #[cfg(all(feature = "llmp_debug", feature = "std"))] - println!("Received no input, timeout or closed. Looping back up :)"); + // TODO: Could probably optimize this somehow to forward all queued messages between locks... oh well. + // Todo: somehow mangle in the other broker id? ClientId? + new_sender + .send_buf_with_flags( + msg.tag, + msg.flags | LLMP_FLAG_FROM_B2B, + &msg.payload, + ) + .expect("B2B: Error forwarding message. Exiting."); + } + Err(e) => { + if let Error::File(e, _) = e { + if e.kind() == ErrorKind::UnexpectedEof { + println!( + "Broker {} seems to have disconnected, exiting", + peer_address + ); + return; + } + } + + #[cfg(all(feature = "llmp_debug", feature = "std"))] + println!("Received no input, timeout or closed. Looping back up :)"); + } } } });