diff --git a/docs/src/message_passing/message_passing.md b/docs/src/message_passing/message_passing.md index 1bbf0ca9c1..c298c887f8 100644 --- a/docs/src/message_passing/message_passing.md +++ b/docs/src/message_passing/message_passing.md @@ -72,7 +72,7 @@ So the outgoing messages flow is like this over the outgoing broadcast `Shmem`: [client0] [client1] ... [clientN] ``` -To use `LLMP` in LibAFL, you usually want to use an `LlmpEventManager` or its restarting variant. +To use `LLMP` in LibAFL, you usually want to use an `LlmpRestartingEventManager` or its restarting variant. They are the default if using LibAFL's `Launcher`. If you should want to use `LLMP` in its raw form, without any `LibAFL` abstractions, take a look at the `llmp_test` example in [./libafl/examples](https://github.com/AFLplusplus/LibAFL/blob/main/libafl_bolts/examples/llmp_test/main.rs). diff --git a/docs/src/message_passing/spawn_instances.md b/docs/src/message_passing/spawn_instances.md index 0c532848dd..7cdbdb4e1c 100644 --- a/docs/src/message_passing/spawn_instances.md +++ b/docs/src/message_passing/spawn_instances.md @@ -58,6 +58,6 @@ For more examples, you can check out `qemu_launcher` and `libfuzzer_libpng_launc ## Other ways -The `LlmpEventManager` family is the easiest way to spawn instances, but for obscure targets, you may need to come up with other solutions. +The `LlmpRestartEventManager` is the easiest way to spawn instances, but for obscure targets, you may need to come up with other solutions. LLMP is even, in theory, `no_std` compatible, and even completely different EventManagers can be used for message passing. If you are in this situation, please either read through the current implementations and/or reach out to us. diff --git a/fuzzers/binary_only/fuzzbench_qemu/README.md b/fuzzers/binary_only/fuzzbench_qemu/README.md index df34f5e090..9ba943a3f5 100644 --- a/fuzzers/binary_only/fuzzbench_qemu/README.md +++ b/fuzzers/binary_only/fuzzbench_qemu/README.md @@ -1,8 +1,8 @@ # Fuzzbench Harness This folder contains an example fuzzer tailored for fuzzbench. -It uses the best possible setting, with the exception of a SimpleRestartingEventManager instead of an LlmpEventManager - since fuzzbench is single threaded. -Real fuzz campaigns should consider using multithreaded LlmpEventManager, see the other examples. +It uses the best possible setting, with the exception of a SimpleRestartingEventManager instead of an LlmpRestartingEventManager - since fuzzbench is single threaded. +Real fuzz campaigns should consider using multithreaded LlmpRestaringtEventManager, see the other examples. ## Build diff --git a/fuzzers/binary_only/qemu_coverage/src/fuzzer.rs b/fuzzers/binary_only/qemu_coverage/src/fuzzer.rs index bcfffb4f12..3307ef7222 100644 --- a/fuzzers/binary_only/qemu_coverage/src/fuzzer.rs +++ b/fuzzers/binary_only/qemu_coverage/src/fuzzer.rs @@ -288,11 +288,6 @@ pub fn fuzz() { .monitor(MultiMonitor::new(|s| println!("{s}"))) .run_client(&mut run_client) .cores(&options.cores) - .stdout_file(if options.verbose { - None - } else { - Some("/dev/null") - }) .build() .launch() { diff --git a/fuzzers/binary_only/qemu_launcher/src/fuzzer.rs b/fuzzers/binary_only/qemu_launcher/src/fuzzer.rs index df7e7cd31f..64a661ed37 100644 --- a/fuzzers/binary_only/qemu_launcher/src/fuzzer.rs +++ b/fuzzers/binary_only/qemu_launcher/src/fuzzer.rs @@ -10,7 +10,7 @@ use libafl::events::SimpleEventManager; #[cfg(not(feature = "simplemgr"))] use libafl::events::{EventConfig, Launcher, MonitorTypedEventManager}; use libafl::{ - events::{ClientDescription, LlmpEventManager, LlmpRestartingEventManager}, + events::{ClientDescription, LlmpEventManagerBuilder}, monitors::{tui::TuiMonitor, Monitor, MultiMonitor}, Error, }; @@ -114,17 +114,17 @@ impl Fuzzer { // To rerun an input, instead of using a launcher, we create dummy parameters and run the client directly. return client.run( None, - MonitorTypedEventManager::<_, M>::new(LlmpRestartingEventManager::new( - LlmpEventManager::builder() - .build_on_port( - shmem_provider.clone(), - broker_port, - EventConfig::AlwaysUnique, - None, - ) - .unwrap(), - StateRestorer::new(shmem_provider.new_shmem(0x1000).unwrap()), - )), + MonitorTypedEventManager::<_, M>::new( + LlmpEventManagerBuilder::builder().build_on_port( + shmem_provider.clone(), + broker_port, + EventConfig::AlwaysUnique, + None, + Some(StateRestorer::new( + shmem_provider.new_shmem(0x1000).unwrap(), + )), + )?, + ), ClientDescription::new(0, 0, CoreId(0)), ); } diff --git a/fuzzers/full_system/nyx_launcher/src/fuzzer.rs b/fuzzers/full_system/nyx_launcher/src/fuzzer.rs index 1b1e8cfbd3..8325e46feb 100644 --- a/fuzzers/full_system/nyx_launcher/src/fuzzer.rs +++ b/fuzzers/full_system/nyx_launcher/src/fuzzer.rs @@ -7,8 +7,7 @@ use std::{ use clap::Parser; use libafl::{ events::{ - ClientDescription, EventConfig, Launcher, LlmpEventManager, LlmpRestartingEventManager, - MonitorTypedEventManager, + ClientDescription, EventConfig, Launcher, LlmpEventManagerBuilder, MonitorTypedEventManager, }, monitors::{tui::TuiMonitor, Monitor, MultiMonitor}, Error, @@ -111,17 +110,17 @@ impl Fuzzer { // To rerun an input, instead of using a launcher, we create dummy parameters and run the client directly. return client.run( None, - MonitorTypedEventManager::<_, M>::new(LlmpRestartingEventManager::new( - LlmpEventManager::builder() - .build_on_port( - shmem_provider.clone(), - broker_port, - EventConfig::AlwaysUnique, - None, - ) - .unwrap(), - StateRestorer::new(shmem_provider.new_shmem(0x1000).unwrap()), - )), + MonitorTypedEventManager::<_, M>::new( + LlmpEventManagerBuilder::builder().build_on_port( + shmem_provider.clone(), + broker_port, + EventConfig::AlwaysUnique, + None, + Some(StateRestorer::new( + shmem_provider.new_shmem(0x1000).unwrap(), + )), + )?, + ), ClientDescription::new(0, 0, CoreId(0)), ); } diff --git a/fuzzers/inprocess/fuzzbench/README.md b/fuzzers/inprocess/fuzzbench/README.md index df34f5e090..bb3c8af640 100644 --- a/fuzzers/inprocess/fuzzbench/README.md +++ b/fuzzers/inprocess/fuzzbench/README.md @@ -1,8 +1,8 @@ # Fuzzbench Harness This folder contains an example fuzzer tailored for fuzzbench. -It uses the best possible setting, with the exception of a SimpleRestartingEventManager instead of an LlmpEventManager - since fuzzbench is single threaded. -Real fuzz campaigns should consider using multithreaded LlmpEventManager, see the other examples. +It uses the best possible setting, with the exception of a SimpleRestartingEventManager instead of an LlmpRestartEventManager - since fuzzbench is single threaded. +Real fuzz campaigns should consider using multithreaded LlmpRestartEventManager, see the other examples. ## Build diff --git a/fuzzers/inprocess/fuzzbench_text/README.md b/fuzzers/inprocess/fuzzbench_text/README.md index 7a180e1932..51ec868ae0 100644 --- a/fuzzers/inprocess/fuzzbench_text/README.md +++ b/fuzzers/inprocess/fuzzbench_text/README.md @@ -1,8 +1,8 @@ # Fuzzbench Harness (text) This folder contains an example fuzzer tailored for fuzzbench. -It uses the best possible setting, with the exception of a SimpleRestartingEventManager instead of an LlmpEventManager - since fuzzbench is single threaded. -Real fuzz campaigns should consider using multithreaded LlmpEventManager, see the other examples. +It uses the best possible setting, with the exception of a SimpleRestartingEventManager instead of an LlmpRestartEventManager - since fuzzbench is single threaded. +Real fuzz campaigns should consider using multithreaded LlmpRestartEventManager, see the other examples. This fuzzer autodetect if the passed-in tokens and the initial inputs are text or binary data, and enables Grimoire in case of text. diff --git a/libafl/src/events/llmp/mgr.rs b/libafl/src/events/llmp/mgr.rs deleted file mode 100644 index be794116be..0000000000 --- a/libafl/src/events/llmp/mgr.rs +++ /dev/null @@ -1,629 +0,0 @@ -//! An event manager that forwards all events to other attached fuzzers on shared maps or via tcp, -//! using low-level message passing, [`libafl_bolts::llmp`]. - -#[cfg(feature = "std")] -use alloc::string::ToString; -use alloc::vec::Vec; -use core::{fmt::Debug, marker::PhantomData, time::Duration}; -#[cfg(feature = "std")] -use std::net::TcpStream; - -#[cfg(feature = "std")] -use libafl_bolts::tuples::MatchNameRef; -#[cfg(feature = "llmp_compression")] -use libafl_bolts::{ - compress::GzipCompressor, - llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED}, -}; -use libafl_bolts::{ - current_time, - llmp::{LlmpClient, LlmpClientDescription, LLMP_FLAG_FROM_MM}, - shmem::{NopShMem, ShMem, ShMemProvider}, - tuples::Handle, - ClientId, -}; -#[cfg(feature = "std")] -use libafl_bolts::{ - llmp::{recv_tcp_msg, send_tcp_msg, TcpRequest, TcpResponse}, - IP_LOCALHOST, -}; -use serde::{de::DeserializeOwned, Serialize}; - -#[cfg(feature = "share_objectives")] -use crate::corpus::{Corpus, Testcase}; -#[cfg(feature = "llmp_compression")] -use crate::events::llmp::COMPRESS_THRESHOLD; -#[cfg(feature = "std")] -use crate::events::{serialize_observers_adaptive, CanSerializeObserver}; -use crate::{ - events::{ - llmp::{LLMP_TAG_EVENT_TO_BOTH, _LLMP_TAG_EVENT_TO_BROKER}, - std_maybe_report_progress, std_on_restart, std_report_progress, AdaptiveSerializer, - AwaitRestartSafe, Event, EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, - EventProcessor, EventRestarter, HasEventManagerId, ProgressReporter, SendExiting, - }, - executors::HasObservers, - fuzzer::{EvaluatorObservers, ExecutionProcessor}, - inputs::Input, - observers::TimeObserver, - stages::HasCurrentStageId, - state::{ - HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions, - MaybeHasClientPerfMonitor, Stoppable, - }, - Error, HasMetadata, -}; - -/// Default initial capacity of the event buffer - 4KB -const INITIAL_EVENT_BUFFER_SIZE: usize = 1024 * 4; - -/// An `EventManager` that forwards all events to other attached fuzzers on shared maps or via tcp, -/// using low-level message passing, `llmp`. -pub struct LlmpEventManager -where - SHM: ShMem, -{ - /// We only send 1 testcase for every `throttle` second - pub(crate) throttle: Option, - /// We sent last message at `last_sent` - last_sent: Duration, - hooks: EMH, - /// The LLMP client for inter process communication - llmp: LlmpClient, - #[cfg(feature = "llmp_compression")] - compressor: GzipCompressor, - /// The configuration defines this specific fuzzer. - /// A node will not re-use the observer values sent over LLMP - /// from nodes with other configurations. - configuration: EventConfig, - serialization_time: Duration, - deserialization_time: Duration, - serializations_cnt: usize, - should_serialize_cnt: usize, - pub(crate) time_ref: Option>, - event_buffer: Vec, - phantom: PhantomData<(I, S)>, -} - -impl LlmpEventManager<(), (), (), NopShMem, ()> { - /// Creates a builder for [`LlmpEventManager`] - #[must_use] - pub fn builder() -> LlmpEventManagerBuilder<()> { - LlmpEventManagerBuilder::new() - } -} - -/// Builder for `LlmpEventManager` -#[derive(Debug, Copy, Clone)] -pub struct LlmpEventManagerBuilder { - throttle: Option, - hooks: EMH, -} - -impl Default for LlmpEventManagerBuilder<()> { - fn default() -> Self { - Self::new() - } -} - -impl LlmpEventManagerBuilder<()> { - /// Create a new `LlmpEventManagerBuilder` - #[must_use] - pub fn new() -> Self { - Self { - throttle: None, - hooks: (), - } - } - - /// Add hooks to it - pub fn hooks(self, hooks: EMH) -> LlmpEventManagerBuilder { - LlmpEventManagerBuilder { - throttle: self.throttle, - hooks, - } - } -} - -impl LlmpEventManagerBuilder { - /// Change the sampling rate - #[must_use] - pub fn throttle(mut self, throttle: Duration) -> Self { - self.throttle = Some(throttle); - self - } - - /// Create a manager from a raw LLMP client - pub fn build_from_client( - self, - llmp: LlmpClient, - configuration: EventConfig, - time_ref: Option>, - ) -> Result, Error> - where - SHM: ShMem, - { - Ok(LlmpEventManager { - throttle: self.throttle, - last_sent: Duration::from_secs(0), - hooks: self.hooks, - llmp, - #[cfg(feature = "llmp_compression")] - compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), - configuration, - serialization_time: Duration::ZERO, - deserialization_time: Duration::ZERO, - serializations_cnt: 0, - should_serialize_cnt: 0, - time_ref, - event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE), - phantom: PhantomData, - }) - } - - /// Create an LLMP event manager on a port. - /// It expects a broker to exist on this port. - #[cfg(feature = "std")] - pub fn build_on_port( - self, - shmem_provider: SP, - port: u16, - configuration: EventConfig, - time_ref: Option>, - ) -> Result, Error> - where - SHM: ShMem, - SP: ShMemProvider, - { - let llmp = LlmpClient::create_attach_to_tcp(shmem_provider, port)?; - Self::build_from_client(self, llmp, configuration, time_ref) - } - - /// If a client respawns, it may reuse the existing connection, previously - /// stored by [`LlmpClient::to_env()`]. - #[cfg(feature = "std")] - pub fn build_existing_client_from_env( - self, - shmem_provider: SP, - env_name: &str, - configuration: EventConfig, - time_ref: Option>, - ) -> Result, Error> - where - SHM: ShMem, - SP: ShMemProvider, - { - let llmp = LlmpClient::on_existing_from_env(shmem_provider, env_name)?; - Self::build_from_client(self, llmp, configuration, time_ref) - } - - /// Create an existing client from description - pub fn build_existing_client_from_description( - self, - shmem_provider: SP, - description: &LlmpClientDescription, - configuration: EventConfig, - time_ref: Option>, - ) -> Result, Error> - where - SHM: ShMem, - SP: ShMemProvider, - { - let llmp = LlmpClient::existing_client_from_description(shmem_provider, description)?; - Self::build_from_client(self, llmp, configuration, time_ref) - } -} - -#[cfg(feature = "std")] -impl CanSerializeObserver for LlmpEventManager -where - OT: MatchNameRef + Serialize, - SHM: ShMem, -{ - fn serialize_observers(&mut self, observers: &OT) -> Result>, Error> { - serialize_observers_adaptive::(self, observers, 2, 80) - } -} - -impl AdaptiveSerializer for LlmpEventManager -where - SHM: ShMem, -{ - fn serialization_time(&self) -> Duration { - self.serialization_time - } - fn deserialization_time(&self) -> Duration { - self.deserialization_time - } - fn serializations_cnt(&self) -> usize { - self.serializations_cnt - } - fn should_serialize_cnt(&self) -> usize { - self.should_serialize_cnt - } - - fn serialization_time_mut(&mut self) -> &mut Duration { - &mut self.serialization_time - } - fn deserialization_time_mut(&mut self) -> &mut Duration { - &mut self.deserialization_time - } - fn serializations_cnt_mut(&mut self) -> &mut usize { - &mut self.serializations_cnt - } - fn should_serialize_cnt_mut(&mut self) -> &mut usize { - &mut self.should_serialize_cnt - } - - fn time_ref(&self) -> &Option> { - &self.time_ref - } -} - -impl Debug for LlmpEventManager -where - SHM: ShMem, - SP: Debug, -{ - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - let mut debug_struct = f.debug_struct("LlmpEventManager"); - let debug = debug_struct.field("llmp", &self.llmp); - //.field("custom_buf_handlers", &self.custom_buf_handlers) - #[cfg(feature = "llmp_compression")] - let debug = debug.field("compressor", &self.compressor); - debug - .field("configuration", &self.configuration) - .finish_non_exhaustive() - } -} - -impl Drop for LlmpEventManager -where - SHM: ShMem, -{ - /// LLMP clients will have to wait until their pages are mapped by somebody. - fn drop(&mut self) { - self.await_restart_safe(); - } -} - -impl LlmpEventManager -where - SHM: ShMem, -{ - /// Calling this function will tell the llmp broker that this client is exiting - /// This should be called from the restarter not from the actual fuzzer client - /// This function serves the same roll as the `LlmpClient.send_exiting()` - /// However, from the the event restarter process it is forbidden to call `send_exiting()` - /// (You can call it and it compiles but you should never do so) - /// `send_exiting()` is exclusive to the fuzzer client. - #[cfg(feature = "std")] - pub fn detach_from_broker(&self, broker_port: u16) -> Result<(), Error> { - let client_id = self.llmp.sender().id(); - let Ok(mut stream) = TcpStream::connect((IP_LOCALHOST, broker_port)) else { - log::error!("Connection refused."); - return Ok(()); - }; - // The broker tells us hello we don't care we just tell it our client died - let TcpResponse::BrokerConnectHello { - broker_shmem_description: _, - hostname: _, - } = recv_tcp_msg(&mut stream)?.try_into()? - else { - return Err(Error::illegal_state( - "Received unexpected Broker Hello".to_string(), - )); - }; - let msg = TcpRequest::ClientQuit { client_id }; - // Send this mesasge off and we are leaving. - match send_tcp_msg(&mut stream, &msg) { - Ok(()) => (), - Err(e) => log::error!("Failed to send tcp message {:#?}", e), - } - log::debug!("Asking he broker to be disconnected"); - Ok(()) - } -} - -impl LlmpEventManager -where - SHM: ShMem, -{ - /// Describe the client event manager's LLMP parts in a restorable fashion - pub fn describe(&self) -> Result { - self.llmp.describe() - } - - /// Write the config for a client `EventManager` to env vars, a new - /// client can reattach using [`LlmpEventManagerBuilder::build_existing_client_from_env()`]. - #[cfg(feature = "std")] - pub fn to_env(&self, env_name: &str) { - self.llmp.to_env(env_name).unwrap(); - } -} - -impl LlmpEventManager -where - SHM: ShMem, -{ - // Handle arriving events in the client - fn handle_in_client( - &mut self, - fuzzer: &mut Z, - executor: &mut E, - state: &mut S, - client_id: ClientId, - event: Event, - ) -> Result<(), Error> - where - S: HasImported + HasSolutions + HasCurrentTestcase + Stoppable, - EMH: EventManagerHooksTuple, - I: Input, - E: HasObservers, - E::Observers: DeserializeOwned, - Z: ExecutionProcessor + EvaluatorObservers, - { - log::trace!("Got event in client: {} from {client_id:?}", event.name()); - if !self.hooks.pre_exec_all(state, client_id, &event)? { - return Ok(()); - } - let evt_name = event.name_detailed(); - match event { - Event::NewTestcase { - input, - client_config, - exit_kind, - observers_buf, - #[cfg(feature = "std")] - forward_id, - .. - } => { - #[cfg(feature = "std")] - log::debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id()); - - let res = if client_config.match_with(&self.configuration) - && observers_buf.is_some() - { - let start = current_time(); - let observers: E::Observers = - postcard::from_bytes(observers_buf.as_ref().unwrap())?; - { - self.deserialization_time = current_time() - start; - } - fuzzer.evaluate_execution(state, self, input, &observers, &exit_kind, false)? - } else { - fuzzer.evaluate_input_with_observers(state, executor, self, input, false)? - }; - if let Some(item) = res.1 { - *state.imported_mut() += 1; - log::debug!("Added received Testcase {evt_name} as item #{item}"); - } else { - log::debug!("Testcase {evt_name} was discarded"); - } - } - - #[cfg(feature = "share_objectives")] - Event::Objective { input, .. } => { - log::debug!("Received new Objective"); - let mut testcase = Testcase::from(input); - testcase.set_parent_id_optional(*state.corpus().current()); - - if let Ok(mut tc) = state.current_testcase_mut() { - tc.found_objective(); - } - - state.solutions_mut().add(testcase)?; - log::info!("Added received Objective to Corpus"); - } - Event::Stop => { - state.request_stop(); - } - _ => { - return Err(Error::unknown(format!( - "Received illegal message that message should not have arrived: {:?}.", - event.name() - ))); - } - } - - self.hooks.post_exec_all(state, client_id)?; - Ok(()) - } -} - -impl LlmpEventManager -where - SHM: ShMem, - SP: ShMemProvider, -{ - /// Send information that this client is exiting. - /// The other side may free up all allocated memory. - /// We are no longer allowed to send anything afterwards. - pub fn send_exiting(&mut self) -> Result<(), Error> { - self.llmp.sender_mut().send_exiting() - } -} - -impl EventFirer for LlmpEventManager -where - I: Serialize, - SHM: ShMem, - SP: ShMemProvider, -{ - fn fire(&mut self, _state: &mut S, event: Event) -> Result<(), Error> { - #[cfg(feature = "llmp_compression")] - let flags = LLMP_FLAG_INITIALIZED; - - self.event_buffer.resize(self.event_buffer.capacity(), 0); - - // Serialize the event, reallocating event_buffer if needed - let written_len = match postcard::to_slice(&event, &mut self.event_buffer) { - Ok(written) => written.len(), - Err(postcard::Error::SerializeBufferFull) => { - let serialized = postcard::to_allocvec(&event)?; - self.event_buffer = serialized; - self.event_buffer.len() - } - Err(e) => return Err(Error::from(e)), - }; - - #[cfg(feature = "llmp_compression")] - { - match self - .compressor - .maybe_compress(&self.event_buffer[..written_len]) - { - Some(comp_buf) => { - self.llmp.send_buf_with_flags( - LLMP_TAG_EVENT_TO_BOTH, - flags | LLMP_FLAG_COMPRESSED, - &comp_buf, - )?; - } - None => { - self.llmp - .send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len])?; - } - } - } - - #[cfg(not(feature = "llmp_compression"))] - { - self.llmp - .send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len])?; - } - - self.last_sent = current_time(); - Ok(()) - } - fn configuration(&self) -> EventConfig { - self.configuration - } - - fn should_send(&self) -> bool { - if let Some(throttle) = self.throttle { - current_time() - self.last_sent > throttle - } else { - true - } - } -} - -impl EventRestarter for LlmpEventManager -where - S: HasCurrentStageId, - SHM: ShMem, -{ - fn on_restart(&mut self, state: &mut S) -> Result<(), Error> { - std_on_restart(self, state) - } -} - -impl SendExiting for LlmpEventManager -where - SHM: ShMem, - SP: ShMemProvider, -{ - fn send_exiting(&mut self) -> Result<(), Error> { - self.llmp.sender_mut().send_exiting() - } -} - -impl AwaitRestartSafe for LlmpEventManager -where - SHM: ShMem, -{ - /// The LLMP client needs to wait until a broker has mapped all pages before shutting down. - /// Otherwise, the OS may already have removed the shared maps. - fn await_restart_safe(&mut self) { - // wait until we can drop the message safely. - self.llmp.await_safe_to_unmap_blocking(); - } -} - -impl EventProcessor for LlmpEventManager -where - E: HasObservers, - E::Observers: DeserializeOwned, - EMH: EventManagerHooksTuple, - I: DeserializeOwned + Input, - S: HasImported + HasSolutions + HasCurrentTestcase + Stoppable, - SHM: ShMem, - SP: ShMemProvider, - Z: ExecutionProcessor + EvaluatorObservers, -{ - fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { - // TODO: Get around local event copy by moving handle_in_client - let self_id = self.llmp.sender().id(); - let mut count = 0; - while let Some((client_id, tag, flags, msg)) = self.llmp.recv_buf_with_flags()? { - assert_ne!( - tag, _LLMP_TAG_EVENT_TO_BROKER, - "EVENT_TO_BROKER parcel should not have arrived in the client!" - ); - - if client_id == self_id { - continue; - } - - #[cfg(not(feature = "llmp_compression"))] - let event_bytes = msg; - #[cfg(feature = "llmp_compression")] - let compressed; - #[cfg(feature = "llmp_compression")] - let event_bytes = if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { - compressed = self.compressor.decompress(msg)?; - &compressed - } else { - msg - }; - - let event: Event = postcard::from_bytes(event_bytes)?; - log::debug!("Received event in normal llmp {}", event.name_detailed()); - - // If the message comes from another machine, do not - // consider other events than new testcase. - if !event.is_new_testcase() && (flags & LLMP_FLAG_FROM_MM == LLMP_FLAG_FROM_MM) { - continue; - } - - self.handle_in_client(fuzzer, executor, state, client_id, event)?; - count += 1; - } - Ok(count) - } - - fn on_shutdown(&mut self) -> Result<(), Error> { - self.send_exiting() - } -} - -impl ProgressReporter for LlmpEventManager -where - I: Serialize, - S: HasExecutions + HasLastReportTime + HasMetadata + MaybeHasClientPerfMonitor, - SHM: ShMem, - SP: ShMemProvider, -{ - fn maybe_report_progress( - &mut self, - state: &mut S, - monitor_timeout: Duration, - ) -> Result<(), Error> { - std_maybe_report_progress(self, state, monitor_timeout) - } - - fn report_progress(&mut self, state: &mut S) -> Result<(), Error> { - std_report_progress(self, state) - } -} - -impl HasEventManagerId for LlmpEventManager -where - SHM: ShMem, -{ - /// Gets the id assigned to this staterestorer. - fn mgr_id(&self) -> EventManagerId { - EventManagerId(self.llmp.sender().id().0 as usize) - } -} diff --git a/libafl/src/events/llmp/mod.rs b/libafl/src/events/llmp/mod.rs index e9f651548c..20655536d6 100644 --- a/libafl/src/events/llmp/mod.rs +++ b/libafl/src/events/llmp/mod.rs @@ -24,10 +24,6 @@ use crate::{ Error, }; -/// The llmp event manager -pub mod mgr; -pub use mgr::*; - /// The llmp restarting manager #[cfg(feature = "std")] pub mod restarting; diff --git a/libafl/src/events/llmp/restarting.rs b/libafl/src/events/llmp/restarting.rs index a63d7e86dd..0ed177b44f 100644 --- a/libafl/src/events/llmp/restarting.rs +++ b/libafl/src/events/llmp/restarting.rs @@ -3,6 +3,8 @@ //! When the target crashes, a watch process (the parent) will //! restart/refork it. +#[cfg(feature = "std")] +use alloc::string::ToString; use alloc::vec::Vec; use core::{ marker::PhantomData, @@ -11,6 +13,8 @@ use core::{ time::Duration, }; use std::net::SocketAddr; +#[cfg(feature = "std")] +use std::net::TcpStream; #[cfg(any(windows, not(feature = "fork")))] use libafl_bolts::os::startable_self; @@ -18,17 +22,35 @@ use libafl_bolts::os::startable_self; use libafl_bolts::os::unix_signals::setup_signal_handler; #[cfg(all(feature = "fork", unix))] use libafl_bolts::os::{fork, ForkResult}; +#[cfg(feature = "llmp_compression")] +use libafl_bolts::{ + compress::GzipCompressor, + llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED}, +}; use libafl_bolts::{ core_affinity::CoreId, - llmp::{Broker, LlmpBroker, LlmpConnection}, + current_time, + llmp::{ + Broker, LlmpBroker, LlmpClient, LlmpClientDescription, LlmpConnection, LLMP_FLAG_FROM_MM, + }, os::CTRL_C_EXIT, shmem::{ShMem, ShMemProvider, StdShMem, StdShMemProvider}, staterestore::StateRestorer, tuples::{tuple_list, Handle, MatchNameRef}, + ClientId, +}; +#[cfg(feature = "std")] +use libafl_bolts::{ + llmp::{recv_tcp_msg, send_tcp_msg, TcpRequest, TcpResponse}, + IP_LOCALHOST, }; use serde::{de::DeserializeOwned, Serialize}; use typed_builder::TypedBuilder; +#[cfg(feature = "share_objectives")] +use crate::corpus::{Corpus, Testcase}; +#[cfg(feature = "llmp_compression")] +use crate::events::COMPRESS_THRESHOLD; #[cfg(all(unix, not(miri)))] use crate::events::EVENTMGR_SIGHANDLER_STATE; use crate::{ @@ -37,8 +59,8 @@ use crate::{ launcher::ClientDescription, serialize_observers_adaptive, std_maybe_report_progress, std_report_progress, AdaptiveSerializer, AwaitRestartSafe, CanSerializeObserver, Event, EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, EventProcessor, - EventRestarter, HasEventManagerId, LlmpEventManager, LlmpShouldSaveState, ProgressReporter, - SendExiting, StdLlmpEventHook, + EventRestarter, HasEventManagerId, LlmpShouldSaveState, ProgressReporter, SendExiting, + StdLlmpEventHook, LLMP_TAG_EVENT_TO_BOTH, _LLMP_TAG_EVENT_TO_BROKER, }, executors::HasObservers, fuzzer::{EvaluatorObservers, ExecutionProcessor}, @@ -53,18 +75,35 @@ use crate::{ Error, }; +const INITIAL_EVENT_BUFFER_SIZE: usize = 1024 * 4; /// A manager that can restart on the fly, storing states in-between (in `on_restart`) #[derive(Debug)] -pub struct LlmpRestartingEventManager -where - SHM: ShMem, -{ - /// The embedded LLMP event manager - llmp_mgr: LlmpEventManager, +pub struct LlmpRestartingEventManager { + /// We only send 1 testcase for every `throttle` second + pub(crate) throttle: Option, + /// We sent last message at `last_sent` + last_sent: Duration, + hooks: EMH, + /// The LLMP client for inter process communication + llmp: LlmpClient, + #[cfg(feature = "llmp_compression")] + compressor: GzipCompressor, + /// The configuration defines this specific fuzzer. + /// A node will not re-use the observer values sent over LLMP + /// from nodes with other configurations. + configuration: EventConfig, + serialization_time: Duration, + deserialization_time: Duration, + serializations_cnt: usize, + should_serialize_cnt: usize, + pub(crate) time_ref: Option>, + event_buffer: Vec, /// The staterestorer to serialize the state for the next runner - staterestorer: StateRestorer, + /// If this is Some, this event manager can restart. Else it does not. + staterestorer: Option>, /// Decide if the state restorer must save the serialized state save_state: LlmpShouldSaveState, + phantom: PhantomData<(I, S)>, } impl AdaptiveSerializer for LlmpRestartingEventManager @@ -72,33 +111,33 @@ where SHM: ShMem, { fn serialization_time(&self) -> Duration { - self.llmp_mgr.serialization_time() + self.serialization_time } fn deserialization_time(&self) -> Duration { - self.llmp_mgr.deserialization_time() + self.deserialization_time } fn serializations_cnt(&self) -> usize { - self.llmp_mgr.serializations_cnt() + self.serializations_cnt } fn should_serialize_cnt(&self) -> usize { - self.llmp_mgr.should_serialize_cnt() + self.should_serialize_cnt } fn serialization_time_mut(&mut self) -> &mut Duration { - self.llmp_mgr.serialization_time_mut() + &mut self.serialization_time } fn deserialization_time_mut(&mut self) -> &mut Duration { - self.llmp_mgr.deserialization_time_mut() + &mut self.deserialization_time } fn serializations_cnt_mut(&mut self) -> &mut usize { - self.llmp_mgr.serializations_cnt_mut() + &mut self.serializations_cnt } fn should_serialize_cnt_mut(&mut self) -> &mut usize { - self.llmp_mgr.should_serialize_cnt_mut() + &mut self.should_serialize_cnt } fn time_ref(&self) -> &Option> { - &self.llmp_mgr.time_ref + &self.time_ref } } @@ -129,19 +168,68 @@ where SHM: ShMem, SP: ShMemProvider, { - fn fire(&mut self, state: &mut S, event: Event) -> Result<(), Error> { + fn fire(&mut self, _state: &mut S, event: Event) -> Result<(), Error> { // Check if we are going to crash in the event, in which case we store our current state for the next runner - self.llmp_mgr.fire(state, event)?; - self.intermediate_save()?; + #[cfg(feature = "llmp_compression")] + let flags = LLMP_FLAG_INITIALIZED; + + self.event_buffer.resize(self.event_buffer.capacity(), 0); + + // Serialize the event, reallocating event_buffer if needed + let written_len = match postcard::to_slice(&event, &mut self.event_buffer) { + Ok(written) => written.len(), + Err(postcard::Error::SerializeBufferFull) => { + let serialized = postcard::to_allocvec(&event)?; + self.event_buffer = serialized; + self.event_buffer.len() + } + Err(e) => return Err(Error::from(e)), + }; + + #[cfg(feature = "llmp_compression")] + { + match self + .compressor + .maybe_compress(&self.event_buffer[..written_len]) + { + Some(comp_buf) => { + self.llmp.send_buf_with_flags( + LLMP_TAG_EVENT_TO_BOTH, + flags | LLMP_FLAG_COMPRESSED, + &comp_buf, + )?; + } + None => { + self.llmp + .send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len])?; + } + } + } + + #[cfg(not(feature = "llmp_compression"))] + { + self.llmp + .send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len])?; + } + + self.last_sent = current_time(); + + if self.staterestorer.is_some() { + self.intermediate_save()?; + } Ok(()) } fn configuration(&self) -> EventConfig { - as EventFirer>::configuration(&self.llmp_mgr) + self.configuration } fn should_send(&self) -> bool { - as EventFirer>::should_send(&self.llmp_mgr) + if let Some(throttle) = self.throttle { + current_time() - self.last_sent > throttle + } else { + true + } } } @@ -167,18 +255,21 @@ where fn on_restart(&mut self, state: &mut S) -> Result<(), Error> { state.on_restart()?; - // First, reset the page to 0 so the next iteration can read from the beginning of this page - self.staterestorer.reset(); - self.staterestorer.save(&( - if self.save_state.on_restart() { - Some(state) - } else { - None - }, - &self.llmp_mgr.describe()?, - ))?; + if let Some(sr) = &mut self.staterestorer { + // First, reset the page to 0 so the next iteration can read from the beginning of this page + sr.reset(); + sr.save(&( + if self.save_state.on_restart() { + Some(state) + } else { + None + }, + &self.llmp.describe()?, + ))?; + + log::info!("Waiting for broker..."); + } - log::info!("Waiting for broker..."); self.await_restart_safe(); Ok(()) } @@ -190,10 +281,12 @@ where SP: ShMemProvider, { fn send_exiting(&mut self) -> Result<(), Error> { - self.staterestorer.send_exiting(); + if let Some(ref mut sr) = &mut self.staterestorer { + sr.send_exiting(); + } // Also inform the broker that we are about to exit. // This way, the broker can clean up the pages, and eventually exit. - self.llmp_mgr.send_exiting() + self.llmp.sender_mut().send_exiting() } } @@ -205,7 +298,7 @@ where /// Otherwise, the OS may already have removed the shared maps, #[inline] fn await_restart_safe(&mut self) { - self.llmp_mgr.await_restart_safe(); + self.llmp.await_safe_to_unmap_blocking(); } } @@ -219,12 +312,52 @@ where S: HasImported + HasCurrentTestcase + HasSolutions + Stoppable + Serialize, SHM: ShMem, SP: ShMemProvider, - Z: ExecutionProcessor, I, E::Observers, S> - + EvaluatorObservers, I, S>, + Z: ExecutionProcessor + EvaluatorObservers, { fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { - let res = self.llmp_mgr.process(fuzzer, state, executor)?; - self.intermediate_save()?; + let res = { + // TODO: Get around local event copy by moving handle_in_client + let self_id = self.llmp.sender().id(); + let mut count = 0; + while let Some((client_id, tag, flags, msg)) = self.llmp.recv_buf_with_flags()? { + assert_ne!( + tag, _LLMP_TAG_EVENT_TO_BROKER, + "EVENT_TO_BROKER parcel should not have arrived in the client!" + ); + + if client_id == self_id { + continue; + } + + #[cfg(not(feature = "llmp_compression"))] + let event_bytes = msg; + #[cfg(feature = "llmp_compression")] + let compressed; + #[cfg(feature = "llmp_compression")] + let event_bytes = if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + compressed = self.compressor.decompress(msg)?; + &compressed + } else { + msg + }; + + let event: Event = postcard::from_bytes(event_bytes)?; + log::debug!("Received event in normal llmp {}", event.name_detailed()); + + // If the message comes from another machine, do not + // consider other events than new testcase. + if !event.is_new_testcase() && (flags & LLMP_FLAG_FROM_MM == LLMP_FLAG_FROM_MM) { + continue; + } + + self.handle_in_client(fuzzer, executor, state, client_id, event)?; + count += 1; + } + count + }; + if self.staterestorer.is_some() { + self.intermediate_save()?; + } Ok(res) } @@ -239,7 +372,7 @@ where SP: ShMemProvider, { fn mgr_id(&self) -> EventManagerId { - self.llmp_mgr.mgr_id() + EventManagerId(self.llmp.sender().id().0 as usize) } } @@ -249,55 +382,303 @@ const _ENV_FUZZER_RECEIVER: &str = "_AFL_ENV_FUZZER_RECEIVER"; /// The llmp (2 way) connection from a fuzzer to the broker (broadcasting all other fuzzer messages) const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT"; +/// Builder for `LlmpRestartingEventManager` +#[derive(Debug)] +pub struct LlmpEventManagerBuilder { + throttle: Option, + save_state: LlmpShouldSaveState, + hooks: EMH, +} + +impl Default for LlmpEventManagerBuilder<()> { + fn default() -> Self { + Self::builder() + } +} + +impl LlmpEventManagerBuilder<()> { + /// Create a new `LlmpEventManagerBuilder` + #[must_use] + pub fn builder() -> Self { + Self { + throttle: None, + save_state: LlmpShouldSaveState::OnRestart, + hooks: (), + } + } +} + +impl LlmpEventManagerBuilder<()> { + /// Add hooks to it + pub fn hooks(self, hooks: EMH) -> LlmpEventManagerBuilder { + LlmpEventManagerBuilder { + throttle: self.throttle, + save_state: self.save_state, + hooks, + } + } +} + +impl LlmpEventManagerBuilder { + /// Change the sampling rate + #[must_use] + pub fn throttle(mut self, throttle: Duration) -> Self { + self.throttle = Some(throttle); + self + } + + /// Change save state policy + #[must_use] + pub fn save_state(mut self, save_state: LlmpShouldSaveState) -> Self { + self.save_state = save_state; + self + } + + /// Create a manager from a raw LLMP client + /// If staterestorer is some then this restarting manager restarts + /// Otherwise this restarting manager does not restart + pub fn build_from_client( + self, + llmp: LlmpClient, + configuration: EventConfig, + time_ref: Option>, + staterestorer: Option>, + ) -> Result, Error> { + Ok(LlmpRestartingEventManager { + throttle: self.throttle, + last_sent: Duration::from_secs(0), + hooks: self.hooks, + llmp, + #[cfg(feature = "llmp_compression")] + compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), + configuration, + serialization_time: Duration::ZERO, + deserialization_time: Duration::ZERO, + serializations_cnt: 0, + should_serialize_cnt: 0, + time_ref, + event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE), + staterestorer, + save_state: LlmpShouldSaveState::OnRestart, + phantom: PhantomData, + }) + } + + /// Create an LLMP event manager on a port. + /// It expects a broker to exist on this port. + #[cfg(feature = "std")] + pub fn build_on_port( + self, + shmem_provider: SP, + port: u16, + configuration: EventConfig, + time_ref: Option>, + staterestorer: Option>, + ) -> Result, Error> + where + SHM: ShMem, + SP: ShMemProvider, + { + let llmp = LlmpClient::create_attach_to_tcp(shmem_provider, port)?; + Self::build_from_client(self, llmp, configuration, time_ref, staterestorer) + } + + /// If a client respawns, it may reuse the existing connection, previously + /// stored by [`LlmpClient::to_env()`]. + #[cfg(feature = "std")] + pub fn build_existing_client_from_env( + self, + shmem_provider: SP, + env_name: &str, + configuration: EventConfig, + time_ref: Option>, + staterestorer: Option>, + ) -> Result, Error> + where + SHM: ShMem, + SP: ShMemProvider, + { + let llmp = LlmpClient::on_existing_from_env(shmem_provider, env_name)?; + Self::build_from_client(self, llmp, configuration, time_ref, staterestorer) + } + + /// Create an existing client from description + pub fn build_existing_client_from_description( + self, + shmem_provider: SP, + description: &LlmpClientDescription, + configuration: EventConfig, + time_ref: Option>, + staterestorer: Option>, + ) -> Result, Error> + where + SHM: ShMem, + SP: ShMemProvider, + { + let llmp = LlmpClient::existing_client_from_description(shmem_provider, description)?; + Self::build_from_client(self, llmp, configuration, time_ref, staterestorer) + } +} + impl LlmpRestartingEventManager where S: Serialize, SHM: ShMem, SP: ShMemProvider, { - /// Create a new runner, the executed child doing the actual fuzzing. - pub fn new( - llmp_mgr: LlmpEventManager, - staterestorer: StateRestorer, - ) -> Self { - Self { - llmp_mgr, - staterestorer, - save_state: LlmpShouldSaveState::OnRestart, - } - } - - /// Create a new runner specifying if it must save the serialized state on restart. - pub fn with_save_state( - llmp_mgr: LlmpEventManager, - staterestorer: StateRestorer, - save_state: LlmpShouldSaveState, - ) -> Self { - Self { - llmp_mgr, - staterestorer, - save_state, - } + /// Write the config for a client `EventManager` to env vars, a new + /// client can reattach using [`LlmpEventManagerBuilder::build_existing_client_from_env()`]. + #[cfg(feature = "std")] + pub fn to_env(&self, env_name: &str) { + self.llmp.to_env(env_name).unwrap(); } /// Get the staterestorer - pub fn staterestorer(&self) -> &StateRestorer { + pub fn staterestorer(&self) -> &Option> { &self.staterestorer } /// Get the staterestorer (mutable) - pub fn staterestorer_mut(&mut self) -> &mut StateRestorer { + pub fn staterestorer_mut(&mut self) -> &mut Option> { &mut self.staterestorer } /// Save LLMP state and empty state in staterestorer pub fn intermediate_save(&mut self) -> Result<(), Error> { // First, reset the page to 0 so the next iteration can read read from the beginning of this page - if self.save_state.oom_safe() { - self.staterestorer.reset(); - self.staterestorer - .save(&(None::, &self.llmp_mgr.describe()?))?; + if let Some(sr) = &mut self.staterestorer { + if self.save_state.oom_safe() { + sr.reset(); + sr.save(&(None::, &self.llmp.describe()?))?; + } } + + Ok(()) + } + + /// Reset the state in state restorer + pub fn staterestorer_reset(&mut self) -> Result<(), Error> { + if let Some(sr) = &mut self.staterestorer { + sr.reset(); + } + + Ok(()) + } + + // Handle arriving events in the client + fn handle_in_client( + &mut self, + fuzzer: &mut Z, + executor: &mut E, + state: &mut S, + client_id: ClientId, + event: Event, + ) -> Result<(), Error> + where + S: HasImported + HasSolutions + HasCurrentTestcase + Stoppable, + EMH: EventManagerHooksTuple, + I: Input, + E: HasObservers, + E::Observers: DeserializeOwned, + Z: ExecutionProcessor + EvaluatorObservers, + { + log::trace!("Got event in client: {} from {client_id:?}", event.name()); + if !self.hooks.pre_exec_all(state, client_id, &event)? { + return Ok(()); + } + let evt_name = event.name_detailed(); + match event { + Event::NewTestcase { + input, + client_config, + exit_kind, + observers_buf, + #[cfg(feature = "std")] + forward_id, + .. + } => { + #[cfg(feature = "std")] + log::debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id()); + + let res = if client_config.match_with(&self.configuration) + && observers_buf.is_some() + { + let start = current_time(); + let observers: E::Observers = + postcard::from_bytes(observers_buf.as_ref().unwrap())?; + { + self.deserialization_time = current_time() - start; + } + fuzzer.evaluate_execution(state, self, input, &observers, &exit_kind, false)? + } else { + fuzzer.evaluate_input_with_observers(state, executor, self, input, false)? + }; + if let Some(item) = res.1 { + *state.imported_mut() += 1; + log::debug!("Added received Testcase {evt_name} as item #{item}"); + } else { + log::debug!("Testcase {evt_name} was discarded"); + } + } + + #[cfg(feature = "share_objectives")] + Event::Objective { input, .. } => { + log::debug!("Received new Objective"); + let mut testcase = Testcase::from(input); + testcase.set_parent_id_optional(*state.corpus().current()); + + if let Ok(mut tc) = state.current_testcase_mut() { + tc.found_objective(); + } + + state.solutions_mut().add(testcase)?; + log::info!("Added received Objective to Corpus"); + } + Event::Stop => { + state.request_stop(); + } + _ => { + return Err(Error::unknown(format!( + "Received illegal message that message should not have arrived: {:?}.", + event.name() + ))); + } + } + + self.hooks.post_exec_all(state, client_id)?; + Ok(()) + } + + /// Calling this function will tell the llmp broker that this client is exiting + /// This should be called from the restarter not from the actual fuzzer client + /// This function serves the same roll as the `LlmpClient.send_exiting()` + /// However, from the the event restarter process it is forbidden to call `send_exiting()` + /// (You can call it and it compiles but you should never do so) + /// `send_exiting()` is exclusive to the fuzzer client. + #[cfg(feature = "std")] + pub fn detach_from_broker(&self, broker_port: u16) -> Result<(), Error> { + let client_id = self.llmp.sender().id(); + let Ok(mut stream) = TcpStream::connect((IP_LOCALHOST, broker_port)) else { + log::error!("Connection refused."); + return Ok(()); + }; + // The broker tells us hello we don't care we just tell it our client died + let TcpResponse::BrokerConnectHello { + broker_shmem_description: _, + hostname: _, + } = recv_tcp_msg(&mut stream)?.try_into()? + else { + return Err(Error::illegal_state( + "Received unexpected Broker Hello".to_string(), + )); + }; + let msg = TcpRequest::ClientQuit { client_id }; + // Send this mesasge off and we are leaving. + match send_tcp_msg(&mut stream, &msg) { + Ok(()) => (), + Err(e) => log::error!("Failed to send tcp message {:#?}", e), + } + log::debug!("Asking he broker to be disconnected"); Ok(()) } } @@ -487,13 +868,14 @@ where return Err(Error::shutting_down()); } LlmpConnection::IsClient { client } => { - let mgr: LlmpEventManager = - LlmpEventManager::builder() + let mgr: LlmpRestartingEventManager = + LlmpEventManagerBuilder::builder() .hooks(self.hooks) .build_from_client( client, self.configuration, self.time_ref.clone(), + None, )?; (mgr, None) } @@ -513,13 +895,14 @@ where } ManagerKind::Client { client_description } => { // We are a client - let mgr = LlmpEventManager::builder() + let mgr = LlmpEventManagerBuilder::builder() .hooks(self.hooks) .build_on_port( self.shmem_provider.clone(), self.broker_port, self.configuration, self.time_ref.clone(), + None, )?; (mgr, Some(client_description.core_id())) @@ -643,48 +1026,41 @@ where // If we're restarting, deserialize the old state. let (state, mut mgr) = if let Some((state_opt, mgr_description)) = staterestorer.restore()? { - let llmp_mgr = LlmpEventManager::builder() - .hooks(self.hooks) - .build_existing_client_from_description( - new_shmem_provider, - &mgr_description, - self.configuration, - self.time_ref.clone(), - )?; ( state_opt, - LlmpRestartingEventManager::with_save_state( - llmp_mgr, - staterestorer, - self.serialize_state, - ), + LlmpEventManagerBuilder::builder() + .hooks(self.hooks) + .save_state(self.serialize_state) + .build_existing_client_from_description( + new_shmem_provider, + &mgr_description, + self.configuration, + self.time_ref.clone(), + Some(staterestorer), + )?, ) } else { log::info!("First run. Let's set it all up"); // Mgr to send and receive msgs from/to all other fuzzer instances - let mgr = LlmpEventManager::builder() - .hooks(self.hooks) - .build_existing_client_from_env( - new_shmem_provider, - _ENV_FUZZER_BROKER_CLIENT_INITIAL, - self.configuration, - self.time_ref.clone(), - )?; - ( None, - LlmpRestartingEventManager::with_save_state( - mgr, - staterestorer, - self.serialize_state, - ), + LlmpEventManagerBuilder::builder() + .hooks(self.hooks) + .save_state(self.serialize_state) + .build_existing_client_from_env( + new_shmem_provider, + _ENV_FUZZER_BROKER_CLIENT_INITIAL, + self.configuration, + self.time_ref.clone(), + Some(staterestorer), + )?, ) }; // We reset the staterestorer, the next staterestorer and receiver (after crash) will reuse the page from the initial message. if self.serialize_state.oom_safe() { mgr.intermediate_save()?; } else { - mgr.staterestorer.reset(); + mgr.staterestorer_reset()?; } /* TODO: Not sure if this is needed @@ -713,7 +1089,7 @@ mod tests { use crate::{ corpus::{Corpus, InMemoryCorpus, Testcase}, - events::llmp::{restarting::_ENV_FUZZER_SENDER, LlmpEventManager}, + events::llmp::restarting::{LlmpEventManagerBuilder, _ENV_FUZZER_SENDER}, executors::{ExitKind, InProcessExecutor}, feedbacks::ConstFeedback, fuzzer::Fuzzer, @@ -768,8 +1144,8 @@ mod tests { llmp_client.mark_safe_to_unmap(); } - let mut llmp_mgr = LlmpEventManager::builder() - .build_from_client(llmp_client, "fuzzer".into(), Some(time_ref.clone())) + let mut llmp_mgr = LlmpEventManagerBuilder::builder() + .build_from_client(llmp_client, "fuzzer".into(), Some(time_ref.clone()), None) .unwrap(); let scheduler = RandScheduler::new(); @@ -799,7 +1175,7 @@ mod tests { staterestorer.reset(); staterestorer - .save(&(&mut state, &llmp_mgr.describe().unwrap())) + .save(&(&mut state, &llmp_mgr.llmp.describe().unwrap())) .unwrap(); assert!(staterestorer.has_content()); @@ -812,12 +1188,13 @@ mod tests { assert!(sc_cpy.has_content()); let (mut state_clone, mgr_description) = staterestorer.restore().unwrap().unwrap(); - let mut llmp_clone = LlmpEventManager::builder() + let mut llmp_clone = LlmpEventManagerBuilder::builder() .build_existing_client_from_description( shmem_provider, &mgr_description, "fuzzer".into(), Some(time_ref), + None, ) .unwrap(); diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs index e2db05dc6d..df9270affe 100644 --- a/libafl/src/events/mod.rs +++ b/libafl/src/events/mod.rs @@ -96,7 +96,7 @@ impl SignalHandler for ShutdownSignalData { } /// A per-fuzzer unique `ID`, usually starting with `0` and increasing -/// by `1` in multiprocessed `EventManagers`, such as [`LlmpEventManager`]. +/// by `1` in multiprocessed `EventManagers`, such as [`LlmpRestartingEventManager`]. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] #[repr(transparent)] pub struct EventManagerId( @@ -377,7 +377,7 @@ impl Event { pub trait EventFirer { /// Send off an [`Event`] to the broker /// - /// For multi-processed managers, such as [`LlmpEventManager`], + /// For multi-processed managers, such as [`LlmpRestartingEventManager`], /// this serializes the [`Event`] and commits it to the [`llmp`] page. /// In this case, if you `fire` faster than the broker can consume /// (for example for each [`Input`], on multiple cores) diff --git a/libafl/src/feedbacks/transferred.rs b/libafl/src/feedbacks/transferred.rs index 84f848922f..83ae83da24 100644 --- a/libafl/src/feedbacks/transferred.rs +++ b/libafl/src/feedbacks/transferred.rs @@ -20,7 +20,7 @@ pub const TRANSFERRED_FEEDBACK_NAME: Cow<'static, str> = /// Metadata which denotes whether we are currently transferring an input. /// -/// Implementors of multi-node communication systems (like [`crate::events::LlmpEventManager`]) should wrap any +/// Implementors of multi-node communication systems (like [`crate::events::LlmpRestartingEventManager`]) should wrap any /// [`crate::EvaluatorObservers::evaluate_input_with_observers`] or /// [`crate::ExecutionProcessor::process_execution`] calls with setting this metadata to true/false /// before and after.