diff --git a/fuzzers/libfuzzer_libpng_centralized/src/lib.rs b/fuzzers/libfuzzer_libpng_centralized/src/lib.rs index 169f2e12b0..44393a0fa6 100644 --- a/fuzzers/libfuzzer_libpng_centralized/src/lib.rs +++ b/fuzzers/libfuzzer_libpng_centralized/src/lib.rs @@ -135,9 +135,9 @@ pub extern "C" fn libafl_main() { let monitor = MultiMonitor::new(|s| println!("{s}")); - let mut run_client = |state: Option<_>, - mut mgr: CentralizedEventManager<_, _>, - _core_id: CoreId| { + let mut secondary_run_client = |state: Option<_>, + mut mgr: CentralizedEventManager<_, _>, + _core_id: CoreId| { // Create an observation channel using the coverage map let edges_observer = HitcountsMapObserver::new(unsafe { std_edges_map_observer("edges") }).track_indices(); @@ -252,13 +252,13 @@ pub extern "C" fn libafl_main() { Ok(()) }; - let mut main_run_client = run_client.clone(); // clone it just for borrow checker + let mut main_run_client = secondary_run_client.clone(); // clone it just for borrow checker match CentralizedLauncher::builder() .shmem_provider(shmem_provider) .configuration(EventConfig::from_name("default")) .monitor(monitor) - .run_client(&mut run_client) + .secondary_run_client(&mut secondary_run_client) .main_run_client(&mut main_run_client) .cores(&cores) .broker_port(broker_port) diff --git a/libafl/src/events/centralized.rs b/libafl/src/events/centralized.rs index 5055e00011..255e562278 100644 --- a/libafl/src/events/centralized.rs +++ b/libafl/src/events/centralized.rs @@ -214,7 +214,7 @@ where } } -/// A wrapper manager to implement a main-secondary architecture witgh another broker +/// A wrapper manager to implement a main-secondary architecture with another broker #[derive(Debug)] pub struct CentralizedEventManager where @@ -518,7 +518,8 @@ where if !self.is_main { // secondary node let mut is_tc = false; - let is_nt_or_heartbeat = match &mut event { + // Forward to main only if new tc or heartbeat + let should_be_forwarded = match &mut event { Event::NewTestcase { input: _, client_config: _, @@ -541,7 +542,7 @@ where _ => false, }; - if is_nt_or_heartbeat { + if should_be_forwarded { self.forward_to_main(&event)?; if is_tc { // early return here because we only send it to centralized not main broker. @@ -549,7 +550,8 @@ where } } } - // now inner llmp manager will process it + + // now inner llmp manager will process it if it's not a new testcase from a secondary node. self.inner.fire(state, event) } diff --git a/libafl/src/events/launcher.rs b/libafl/src/events/launcher.rs index 7833e9d199..8ca073316f 100644 --- a/libafl/src/events/launcher.rs +++ b/libafl/src/events/launcher.rs @@ -48,10 +48,13 @@ use libafl_bolts::{ use typed_builder::TypedBuilder; use super::hooks::EventManagerHooksTuple; -#[cfg(all(unix, feature = "std", feature = "fork"))] -use crate::events::centralized::{CentralizedEventManager, CentralizedLlmpEventBroker}; #[cfg(feature = "adaptive_serialization")] use crate::observers::TimeObserver; +#[cfg(all(unix, feature = "std", feature = "fork"))] +use crate::{ + events::centralized::{CentralizedEventManager, CentralizedLlmpEventBroker}, + state::UsesState, +}; #[cfg(feature = "std")] use crate::{ events::{ @@ -464,7 +467,7 @@ where #[cfg(all(unix, feature = "std", feature = "fork"))] #[derive(TypedBuilder)] #[allow(clippy::type_complexity, missing_debug_implementations)] -pub struct CentralizedLauncher<'a, CF, MF, MT, S, SP> { +pub struct CentralizedLauncher<'a, CF, IM, MF, MT, S, SP> { /// The `ShmemProvider` to use shmem_provider: SP, /// The monitor instance to use @@ -474,10 +477,10 @@ pub struct CentralizedLauncher<'a, CF, MF, MT, S, SP> { /// Consider this testcase as interesting always if true #[builder(default = false)] always_interesting: bool, - /// The 'main' function to run for each client forked. This probably shouldn't return + /// The 'main' function to run for each secondary client forked. This probably shouldn't return #[builder(default, setter(strip_option))] - run_client: Option, - /// The 'main' function to run for the main evaluator noed + secondary_run_client: Option, + /// The 'main' function to run for the main evaluator node. #[builder(default, setter(strip_option))] main_run_client: Option, /// The broker port to use (or to attach to, in case [`Self::spawn_broker`] is `false`) @@ -524,11 +527,11 @@ pub struct CentralizedLauncher<'a, CF, MF, MT, S, SP> { #[builder(default = LlmpShouldSaveState::OnRestart)] serialize_state: LlmpShouldSaveState, #[builder(setter(skip), default = PhantomData)] - phantom_data: PhantomData<(&'a S, &'a SP)>, + phantom_data: PhantomData<(IM, &'a S, &'a SP)>, } #[cfg(all(unix, feature = "std", feature = "fork"))] -impl Debug for CentralizedLauncher<'_, CF, MF, MT, S, SP> { +impl Debug for CentralizedLauncher<'_, CF, IM, MF, MT, S, SP> { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("Launcher") .field("configuration", &self.configuration) @@ -542,34 +545,89 @@ impl Debug for CentralizedLauncher<'_, CF, MF, MT, S, SP> { } } +/// The standard inner manager of centralized +pub type StdCentralizedInnerMgr = LlmpRestartingEventManager<(), S, SP>; + #[cfg(all(unix, feature = "std", feature = "fork"))] -impl<'a, CF, MF, MT, S, SP> CentralizedLauncher<'a, CF, MF, MT, S, SP> +impl<'a, CF, MF, MT, S, SP> + CentralizedLauncher<'a, CF, StdCentralizedInnerMgr, MF, MT, S, SP> where CF: FnOnce( Option, - CentralizedEventManager, SP>, + CentralizedEventManager, SP>, CoreId, ) -> Result<(), Error>, MF: FnOnce( Option, - CentralizedEventManager, SP>, // No hooks for centralized EM + CentralizedEventManager, SP>, CoreId, ) -> Result<(), Error>, MT: Monitor + Clone, S: State + HasExecutions, SP: ShMemProvider + 'static, { + /// Launch a standard Centralized-based fuzzer + pub fn launch(&mut self) -> Result<(), Error> { + let restarting_mgr_builder = |centralized_launcher: &Self, core_to_bind: CoreId| { + // Fuzzer client. keeps retrying the connection to broker till the broker starts + let builder = RestartingMgr::<(), MT, S, SP>::builder() + .always_interesting(centralized_launcher.always_interesting) + .shmem_provider(centralized_launcher.shmem_provider.clone()) + .broker_port(centralized_launcher.broker_port) + .kind(ManagerKind::Client { + cpu_core: Some(core_to_bind), + }) + .configuration(centralized_launcher.configuration) + .serialize_state(centralized_launcher.serialize_state) + .hooks(tuple_list!()); + + #[cfg(feature = "adaptive_serialization")] + let builder = builder.time_ref(centralized_launcher.time_obs.handle()); + + builder.build().launch() + }; + + self.launch_generic(restarting_mgr_builder, restarting_mgr_builder) + } +} + +#[cfg(all(unix, feature = "std", feature = "fork"))] +impl<'a, CF, IM, MF, MT, S, SP> CentralizedLauncher<'a, CF, IM, MF, MT, S, SP> +where + CF: FnOnce(Option, CentralizedEventManager, CoreId) -> Result<(), Error>, + IM: UsesState, + MF: FnOnce( + Option, + CentralizedEventManager, // No hooks for centralized EM + CoreId, + ) -> Result<(), Error>, + MT: Monitor + Clone, + S: State + HasExecutions, + SP: ShMemProvider + 'static, +{ + /// Launch a Centralized-based fuzzer. + /// - `main_inner_mgr_builder` will be called to build the inner manager of the main node. + /// - `secondary_inner_mgr_builder` will be called to build the inner manager of the secondary nodes. #[allow(clippy::similar_names)] #[allow(clippy::too_many_lines)] - /// launch the broker and the client and fuzz - pub fn launch(&mut self) -> Result<(), Error> { + pub fn launch_generic( + &mut self, + main_inner_mgr_builder: IMF, + secondary_inner_mgr_builder: IMF, + ) -> Result<(), Error> + where + IMF: FnOnce(&Self, CoreId) -> Result<(Option, IM), Error>, + { + let mut main_inner_mgr_builder = Some(main_inner_mgr_builder); + let mut secondary_inner_mgr_builder = Some(secondary_inner_mgr_builder); + if self.cores.ids.is_empty() { return Err(Error::illegal_argument( "No cores to spawn on given, cannot launch anything.", )); } - if self.run_client.is_none() { + if self.secondary_run_client.is_none() { return Err(Error::illegal_argument( "No client callback provided".to_string(), )); @@ -646,46 +704,53 @@ where } } - // Fuzzer client. keeps retrying the connection to broker till the broker starts - let builder = RestartingMgr::<(), MT, S, SP>::builder() - .always_interesting(self.always_interesting) - .shmem_provider(self.shmem_provider.clone()) - .broker_port(self.broker_port) - .kind(ManagerKind::Client { - cpu_core: Some(*bind_to), - }) - .configuration(self.configuration) - .serialize_state(self.serialize_state) - .hooks(tuple_list!()); - #[cfg(feature = "adaptive_serialization")] - let builder = builder.time_ref(self.time_obs.handle()); - let (state, mgr) = builder.build().launch()?; - - let mut centralized_builder = CentralizedEventManager::builder(); - if index == 1 { + // Main client + let (state, mgr) = + main_inner_mgr_builder.take().unwrap()(self, *bind_to)?; + + let mut centralized_builder = CentralizedEventManager::builder(); centralized_builder = centralized_builder.is_main(true); - } - #[cfg(not(feature = "adaptive_serialization"))] - let c_mgr = centralized_builder.build_on_port( - mgr, - self.shmem_provider.clone(), - self.centralized_broker_port, - )?; - #[cfg(feature = "adaptive_serialization")] - let c_mgr = centralized_builder.build_on_port( - mgr, - self.shmem_provider.clone(), - self.centralized_broker_port, - self.time_obs, - )?; + #[cfg(not(feature = "adaptive_serialization"))] + let c_mgr = centralized_builder.build_on_port( + mgr, + self.shmem_provider.clone(), + self.centralized_broker_port, + )?; + #[cfg(feature = "adaptive_serialization")] + let c_mgr = centralized_builder.build_on_port( + mgr, + self.shmem_provider.clone(), + self.centralized_broker_port, + self.time_obs, + )?; - if index == 1 { - return (self.main_run_client.take().unwrap())(state, c_mgr, *bind_to); + self.main_run_client.take().unwrap()(state, c_mgr, *bind_to) + } else { + // Secondary clients + let (state, mgr) = + secondary_inner_mgr_builder.take().unwrap()(self, *bind_to)?; + + let centralized_builder = CentralizedEventManager::builder(); + + #[cfg(not(feature = "adaptive_serialization"))] + let c_mgr = centralized_builder.build_on_port( + mgr, + self.shmem_provider.clone(), + self.centralized_broker_port, + )?; + #[cfg(feature = "adaptive_serialization")] + let c_mgr = centralized_builder.build_on_port( + mgr, + self.shmem_provider.clone(), + self.centralized_broker_port, + self.time_obs, + )?; + + self.secondary_run_client.take().unwrap()(state, c_mgr, *bind_to) } - return (self.run_client.take().unwrap())(state, c_mgr, *bind_to); - } + }?, }; } }