diff --git a/libafl/Cargo.toml b/libafl/Cargo.toml index 5f2266e7fb..2187b2766c 100644 --- a/libafl/Cargo.toml +++ b/libafl/Cargo.toml @@ -12,7 +12,7 @@ edition = "2021" categories = ["development-tools::testing", "emulators", "embedded", "os", "no-std"] [features] -default = ["std", "derive", "llmp_compression", "rand_trait", "fork", "prelude", "gzip"] +default = ["std", "derive", "llmp_compression", "llmp_small_maps", "llmp_broker_timeouts", "rand_trait", "fork", "prelude", "gzip"] std = ["serde_json", "serde_json/std", "hostname", "nix", "serde/std", "bincode", "wait-timeout", "regex", "byteorder", "once_cell", "uuid", "tui_monitor", "ctor", "backtrace", "uds"] # print, env, launcher ... support derive = ["libafl_derive"] # provide derive(SerdeAny) macro. fork = [] # uses the fork() syscall to spawn children, instead of launching a new command, if supported by the OS (has no effect on Windows, no_std). @@ -43,6 +43,7 @@ llmp_bind_public = [] # If set, llmp will bind to 0.0.0.0, allowing cross-device llmp_compression = ["gzip"] # llmp compression using GZip llmp_debug = [] # Enables debug output for LLMP llmp_small_maps = [] # reduces initial map size for llmp +llmp_broker_timeouts = ["std"] # The broker loop will yield occasionally, even without status messages from client nodes [build-dependencies] rustversion = "1.0" diff --git a/libafl/src/bolts/llmp.rs b/libafl/src/bolts/llmp.rs index d423e4e675..f7bc3a8e3b 100644 --- a/libafl/src/bolts/llmp.rs +++ b/libafl/src/bolts/llmp.rs @@ -1930,16 +1930,17 @@ where /// The broker walks all pages and looks for changes, then broadcasts them on /// its own shared page, once. #[inline] - pub fn once(&mut self, on_new_msg: &mut F) -> Result<(), Error> + pub fn once(&mut self, on_new_msg: &mut F) -> Result where F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result, { + let mut new_messages = false; for i in 0..self.llmp_clients.len() { unsafe { - self.handle_new_msgs(i as u32, on_new_msg)?; + new_messages |= self.handle_new_msgs(i as u32, on_new_msg)?; } } - Ok(()) + Ok(new_messages) } /// Internal function, returns true when shuttdown is requested by a `SIGINT` signal @@ -1958,9 +1959,66 @@ where false } + /// Loops infinitely, forwarding and handling all incoming messages from clients. + /// Never returns. + /// Will call `on_timeout` roughly after `timeout` + /// Panics on error. + /// 5 millis of sleep can't hurt to keep busywait not at 100% + #[cfg(feature = "std")] + pub fn loop_with_timeouts( + &mut self, + on_new_msg_or_timeout: &mut F, + timeout: Duration, + sleep_time: Option, + ) where + F: FnMut(Option<(ClientId, Tag, Flags, &[u8])>) -> Result, + { + use super::current_milliseconds; + + #[cfg(unix)] + if let Err(_e) = unsafe { setup_signal_handler(&mut LLMP_SIGHANDLER_STATE) } { + // We can live without a proper ctrl+c signal handler. Print and ignore. + #[cfg(feature = "std")] + println!("Failed to setup signal handlers: {_e}"); + } + + let timeout = timeout.as_millis() as u64; + let mut end_time = current_milliseconds() + timeout; + + while !self.is_shutting_down() { + if current_milliseconds() > end_time { + on_new_msg_or_timeout(None).expect("An error occured in broker timeout. Exiting."); + end_time = current_milliseconds() + timeout; + } + + if self + .once(&mut |client_id, tag, flags, buf| { + on_new_msg_or_timeout(Some((client_id, tag, flags, buf))) + }) + .expect("An error occurred when brokering. Exiting.") + { + end_time = current_milliseconds() + timeout; + } + + #[cfg(feature = "std")] + if let Some(time) = sleep_time { + thread::sleep(time); + } + + #[cfg(not(feature = "std"))] + if let Some(time) = sleep_time { + panic!("Cannot sleep on no_std platform (requested {:?})", time); + } + } + self.llmp_out + .send_buf(LLMP_TAG_EXITING, &[]) + .expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg."); + } + /// Loops infinitely, forwarding and handling all incoming messages from clients. /// Never returns. Panics on error. /// 5 millis of sleep can't hurt to keep busywait not at 100% + /// On std, if you need to run code even if no update got sent, use `Self::loop_with_timeout` (needs the `std` feature). pub fn loop_forever(&mut self, on_new_msg: &mut F, sleep_time: Option) where F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result, @@ -2338,13 +2396,19 @@ where Ok(ret) } - /// broker broadcast to its own page for all others to read */ + /// Broker broadcast to its own page for all others to read + /// Returns `true` if new messages were broker-ed #[inline] #[allow(clippy::cast_ptr_alignment)] - unsafe fn handle_new_msgs(&mut self, client_id: u32, on_new_msg: &mut F) -> Result<(), Error> + unsafe fn handle_new_msgs( + &mut self, + client_id: u32, + on_new_msg: &mut F, + ) -> Result where F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result, { + let mut new_mesages = false; let mut next_id = self.llmp_clients.len() as u32; // TODO: We could memcpy a range of pending messages, instead of one by one. @@ -2354,11 +2418,13 @@ where match client.recv()? { None => { // We're done handling this client - return Ok(()); + return Ok(new_mesages); } Some(msg) => msg, } }; + // We got a new message + new_mesages = true; match (*msg).tag { // first, handle the special, llmp-internal messages diff --git a/libafl/src/events/llmp.rs b/libafl/src/events/llmp.rs index 189834af17..f2ba67c1ff 100644 --- a/libafl/src/events/llmp.rs +++ b/libafl/src/events/llmp.rs @@ -38,7 +38,7 @@ use crate::{ }; use crate::{ bolts::{ - llmp::{self, Flags, LlmpClient, LlmpClientDescription, Tag}, + llmp::{self, LlmpClient, LlmpClientDescription, Tag}, shmem::ShMemProvider, }, events::{ @@ -123,12 +123,13 @@ where } /// Run forever in the broker + #[cfg(not(feature = "llmp_broker_timeouts"))] pub fn broker_loop(&mut self) -> Result<(), Error> { let monitor = &mut self.monitor; #[cfg(feature = "llmp_compression")] let compressor = &self.compressor; self.llmp.loop_forever( - &mut |client_id: u32, tag: Tag, _flags: Flags, msg: &[u8]| { + &mut |client_id, tag, _flags, msg| { if tag == LLMP_TAG_EVENT_TO_BOTH { #[cfg(not(feature = "llmp_compression"))] let event_bytes = msg; @@ -156,6 +157,49 @@ where Ok(()) } + /// Run forever in the broker + #[cfg(feature = "llmp_broker_timeouts")] + pub fn broker_loop(&mut self) -> Result<(), Error> { + let monitor = &mut self.monitor; + #[cfg(feature = "llmp_compression")] + let compressor = &self.compressor; + self.llmp.loop_with_timeouts( + &mut |msg_or_timeout| { + if let Some((client_id, tag, _flags, msg)) = msg_or_timeout { + if tag == LLMP_TAG_EVENT_TO_BOTH { + #[cfg(not(feature = "llmp_compression"))] + let event_bytes = msg; + #[cfg(feature = "llmp_compression")] + let compressed; + #[cfg(feature = "llmp_compression")] + let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + compressed = compressor.decompress(msg)?; + &compressed + } else { + msg + }; + let event: Event = postcard::from_bytes(event_bytes)?; + match Self::handle_in_broker(monitor, client_id, &event)? { + BrokerEventResult::Forward => { + Ok(llmp::LlmpMsgHookResult::ForwardToClients) + } + BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled), + } + } else { + Ok(llmp::LlmpMsgHookResult::ForwardToClients) + } + } else { + monitor.display("Timeout".into(), 0); + Ok(llmp::LlmpMsgHookResult::Handled) + } + }, + Duration::from_millis(1000), + Some(Duration::from_millis(5)), + ); + + Ok(()) + } + /// Handle arriving events in the broker #[allow(clippy::unnecessary_wraps)] fn handle_in_broker( @@ -453,7 +497,7 @@ where event: Event<::Input>, ) -> Result<(), Error> { let serialized = postcard::to_allocvec(&event)?; - let flags: Flags = LLMP_FLAG_INITIALIZED; + let flags = LLMP_FLAG_INITIALIZED; match self.compressor.compress(&serialized)? { Some(comp_buf) => { @@ -1303,7 +1347,7 @@ where } }; let serialized = postcard::to_allocvec(&converted_event)?; - let flags: Flags = LLMP_FLAG_INITIALIZED; + let flags = LLMP_FLAG_INITIALIZED; match self.compressor.compress(&serialized)? { Some(comp_buf) => {