diff --git a/fuzzers/frida_executable_libpng/src/fuzzer.rs b/fuzzers/frida_executable_libpng/src/fuzzer.rs index 21a40c08da..f1d6ded886 100644 --- a/fuzzers/frida_executable_libpng/src/fuzzer.rs +++ b/fuzzers/frida_executable_libpng/src/fuzzer.rs @@ -93,13 +93,13 @@ unsafe fn fuzz( let shmem_provider = StdShMemProvider::new()?; - let mut run_client = |state: Option<_>, mgr: LlmpRestartingEventManager<_, _>, core_id| { + let mut run_client = |state: Option<_>, mgr: LlmpRestartingEventManager<_, _, _>, core_id| { // The restarting state will spawn the same process again as child, then restarted it each time it crashes. // println!("{:?}", mgr.mgr_id()); if options.asan && options.asan_cores.contains(core_id) { - (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { + (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| { let gum = Gum::obtain(); let coverage = CoverageRuntime::new(); @@ -221,7 +221,7 @@ unsafe fn fuzz( Ok(()) })(state, mgr, core_id) } else if options.cmplog && options.cmplog_cores.contains(core_id) { - (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { + (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| { let gum = Gum::obtain(); let coverage = CoverageRuntime::new(); @@ -352,7 +352,7 @@ unsafe fn fuzz( Ok(()) })(state, mgr, core_id) } else { - (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { + (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| { let gum = Gum::obtain(); let coverage = CoverageRuntime::new(); diff --git a/fuzzers/frida_gdiplus/src/fuzzer.rs b/fuzzers/frida_gdiplus/src/fuzzer.rs index 64b4514e9f..6cc4c53828 100644 --- a/fuzzers/frida_gdiplus/src/fuzzer.rs +++ b/fuzzers/frida_gdiplus/src/fuzzer.rs @@ -76,7 +76,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> { let shmem_provider = StdShMemProvider::new()?; - let mut run_client = |state: Option<_>, mgr: LlmpRestartingEventManager<_, _>, core_id| { + let mut run_client = |state: Option<_>, mgr: LlmpRestartingEventManager<_, _, _>, core_id| { // The restarting state will spawn the same process again as child, then restarted it each time it crashes. 
// println!("{:?}", mgr.mgr_id()); @@ -94,7 +94,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> { }; if options.asan && options.asan_cores.contains(core_id) { - (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { + (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| { let gum = Gum::obtain(); let coverage = CoverageRuntime::new(); @@ -215,7 +215,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> { Ok(()) })(state, mgr, core_id) } else if options.cmplog && options.cmplog_cores.contains(core_id) { - (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { + (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| { let gum = Gum::obtain(); let coverage = CoverageRuntime::new(); @@ -347,7 +347,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> { Ok(()) })(state, mgr, core_id) } else { - (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { + (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| { let gum = Gum::obtain(); let coverage = CoverageRuntime::new(); diff --git a/fuzzers/frida_libpng/src/fuzzer.rs b/fuzzers/frida_libpng/src/fuzzer.rs index ac910c0b68..80adabe2b8 100644 --- a/fuzzers/frida_libpng/src/fuzzer.rs +++ b/fuzzers/frida_libpng/src/fuzzer.rs @@ -71,7 +71,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> { let shmem_provider = StdShMemProvider::new()?; - let mut run_client = |state: Option<_>, mgr: LlmpRestartingEventManager<_, _>, core_id| { + let mut run_client = |state: Option<_>, mgr: LlmpRestartingEventManager<_, _, _>, core_id| { // The restarting state will spawn the same process again as child, then restarted it each time it crashes. // println!("{:?}", mgr.mgr_id()); @@ -89,7 +89,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> { }; if options.asan && options.asan_cores.contains(core_id) { - (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { + (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| { let gum = Gum::obtain(); let coverage = CoverageRuntime::new(); @@ -211,7 +211,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> { Ok(()) })(state, mgr, core_id) } else if options.cmplog && options.cmplog_cores.contains(core_id) { - (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { + (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| { let gum = Gum::obtain(); let coverage = CoverageRuntime::new(); @@ -343,7 +343,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> { Ok(()) })(state, mgr, core_id) } else { - (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { + (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| { let gum = Gum::obtain(); let coverage = CoverageRuntime::new(); diff --git a/fuzzers/libfuzzer_libpng_norestart/src/lib.rs b/fuzzers/libfuzzer_libpng_norestart/src/lib.rs index bf868ec2fc..254b3595b6 100644 --- a/fuzzers/libfuzzer_libpng_norestart/src/lib.rs +++ b/fuzzers/libfuzzer_libpng_norestart/src/lib.rs @@ -151,7 +151,7 @@ pub extern "C" fn libafl_main() { ); let mut run_client = |state: Option<_>, - mut restarting_mgr: LlmpRestartingEventManager<_, _>, + mut restarting_mgr: LlmpRestartingEventManager<_, _, _>, core_id| { // Create an observation channel using the coverage map let edges_observer = HitcountsMapObserver::new(unsafe { std_edges_map_observer("edges") }); diff --git 
a/fuzzers/qemu_coverage/src/fuzzer.rs b/fuzzers/qemu_coverage/src/fuzzer.rs index 78a123fe65..12e473f951 100644 --- a/fuzzers/qemu_coverage/src/fuzzer.rs +++ b/fuzzers/qemu_coverage/src/fuzzer.rs @@ -180,101 +180,102 @@ pub fn fuzz() { ExitKind::Ok }; - let mut run_client = |state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, core_id| { - let core_idx = options - .cores - .position(core_id) - .expect("Failed to get core index"); - let files = corpus_files - .iter() - .skip(files_per_core * core_idx) - .take(files_per_core) - .map(|x| x.path()) - .collect::>(); + let mut run_client = + |state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, core_id| { + let core_idx = options + .cores + .position(core_id) + .expect("Failed to get core index"); + let files = corpus_files + .iter() + .skip(files_per_core * core_idx) + .take(files_per_core) + .map(|x| x.path()) + .collect::>(); - if files.is_empty() { - mgr.send_exiting()?; - Err(Error::ShuttingDown)? - } + if files.is_empty() { + mgr.send_exiting()?; + Err(Error::ShuttingDown)? + } - #[allow(clippy::let_unit_value)] - let mut feedback = (); + #[allow(clippy::let_unit_value)] + let mut feedback = (); - #[allow(clippy::let_unit_value)] - let mut objective = (); + #[allow(clippy::let_unit_value)] + let mut objective = (); - let mut state = state.unwrap_or_else(|| { - StdState::new( - StdRand::with_seed(current_nanos()), - NopCorpus::new(), - NopCorpus::new(), - &mut feedback, - &mut objective, - ) - .unwrap() - }); + let mut state = state.unwrap_or_else(|| { + StdState::new( + StdRand::with_seed(current_nanos()), + NopCorpus::new(), + NopCorpus::new(), + &mut feedback, + &mut objective, + ) + .unwrap() + }); - let scheduler = QueueScheduler::new(); - let mut fuzzer = StdFuzzer::new(scheduler, feedback, objective); + let scheduler = QueueScheduler::new(); + let mut fuzzer = StdFuzzer::new(scheduler, feedback, objective); - let rangemap = emu - .mappings() - .filter_map(|m| { - m.path() - .map(|p| ((m.start() as usize)..(m.end() as usize), p.to_string())) - .filter(|(_, p)| !p.is_empty()) - }) - .enumerate() - .fold( - RangeMap::::new(), - |mut rm, (i, (r, p))| { - rm.insert(r, (i as u16, p)); - rm - }, + let rangemap = emu + .mappings() + .filter_map(|m| { + m.path() + .map(|p| ((m.start() as usize)..(m.end() as usize), p.to_string())) + .filter(|(_, p)| !p.is_empty()) + }) + .enumerate() + .fold( + RangeMap::::new(), + |mut rm, (i, (r, p))| { + rm.insert(r, (i as u16, p)); + rm + }, + ); + + let mut coverage = PathBuf::from(&options.coverage); + let coverage_name = coverage.file_stem().unwrap().to_str().unwrap(); + let coverage_extension = coverage.extension().unwrap_or_default().to_str().unwrap(); + let core = core_id.0; + coverage.set_file_name(format!("{coverage_name}-{core:03}.{coverage_extension}")); + + let mut hooks = QemuHooks::new( + emu.clone(), + tuple_list!(QemuDrCovHelper::new( + QemuInstrumentationAddressRangeFilter::None, + rangemap, + PathBuf::from(coverage), + false, + )), ); - let mut coverage = PathBuf::from(&options.coverage); - let coverage_name = coverage.file_stem().unwrap().to_str().unwrap(); - let coverage_extension = coverage.extension().unwrap_or_default().to_str().unwrap(); - let core = core_id.0; - coverage.set_file_name(format!("{coverage_name}-{core:03}.{coverage_extension}")); + let mut executor = QemuExecutor::new( + &mut hooks, + &mut harness, + (), + &mut fuzzer, + &mut state, + &mut mgr, + options.timeout, + ) + .expect("Failed to create QemuExecutor"); - let mut hooks = QemuHooks::new( - 
emu.clone(), - tuple_list!(QemuDrCovHelper::new( - QemuInstrumentationAddressRangeFilter::None, - rangemap, - PathBuf::from(coverage), - false, - )), - ); + if state.must_load_initial_inputs() { + state + .load_initial_inputs_by_filenames(&mut fuzzer, &mut executor, &mut mgr, &files) + .unwrap_or_else(|_| { + println!("Failed to load initial corpus at {:?}", &corpus_dir); + process::exit(0); + }); + log::debug!("We imported {} inputs from disk.", state.corpus().count()); + } - let mut executor = QemuExecutor::new( - &mut hooks, - &mut harness, - (), - &mut fuzzer, - &mut state, - &mut mgr, - options.timeout, - ) - .expect("Failed to create QemuExecutor"); + log::debug!("Processed {} inputs from disk.", files.len()); - if state.must_load_initial_inputs() { - state - .load_initial_inputs_by_filenames(&mut fuzzer, &mut executor, &mut mgr, &files) - .unwrap_or_else(|_| { - println!("Failed to load initial corpus at {:?}", &corpus_dir); - process::exit(0); - }); - log::debug!("We imported {} inputs from disk.", state.corpus().count()); - } - - log::debug!("Processed {} inputs from disk.", files.len()); - - mgr.send_exiting()?; - Err(Error::ShuttingDown)? - }; + mgr.send_exiting()?; + Err(Error::ShuttingDown)? + }; match Launcher::builder() .shmem_provider(StdShMemProvider::new().expect("Failed to init shared memory")) diff --git a/fuzzers/qemu_launcher/src/instance.rs b/fuzzers/qemu_launcher/src/instance.rs index a879308ecc..f122452c50 100644 --- a/fuzzers/qemu_launcher/src/instance.rs +++ b/fuzzers/qemu_launcher/src/instance.rs @@ -54,7 +54,7 @@ pub type ClientState = pub type ClientMgr = SimpleEventManager; #[cfg(not(feature = "simplemgr"))] pub type ClientMgr = - MonitorTypedEventManager, M>; + MonitorTypedEventManager, M>; #[derive(TypedBuilder)] pub struct Instance<'a, M: Monitor> { diff --git a/libafl/src/events/centralized.rs b/libafl/src/events/centralized.rs index 627c2d849f..061775b65c 100644 --- a/libafl/src/events/centralized.rs +++ b/libafl/src/events/centralized.rs @@ -1,4 +1,11 @@ -//! A wrapper manager to implement a main-secondary architecture with point-to-point channels +//! The centralized event manager is a special event manager used to achieve a more efficient message-passing architecture. + +// Some technical details: +// A standard multi-process fuzzing setup using the centralized event manager consists of 4 components: +// 1. The "fuzzer clients", the fuzzers that do the "normal" fuzzing +// 2. The "centralized broker", the broker that gathers all the testcases from all the fuzzer clients +// 3. The "main evaluator", the evaluator node that evaluates all the testcases passed on by the centralized event manager to see if they are worth propagating +// 4. The "main broker", the broker that gathers the stats from the fuzzer clients and broadcasts the newly found testcases from the main evaluator.
use alloc::{boxed::Box, string::String, vec::Vec}; use core::{marker::PhantomData, num::NonZeroUsize, time::Duration}; @@ -28,7 +35,6 @@ use crate::{ EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, LogSeverity, }, executors::{Executor, HasObservers}, - feedbacks::transferred::TransferringMetadata, fuzzer::{EvaluatorObservers, ExecutionProcessor}, inputs::{Input, UsesInput}, observers::ObserversTuple, @@ -665,9 +671,6 @@ where } => { log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})"); - if let Ok(meta) = state.metadata_mut::() { - meta.set_transferring(true); - } let res = if client_config.match_with(&self.configuration()) && observers_buf.is_some() { let observers: E::Observers = @@ -697,9 +700,6 @@ where false, )? }; - if let Ok(meta) = state.metadata_mut::() { - meta.set_transferring(false); - } if let Some(item) = res.1 { if res.1.is_some() { diff --git a/libafl/src/events/hooks/mod.rs b/libafl/src/events/hooks/mod.rs new file mode 100644 index 0000000000..48360a3ae7 --- /dev/null +++ b/libafl/src/events/hooks/mod.rs @@ -0,0 +1,118 @@ +//! Hooks for event managers, specifically these are used to hook before and after `handle_in_client`. +//! This allows the user to define pre/post-processing code that runs when the event manager receives any message from +//! other clients. +use libafl_bolts::ClientId; + +use crate::{events::Event, state::State, Error}; + +/// The hooks that are run before and after the event manager calls `handle_in_client` +pub trait EventManagerHook +where + S: State, +{ + /// The hook that runs before `handle_in_client` + /// Return false if you want to cancel the subsequent event handling + fn pre_exec( + &mut self, + fuzzer: &mut Z, + executor: &mut E, + state: &mut S, + client_id: ClientId, + event: &Event, + ) -> Result; + /// The hook that runs after `handle_in_client` + /// Return false if you want to cancel the subsequent event handling + fn post_exec( + &mut self, + fuzzer: &mut Z, + executor: &mut E, + state: &mut S, + client_id: ClientId, + ) -> Result; +} + +/// The tuple that contains the hooks to be executed for `handle_in_client` +pub trait EventManagerHooksTuple +where + S: State, +{ + /// The hook that runs before `handle_in_client` + fn pre_exec_all( + &mut self, + fuzzer: &mut Z, + executor: &mut E, + state: &mut S, + client_id: ClientId, + event: &Event, + ) -> Result; + /// The hook that runs after `handle_in_client` + fn post_exec_all( + &mut self, + fuzzer: &mut Z, + executor: &mut E, + state: &mut S, + client_id: ClientId, + ) -> Result; } + +impl EventManagerHooksTuple for () +where + S: State, +{ + /// The hook that runs before `handle_in_client` + fn pre_exec_all( + &mut self, + _fuzzer: &mut Z, + _executor: &mut E, + _state: &mut S, + _client_id: ClientId, + _event: &Event, + ) -> Result { + Ok(true) + } + /// The hook that runs after `handle_in_client` + fn post_exec_all( + &mut self, + _fuzzer: &mut Z, + _executor: &mut E, + _state: &mut S, + _client_id: ClientId, + ) -> Result { + Ok(true) + } +} + +impl EventManagerHooksTuple for (Head, Tail) +where + Head: EventManagerHook, + Tail: EventManagerHooksTuple, + S: State, +{ + /// The hook that runs before `handle_in_client` + fn pre_exec_all( + &mut self, + fuzzer: &mut Z, + executor: &mut E, + state: &mut S, + client_id: ClientId, + event: &Event, + ) -> Result { + let first = self.0.pre_exec(fuzzer, executor, state, client_id, event)?; + let second = self + .1 + .pre_exec_all(fuzzer, executor, state, client_id, event)?; + Ok(first &
second) + } + /// The hook that runs after `handle_in_client` + fn post_exec_all( + &mut self, + fuzzer: &mut Z, + executor: &mut E, + state: &mut S, + client_id: ClientId, + ) -> Result { + let first = self.0.post_exec(fuzzer, executor, state, client_id)?; + let second = self.1.post_exec_all(fuzzer, executor, state, client_id)?; + Ok(first & second) + } +} diff --git a/libafl/src/events/launcher.rs b/libafl/src/events/launcher.rs index 03a07e3a55..daae796683 100644 --- a/libafl/src/events/launcher.rs +++ b/libafl/src/events/launcher.rs @@ -38,10 +38,12 @@ use libafl_bolts::{ core_affinity::{CoreId, Cores}, llmp::DEFAULT_CLIENT_TIMEOUT_SECS, shmem::ShMemProvider, + tuples::tuple_list, }; #[cfg(feature = "std")] use typed_builder::TypedBuilder; +use super::hooks::EventManagerHooksTuple; #[cfg(all(unix, feature = "std", feature = "fork"))] use crate::events::{CentralizedEventManager, CentralizedLlmpEventBroker}; #[cfg(feature = "std")] @@ -72,9 +74,10 @@ const LIBAFL_DEBUG_OUTPUT: &str = "LIBAFL_DEBUG_OUTPUT"; clippy::ignored_unit_patterns )] #[derive(TypedBuilder)] -pub struct Launcher<'a, CF, MT, S, SP> +pub struct Launcher<'a, CF, EMH, MT, S, SP> where - CF: FnOnce(Option, LlmpRestartingEventManager, CoreId) -> Result<(), Error>, + CF: FnOnce(Option, LlmpRestartingEventManager, CoreId) -> Result<(), Error>, + EMH: EventManagerHooksTuple, S::Input: 'a, MT: Monitor, SP: ShMemProvider + 'static, @@ -126,12 +129,13 @@ where #[builder(default = true)] serialize_state: bool, #[builder(setter(skip), default = PhantomData)] - phantom_data: PhantomData<(&'a S, &'a SP)>, + phantom_data: PhantomData<(&'a S, &'a SP, EMH)>, } -impl Debug for Launcher<'_, CF, MT, S, SP> +impl Debug for Launcher<'_, CF, EMH, MT, S, SP> where - CF: FnOnce(Option, LlmpRestartingEventManager, CoreId) -> Result<(), Error>, + CF: FnOnce(Option, LlmpRestartingEventManager, CoreId) -> Result<(), Error>, + EMH: EventManagerHooksTuple, MT: Monitor + Clone, SP: ShMemProvider + 'static, S: State, @@ -149,18 +153,41 @@ where } } -#[cfg(feature = "std")] -impl<'a, CF, MT, S, SP> Launcher<'a, CF, MT, S, SP> +impl<'a, CF, MT, S, SP> Launcher<'a, CF, (), MT, S, SP> where - CF: FnOnce(Option, LlmpRestartingEventManager, CoreId) -> Result<(), Error>, + CF: FnOnce(Option, LlmpRestartingEventManager<(), S, SP>, CoreId) -> Result<(), Error>, MT: Monitor + Clone, S: State + HasExecutions, SP: ShMemProvider + 'static, { /// Launch the broker and the clients and fuzz #[cfg(all(unix, feature = "std", feature = "fork"))] - #[allow(clippy::similar_names)] pub fn launch(&mut self) -> Result<(), Error> { + Self::launch_with_hooks(self, tuple_list!()) + } + + /// Launch the broker and the clients and fuzz + #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] + #[allow(unused_mut, clippy::match_wild_err_arm)] + pub fn launch(&mut self) -> Result<(), Error> { + Self::launch_with_hooks(self, tuple_list!()) + } +} + +#[cfg(feature = "std")] +impl<'a, CF, EMH, MT, S, SP> Launcher<'a, CF, EMH, MT, S, SP> +where + CF: FnOnce(Option, LlmpRestartingEventManager, CoreId) -> Result<(), Error>, + EMH: EventManagerHooksTuple + Clone + Copy, + MT: Monitor + Clone, + S: State + HasExecutions, + SP: ShMemProvider + 'static, +{ + /// Launch the broker and the clients and fuzz with a user-supplied hook + #[cfg(all(unix, feature = "std", feature = "fork"))] + #[allow(clippy::similar_names)] + #[allow(clippy::too_many_lines)] + pub fn launch_with_hooks(&mut self, hooks: EMH) -> Result<(), Error> { if self.cores.ids.is_empty() { return 
Err(Error::illegal_argument( "No cores to spawn on given, cannot launch anything.", @@ -226,7 +253,7 @@ where } // Fuzzer client. keeps retrying the connection to broker till the broker starts - let (state, mgr) = RestartingMgr::::builder() + let (state, mgr) = RestartingMgr::::builder() .shmem_provider(self.shmem_provider.clone()) .broker_port(self.broker_port) .kind(ManagerKind::Client { @@ -235,6 +262,7 @@ where .configuration(self.configuration) .serialize_state(self.serialize_state) .client_timeout(self.client_timeout) + .hooks(hooks) .build() .launch()?; @@ -249,7 +277,7 @@ where log::info!("I am broker!!."); // TODO we don't want always a broker here, think about using different laucher process to spawn different configurations - RestartingMgr::::builder() + RestartingMgr::::builder() .shmem_provider(self.shmem_provider.clone()) .monitor(Some(self.monitor.clone())) .broker_port(self.broker_port) @@ -259,6 +287,7 @@ where .configuration(self.configuration) .serialize_state(self.serialize_state) .client_timeout(self.client_timeout) + .hooks(hooks) .build() .launch()?; @@ -289,7 +318,7 @@ where /// Launch the broker and the clients and fuzz #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] #[allow(unused_mut, clippy::match_wild_err_arm)] - pub fn launch(&mut self) -> Result<(), Error> { + pub fn launch_with_hooks(&mut self, hooks: EMH) -> Result<(), Error> { use libafl_bolts::core_affinity; let is_client = std::env::var(_AFL_LAUNCHER_CLIENT); @@ -302,7 +331,7 @@ where // let debug_output = std::env::var(LIBAFL_DEBUG_OUTPUT).is_ok(); // the actual client. do the fuzzing - let (state, mgr) = RestartingMgr::::builder() + let (state, mgr) = RestartingMgr::::builder() .shmem_provider(self.shmem_provider.clone()) .broker_port(self.broker_port) .kind(ManagerKind::Client { @@ -311,6 +340,7 @@ where .configuration(self.configuration) .serialize_state(self.serialize_state) .client_timeout(self.client_timeout) + .hooks(hooks) .build() .launch()?; @@ -371,7 +401,7 @@ where #[cfg(feature = "std")] log::info!("I am broker!!."); - RestartingMgr::::builder() + RestartingMgr::::builder() .shmem_provider(self.shmem_provider.clone()) .monitor(Some(self.monitor.clone())) .broker_port(self.broker_port) @@ -381,6 +411,7 @@ where .configuration(self.configuration) .serialize_state(self.serialize_state) .client_timeout(self.client_timeout) + .hooks(hooks) .build() .launch()?; @@ -410,7 +441,7 @@ pub struct CentralizedLauncher<'a, CF, MT, S, SP> where CF: FnOnce( Option, - CentralizedEventManager, SP>, + CentralizedEventManager, SP>, // No hooks for centralized EM CoreId, ) -> Result<(), Error>, S::Input: 'a, @@ -476,7 +507,7 @@ impl Debug for CentralizedLauncher<'_, CF, MT, S, SP> where CF: FnOnce( Option, - CentralizedEventManager, SP>, + CentralizedEventManager, SP>, CoreId, ) -> Result<(), Error>, MT: Monitor + Clone, @@ -501,7 +532,7 @@ impl<'a, CF, MT, S, SP> CentralizedLauncher<'a, CF, MT, S, SP> where CF: FnOnce( Option, - CentralizedEventManager, SP>, + CentralizedEventManager, SP>, CoreId, ) -> Result<(), Error>, MT: Monitor + Clone, @@ -597,7 +628,7 @@ where } // Fuzzer client. 
keeps retrying the connection to broker till the broker starts - let (state, mgr) = RestartingMgr::::builder() + let (state, mgr) = RestartingMgr::<(), MT, S, SP>::builder() .shmem_provider(self.shmem_provider.clone()) .broker_port(self.broker_port) .kind(ManagerKind::Client { @@ -606,6 +637,7 @@ where .configuration(self.configuration) .serialize_state(self.serialize_state) .client_timeout(self.client_timeout) + .hooks(tuple_list!()) .build() .launch()?; @@ -626,7 +658,7 @@ where log::info!("I am broker!!."); // TODO we don't want always a broker here, think about using different laucher process to spawn different configurations - RestartingMgr::::builder() + RestartingMgr::<(), MT, S, SP>::builder() .shmem_provider(self.shmem_provider.clone()) .monitor(Some(self.monitor.clone())) .broker_port(self.broker_port) @@ -636,6 +668,7 @@ where .configuration(self.configuration) .serialize_state(self.serialize_state) .client_timeout(self.client_timeout) + .hooks(tuple_list!()) .build() .launch()?; diff --git a/libafl/src/events/llmp.rs b/libafl/src/events/llmp.rs index a675a64450..b3f67757e5 100644 --- a/libafl/src/events/llmp.rs +++ b/libafl/src/events/llmp.rs @@ -31,15 +31,14 @@ use libafl_bolts::{llmp::LlmpConnection, shmem::StdShMemProvider, staterestore:: use libafl_bolts::{ llmp::{self, LlmpClient, LlmpClientDescription, Tag}, shmem::ShMemProvider, + tuples::tuple_list, ClientId, }; -#[cfg(feature = "std")] -use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; #[cfg(feature = "std")] use typed_builder::TypedBuilder; -use super::{CustomBufEventResult, CustomBufHandlerFn}; +use super::{hooks::EventManagerHooksTuple, CustomBufEventResult, CustomBufHandlerFn}; #[cfg(all(unix, feature = "std"))] use crate::events::EVENTMGR_SIGHANDLER_STATE; use crate::{ @@ -48,7 +47,6 @@ use crate::{ EventProcessor, EventRestarter, HasCustomBufHandlers, HasEventManagerId, ProgressReporter, }, executors::{Executor, HasObservers}, - feedbacks::transferred::TransferringMetadata, fuzzer::{EvaluatorObservers, ExecutionProcessor}, inputs::{Input, InputConverter, UsesInput}, monitors::Monitor, @@ -356,11 +354,12 @@ pub trait EventStatsCollector {} /// An [`EventManager`] that forwards all events to other attached fuzzers on shared maps or via tcp, /// using low-level message passing, [`libafl_bolts::llmp`]. 
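To make the new hooks API above concrete, here is a minimal sketch of a custom hook. It is not part of the patch: the `CountingHook` name is made up, and the generic parameters that this rendering of the diff drops are assumed to be `<S>` on the trait, `<E, Z>` on the methods, `Event<S::Input>` for the event, and `Result<bool, Error>` for the return type (returning `Ok(false)` is documented to cancel the subsequent event handling).

// Illustrative sketch only (not part of this patch): a hook that counts and logs
// every event received from other clients. The generic parameters are assumptions,
// reconstructed from the hook doc comments and the empty-tuple implementation.
use libafl::{
    events::{hooks::EventManagerHook, Event},
    state::State,
    Error,
};
use libafl_bolts::ClientId;

#[derive(Debug, Clone, Copy, Default)]
pub struct CountingHook {
    received: u64,
}

impl<S> EventManagerHook<S> for CountingHook
where
    S: State,
{
    fn pre_exec<E, Z>(
        &mut self,
        _fuzzer: &mut Z,
        _executor: &mut E,
        _state: &mut S,
        client_id: ClientId,
        _event: &Event<S::Input>,
    ) -> Result<bool, Error> {
        // Count the incoming event and let it through.
        self.received += 1;
        log::info!("Event #{} received from {client_id:?}", self.received);
        Ok(true) // `true` lets `handle_in_client` process this event as usual
    }

    fn post_exec<E, Z>(
        &mut self,
        _fuzzer: &mut Z,
        _executor: &mut E,
        _state: &mut S,
        _client_id: ClientId,
    ) -> Result<bool, Error> {
        Ok(true)
    }
}

Hooks compose as a `tuple_list!`, so `tuple_list!(CountingHook::default())` already satisfies `EventManagerHooksTuple` via the `(Head, Tail)` implementation shown in the new hooks module.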
-pub struct LlmpEventManager +pub struct LlmpEventManager where S: State, SP: ShMemProvider + 'static, { + hooks: EMH, /// The LLMP client for inter process communication llmp: LlmpClient, /// The custom buf handler @@ -383,7 +382,7 @@ where } #[cfg(feature = "adaptive_serialization")] -impl EventStatsCollector for LlmpEventManager +impl EventStatsCollector for LlmpEventManager where SP: ShMemProvider + 'static, S: State, @@ -415,7 +414,7 @@ where } } -impl core::fmt::Debug for LlmpEventManager +impl core::fmt::Debug for LlmpEventManager where SP: ShMemProvider + 'static, S: State, @@ -433,7 +432,7 @@ where } } -impl Drop for LlmpEventManager +impl Drop for LlmpEventManager where SP: ShMemProvider + 'static, S: State, @@ -444,14 +443,15 @@ where } } -impl LlmpEventManager +impl LlmpEventManager<(), S, SP> where S: State, SP: ShMemProvider + 'static, { /// Create a manager from a raw LLMP client pub fn new(llmp: LlmpClient, configuration: EventConfig) -> Result { - Ok(Self { + Ok(LlmpEventManager { + hooks: tuple_list!(), llmp, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::new(COMPRESS_THRESHOLD), @@ -478,23 +478,9 @@ where shmem_provider: SP, port: u16, configuration: EventConfig, - ) -> Result { - Ok(Self { - llmp: LlmpClient::create_attach_to_tcp(shmem_provider, port)?, - #[cfg(feature = "llmp_compression")] - compressor: GzipCompressor::new(COMPRESS_THRESHOLD), - configuration, - #[cfg(feature = "adaptive_serialization")] - serialization_time: Duration::ZERO, - #[cfg(feature = "adaptive_serialization")] - deserialization_time: Duration::ZERO, - #[cfg(feature = "adaptive_serialization")] - serializations_cnt: 0, - #[cfg(feature = "adaptive_serialization")] - should_serialize_cnt: 0, - phantom: PhantomData, - custom_buf_handlers: vec![], - }) + ) -> Result, Error> { + let llmp = LlmpClient::create_attach_to_tcp(shmem_provider, port)?; + Self::new(llmp, configuration) } /// If a client respawns, it may reuse the existing connection, previously @@ -504,9 +490,36 @@ where shmem_provider: SP, env_name: &str, configuration: EventConfig, + ) -> Result, Error> { + let llmp = LlmpClient::on_existing_from_env(shmem_provider, env_name)?; + Self::new(llmp, configuration) + } + + /// Create an existing client from description + pub fn existing_client_from_description( + shmem_provider: SP, + description: &LlmpClientDescription, + configuration: EventConfig, + ) -> Result, Error> { + let llmp = LlmpClient::existing_client_from_description(shmem_provider, description)?; + Self::new(llmp, configuration) + } +} + +impl LlmpEventManager +where + S: State, + SP: ShMemProvider + 'static, +{ + /// Create a manager from a raw LLMP client with hooks + pub fn with_hooks( + llmp: LlmpClient, + configuration: EventConfig, + hooks: EMH, ) -> Result { Ok(Self { - llmp: LlmpClient::on_existing_from_env(shmem_provider, env_name)?, + hooks, + llmp, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::new(COMPRESS_THRESHOLD), configuration, @@ -523,33 +536,49 @@ where }) } + /// Create an LLMP event manager on a port with hook + /// + /// If the port is not yet bound, it will act as a broker; otherwise, it + /// will act as a client. 
+ #[cfg(feature = "std")] + pub fn on_port_with_hooks( + shmem_provider: SP, + port: u16, + configuration: EventConfig, + hooks: EMH, + ) -> Result { + let llmp = LlmpClient::create_attach_to_tcp(shmem_provider, port)?; + Self::with_hooks(llmp, configuration, hooks) + } + + /// If a client respawns, it may reuse the existing connection, previously + /// stored by [`LlmpClient::to_env()`]. + /// create a event manager from env with hooks + #[cfg(feature = "std")] + pub fn existing_client_from_env_with_hooks( + shmem_provider: SP, + env_name: &str, + configuration: EventConfig, + hooks: EMH, + ) -> Result { + let llmp = LlmpClient::on_existing_from_env(shmem_provider, env_name)?; + Self::with_hooks(llmp, configuration, hooks) + } + /// Describe the client event manager's LLMP parts in a restorable fashion pub fn describe(&self) -> Result { self.llmp.describe() } /// Create an existing client from description - pub fn existing_client_from_description( + pub fn existing_client_from_description_with_hooks( shmem_provider: SP, description: &LlmpClientDescription, configuration: EventConfig, + hooks: EMH, ) -> Result { - Ok(Self { - llmp: LlmpClient::existing_client_from_description(shmem_provider, description)?, - #[cfg(feature = "llmp_compression")] - compressor: GzipCompressor::new(COMPRESS_THRESHOLD), - configuration, - #[cfg(feature = "adaptive_serialization")] - serialization_time: Duration::ZERO, - #[cfg(feature = "adaptive_serialization")] - deserialization_time: Duration::ZERO, - #[cfg(feature = "adaptive_serialization")] - serializations_cnt: 0, - #[cfg(feature = "adaptive_serialization")] - should_serialize_cnt: 0, - phantom: PhantomData, - custom_buf_handlers: vec![], - }) + let llmp = LlmpClient::existing_client_from_description(shmem_provider, description)?; + Self::with_hooks(llmp, configuration, hooks) } /// Write the config for a client [`EventManager`] to env vars, a new @@ -560,8 +589,9 @@ where } } -impl LlmpEventManager +impl LlmpEventManager where + EMH: EventManagerHooksTuple, S: State + HasExecutions + HasMetadata, SP: ShMemProvider + 'static, { @@ -580,6 +610,12 @@ where for<'a> E::Observers: Deserialize<'a>, Z: ExecutionProcessor + EvaluatorObservers, { + if self + .hooks + .pre_exec_all(fuzzer, executor, state, client_id, &event)? + { + return Ok(()); + } match event { Event::NewTestcase { input, @@ -593,9 +629,6 @@ where } => { log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})"); - if let Ok(meta) = state.metadata_mut::() { - meta.set_transferring(true); - } let res = if client_config.match_with(&self.configuration) && observers_buf.is_some() { @@ -621,13 +654,9 @@ where state, executor, self, input, false, )? }; - if let Ok(meta) = state.metadata_mut::() { - meta.set_transferring(false); - } if let Some(item) = res.1 { log::info!("Added received Testcase as item #{item}"); } - Ok(()) } Event::CustomBuf { tag, buf } => { for handler in &mut self.custom_buf_handlers { @@ -635,17 +664,21 @@ where break; } } - Ok(()) } - _ => Err(Error::unknown(format!( - "Received illegal message that message should not have arrived: {:?}.", - event.name() - ))), + _ => { + return Err(Error::unknown(format!( + "Received illegal message that message should not have arrived: {:?}.", + event.name() + ))); + } } + self.hooks + .post_exec_all(fuzzer, executor, state, client_id)?; + Ok(()) } } -impl LlmpEventManager { +impl LlmpEventManager { /// Send information that this client is exiting. /// The other side may free up all allocated memory. 
/// We are no longer allowed to send anything afterwards. @@ -654,7 +687,7 @@ impl LlmpEventManager { } } -impl UsesState for LlmpEventManager +impl UsesState for LlmpEventManager where S: State, SP: ShMemProvider, @@ -662,7 +695,7 @@ where type State = S; } -impl EventFirer for LlmpEventManager +impl EventFirer for LlmpEventManager where S: State, SP: ShMemProvider, @@ -756,7 +789,7 @@ where } } -impl EventRestarter for LlmpEventManager +impl EventRestarter for LlmpEventManager where S: State, SP: ShMemProvider, @@ -769,8 +802,9 @@ where } } -impl EventProcessor for LlmpEventManager +impl EventProcessor for LlmpEventManager where + EMH: EventManagerHooksTuple, S: State + HasExecutions + HasMetadata, SP: ShMemProvider, E: HasObservers + Executor, @@ -814,17 +848,18 @@ where } } -impl EventManager for LlmpEventManager +impl EventManager for LlmpEventManager where E: HasObservers + Executor, for<'a> E::Observers: Deserialize<'a>, + EMH: EventManagerHooksTuple, S: State + HasExecutions + HasMetadata + HasLastReportTime, SP: ShMemProvider, Z: EvaluatorObservers + ExecutionProcessor, { } -impl HasCustomBufHandlers for LlmpEventManager +impl HasCustomBufHandlers for LlmpEventManager where S: State, SP: ShMemProvider, @@ -837,14 +872,14 @@ where } } -impl ProgressReporter for LlmpEventManager +impl ProgressReporter for LlmpEventManager where S: State + HasExecutions + HasMetadata + HasLastReportTime, SP: ShMemProvider, { } -impl HasEventManagerId for LlmpEventManager +impl HasEventManagerId for LlmpEventManager where S: State, SP: ShMemProvider, @@ -858,14 +893,14 @@ where /// A manager that can restart on the fly, storing states in-between (in `on_restart`) #[cfg(feature = "std")] #[derive(Debug)] -pub struct LlmpRestartingEventManager +pub struct LlmpRestartingEventManager where S: State, SP: ShMemProvider + 'static, //CE: CustomEvent, { /// The embedded LLMP event manager - llmp_mgr: LlmpEventManager, + llmp_mgr: LlmpEventManager, /// The staterestorer to serialize the state for the next runner staterestorer: StateRestorer, /// Decide if the state restorer must save the serialized state @@ -873,7 +908,7 @@ where } #[cfg(all(feature = "std", feature = "adaptive_serialization"))] -impl EventStatsCollector for LlmpRestartingEventManager +impl EventStatsCollector for LlmpRestartingEventManager where SP: ShMemProvider + 'static, S: State, @@ -906,7 +941,7 @@ where } #[cfg(all(feature = "std", not(feature = "adaptive_serialization")))] -impl EventStatsCollector for LlmpRestartingEventManager +impl EventStatsCollector for LlmpRestartingEventManager where SP: ShMemProvider + 'static, S: State, @@ -914,7 +949,7 @@ where } #[cfg(feature = "std")] -impl UsesState for LlmpRestartingEventManager +impl UsesState for LlmpRestartingEventManager where S: State, SP: ShMemProvider + 'static, @@ -923,7 +958,7 @@ where } #[cfg(feature = "std")] -impl ProgressReporter for LlmpRestartingEventManager +impl ProgressReporter for LlmpRestartingEventManager where S: State + HasExecutions + HasMetadata + HasLastReportTime, SP: ShMemProvider, @@ -931,7 +966,7 @@ where } #[cfg(feature = "std")] -impl EventFirer for LlmpRestartingEventManager +impl EventFirer for LlmpRestartingEventManager where SP: ShMemProvider, S: State, @@ -959,7 +994,7 @@ where } #[cfg(feature = "std")] -impl EventRestarter for LlmpRestartingEventManager +impl EventRestarter for LlmpRestartingEventManager where S: State + HasExecutions, SP: ShMemProvider, @@ -997,10 +1032,11 @@ where } #[cfg(feature = "std")] -impl EventProcessor for 
LlmpRestartingEventManager +impl EventProcessor for LlmpRestartingEventManager where - E: HasObservers + Executor, Z>, + E: HasObservers + Executor, Z>, for<'a> E::Observers: Deserialize<'a>, + EMH: EventManagerHooksTuple, S: State + HasExecutions + HasMetadata, SP: ShMemProvider + 'static, Z: EvaluatorObservers + ExecutionProcessor, //CE: CustomEvent, @@ -1011,10 +1047,11 @@ where } #[cfg(feature = "std")] -impl EventManager for LlmpRestartingEventManager +impl EventManager for LlmpRestartingEventManager where - E: HasObservers + Executor, Z>, + E: HasObservers + Executor, Z>, for<'a> E::Observers: Deserialize<'a>, + EMH: EventManagerHooksTuple, S: State + HasExecutions + HasMetadata + HasLastReportTime, SP: ShMemProvider + 'static, Z: EvaluatorObservers + ExecutionProcessor, //CE: CustomEvent, @@ -1022,7 +1059,7 @@ where } #[cfg(feature = "std")] -impl HasEventManagerId for LlmpRestartingEventManager +impl HasEventManagerId for LlmpRestartingEventManager where S: State, SP: ShMemProvider + 'static, @@ -1039,14 +1076,14 @@ const _ENV_FUZZER_RECEIVER: &str = "_AFL_ENV_FUZZER_RECEIVER"; const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT"; #[cfg(feature = "std")] -impl LlmpRestartingEventManager +impl LlmpRestartingEventManager where S: State, SP: ShMemProvider + 'static, //CE: CustomEvent, { /// Create a new runner, the executed child doing the actual fuzzing. - pub fn new(llmp_mgr: LlmpEventManager, staterestorer: StateRestorer) -> Self { + pub fn new(llmp_mgr: LlmpEventManager, staterestorer: StateRestorer) -> Self { Self { llmp_mgr, staterestorer, @@ -1056,7 +1093,7 @@ where /// Create a new runner specifying if it must save the serialized state on restart. pub fn with_save_state( - llmp_mgr: LlmpEventManager, + llmp_mgr: LlmpEventManager, staterestorer: StateRestorer, save_state: bool, ) -> Self { @@ -1102,7 +1139,13 @@ pub fn setup_restarting_mgr_std( monitor: MT, broker_port: u16, configuration: EventConfig, -) -> Result<(Option, LlmpRestartingEventManager), Error> +) -> Result< + ( + Option, + LlmpRestartingEventManager<(), S, StdShMemProvider>, + ), + Error, +> where MT: Monitor + Clone, S: State + HasExecutions, @@ -1112,6 +1155,7 @@ where .monitor(Some(monitor)) .broker_port(broker_port) .configuration(configuration) + .hooks(tuple_list!()) .build() .launch() } @@ -1122,9 +1166,10 @@ where #[cfg(feature = "std")] #[allow(clippy::default_trait_access, clippy::ignored_unit_patterns)] #[derive(TypedBuilder, Debug)] -pub struct RestartingMgr +pub struct RestartingMgr where - S: UsesInput + DeserializeOwned, + EMH: EventManagerHooksTuple, + S: State, SP: ShMemProvider + 'static, MT: Monitor, //CE: CustomEvent, @@ -1160,14 +1205,17 @@ where /// The timeout duration used for llmp client timeout #[builder(default = DEFAULT_CLIENT_TIMEOUT_SECS)] client_timeout: Duration, + /// The hooks passed to event manager: + hooks: EMH, #[builder(setter(skip), default = PhantomData)] - phantom_data: PhantomData, + phantom_data: PhantomData<(EMH, S)>, } #[cfg(feature = "std")] #[allow(clippy::type_complexity, clippy::too_many_lines)] -impl RestartingMgr +impl RestartingMgr where + EMH: EventManagerHooksTuple + Copy + Clone, SP: ShMemProvider, S: State + HasExecutions, MT: Monitor + Clone, @@ -1186,7 +1234,7 @@ where } /// Launch the broker and the clients and fuzz - pub fn launch(&mut self) -> Result<(Option, LlmpRestartingEventManager), Error> { + pub fn launch(&mut self) -> Result<(Option, LlmpRestartingEventManager), Error> { // We start ourself as child process to 
actually fuzz let (staterestorer, new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER) .is_err() @@ -1230,7 +1278,11 @@ where return Err(Error::shutting_down()); } LlmpConnection::IsClient { client } => { - let mgr = LlmpEventManager::::new(client, self.configuration)?; + let mgr = LlmpEventManager::::with_hooks( + client, + self.configuration, + self.hooks, + )?; (mgr, None) } } @@ -1248,10 +1300,11 @@ where } ManagerKind::Client { cpu_core } => { // We are a client - let mgr = LlmpEventManager::::on_port( + let mgr = LlmpEventManager::::on_port_with_hooks( self.shmem_provider.clone(), self.broker_port, self.configuration, + self.hooks, )?; (mgr, cpu_core) @@ -1365,10 +1418,11 @@ where ( state_opt, LlmpRestartingEventManager::with_save_state( - LlmpEventManager::existing_client_from_description( + LlmpEventManager::existing_client_from_description_with_hooks( new_shmem_provider, &mgr_description, self.configuration, + self.hooks, )?, staterestorer, self.serialize_state, @@ -1377,10 +1431,11 @@ where } else { log::info!("First run. Let's set it all up"); // Mgr to send and receive msgs from/to all other fuzzer instances - let mgr = LlmpEventManager::::existing_client_from_env( + let mgr = LlmpEventManager::::existing_client_from_env_with_hooks( new_shmem_provider, _ENV_FUZZER_BROKER_CLIENT_INITIAL, self.configuration, + self.hooks, )?; ( @@ -1539,7 +1594,7 @@ where executor: &mut E, state: &mut S, manager: &mut EM, - _client_id: ClientId, + client_id: ClientId, event: Event, ) -> Result<(), Error> where @@ -1559,15 +1614,12 @@ where executions: _, forward_id, } => { - log::info!("Received new Testcase to convert from {_client_id:?} (forward {forward_id:?}, forward {forward_id:?})"); + log::info!("Received new Testcase to convert from {client_id:?} (forward {forward_id:?}, forward {forward_id:?})"); let Some(converter) = self.converter_back.as_mut() else { return Ok(()); }; - if let Ok(meta) = state.metadata_mut::() { - meta.set_transferring(true); - } let res = fuzzer.evaluate_input_with_observers::( state, executor, @@ -1575,9 +1627,6 @@ where converter.convert(input)?, false, )?; - if let Ok(meta) = state.metadata_mut::() { - meta.set_transferring(false); - } if let Some(item) = res.1 { log::info!("Added received Testcase as item #{item}"); diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs index fdade760a5..178de9f8fb 100644 --- a/libafl/src/events/mod.rs +++ b/libafl/src/events/mod.rs @@ -1,6 +1,8 @@ //! An [`EventManager`] manages all events that go to other instances of the fuzzer. //! The messages are commonly information about new Testcases as well as stats and other [`Event`]s. 
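For the client side, the following sketch (again not part of the patch) shows roughly how a fuzzer would opt into the hook through the new `launch_with_hooks` entry point. It reuses the hypothetical `CountingHook` from the sketch above, assumes the usual `Launcher` builder fields, and elides the actual fuzzer construction inside `run_client`, so treat it as an outline rather than a drop-in unit.

// Outline only: wiring the hypothetical `CountingHook` into a Launcher-based fuzzer.
use libafl::{
    events::{launcher::Launcher, llmp::LlmpRestartingEventManager, EventConfig},
    monitors::MultiMonitor,
    Error,
};
use libafl_bolts::{core_affinity::Cores, shmem::StdShMemProvider, tuples::tuple_list};

pub fn fuzz_with_hooks() -> Result<(), Error> {
    let shmem_provider = StdShMemProvider::new()?;
    let monitor = MultiMonitor::new(|s| log::info!("{s}"));
    let cores = Cores::from_cmdline("all")?;

    // Note the third generic parameter on the manager, as in the fuzzers updated above.
    let mut run_client = |_state: Option<_>,
                          _mgr: LlmpRestartingEventManager<_, _, _>,
                          _core_id| {
        // ... build observers, feedback, state, scheduler, fuzzer and executor as usual ...
        Ok(())
    };

    Launcher::builder()
        .shmem_provider(shmem_provider)
        .configuration(EventConfig::from_name("default"))
        .monitor(monitor)
        .run_client(&mut run_client)
        .cores(&cores)
        .broker_port(1337)
        .build()
        // `launch()` still works unchanged; it is now a shorthand for an empty `tuple_list!()`.
        .launch_with_hooks(tuple_list!(CountingHook::default()))
}

Existing helpers such as `setup_restarting_mgr_std` keep their signatures and simply pass `.hooks(tuple_list!())` internally, so current fuzzers only need the extra `_` in `LlmpRestartingEventManager<_, _, _>`.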
+pub mod hooks; + pub mod simple; pub use simple::*; #[cfg(all(unix, feature = "std"))] @@ -12,6 +14,8 @@ pub use centralized::*; pub mod launcher; #[allow(clippy::ignored_unit_patterns)] pub mod llmp; +pub use llmp::*; + #[cfg(feature = "tcp_manager")] #[allow(clippy::ignored_unit_patterns)] pub mod tcp; @@ -31,7 +35,6 @@ pub use launcher::*; #[cfg(all(unix, feature = "std"))] use libafl_bolts::os::unix_signals::{siginfo_t, ucontext_t, Handler, Signal}; use libafl_bolts::{current_time, ClientId}; -pub use llmp::*; use serde::{Deserialize, Serialize}; #[cfg(feature = "std")] use uuid::Uuid; diff --git a/libafl/src/events/tcp.rs b/libafl/src/events/tcp.rs index 222c771529..95efd7c0ec 100644 --- a/libafl/src/events/tcp.rs +++ b/libafl/src/events/tcp.rs @@ -23,7 +23,7 @@ use libafl_bolts::os::startable_self; use libafl_bolts::os::unix_signals::setup_signal_handler; #[cfg(all(feature = "std", feature = "fork", unix))] use libafl_bolts::os::{fork, ForkResult}; -use libafl_bolts::{shmem::ShMemProvider, ClientId}; +use libafl_bolts::{shmem::ShMemProvider, tuples::tuple_list, ClientId}; #[cfg(feature = "std")] use libafl_bolts::{shmem::StdShMemProvider, staterestore::StateRestorer}; use serde::{de::DeserializeOwned, Deserialize}; @@ -40,11 +40,11 @@ use super::{CustomBufEventResult, CustomBufHandlerFn}; use crate::events::EVENTMGR_SIGHANDLER_STATE; use crate::{ events::{ - BrokerEventResult, Event, EventConfig, EventFirer, EventManager, EventManagerId, - EventProcessor, EventRestarter, HasCustomBufHandlers, HasEventManagerId, ProgressReporter, + hooks::EventManagerHooksTuple, BrokerEventResult, Event, EventConfig, EventFirer, + EventManager, EventManagerId, EventProcessor, EventRestarter, HasCustomBufHandlers, + HasEventManagerId, ProgressReporter, }, executors::{Executor, HasObservers}, - feedbacks::transferred::TransferringMetadata, fuzzer::{EvaluatorObservers, ExecutionProcessor}, inputs::{Input, UsesInput}, monitors::Monitor, @@ -410,10 +410,12 @@ where } /// An [`EventManager`] that forwards all events to other attached via tcp. -pub struct TcpEventManager +pub struct TcpEventManager where + EMH: EventManagerHooksTuple, S: State, { + hooks: EMH, /// The TCP stream for inter process communication tcp: TcpStream, /// Our `CientId` @@ -429,8 +431,9 @@ where phantom: PhantomData, } -impl core::fmt::Debug for TcpEventManager +impl core::fmt::Debug for TcpEventManager where + EMH: EventManagerHooksTuple, S: State, { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { @@ -446,8 +449,9 @@ where } } -impl Drop for TcpEventManager +impl Drop for TcpEventManager where + EMH: EventManagerHooksTuple, S: State, { /// TCP clients will have to wait until their pages are mapped by somebody. @@ -456,7 +460,7 @@ where } } -impl TcpEventManager +impl TcpEventManager<(), S> where S: State + HasExecutions + HasMetadata, { @@ -465,6 +469,65 @@ where addr: &A, client_id: ClientId, configuration: EventConfig, + ) -> Result { + Self::existing_with_hooks(addr, client_id, configuration, tuple_list!()) + } + + /// Create a manager from a raw TCP client + pub fn new(addr: &A, configuration: EventConfig) -> Result { + Self::existing_with_hooks(addr, UNDEFINED_CLIENT_ID, configuration, tuple_list!()) + } + + /// Create an TCP event manager on a port specifying the client id + /// + /// If the port is not yet bound, it will act as a broker; otherwise, it + /// will act as a client. 
+ pub fn existing_on_port( + port: u16, + client_id: ClientId, + configuration: EventConfig, + ) -> Result { + Self::existing_with_hooks( + &("127.0.0.1", port), + client_id, + configuration, + tuple_list!(), + ) + } + + /// Create an TCP event manager on a port with hooks + /// + /// If the port is not yet bound, it will act as a broker; otherwise, it + /// will act as a client. + pub fn on_port(port: u16, configuration: EventConfig) -> Result { + Self::with_hooks(&("127.0.0.1", port), configuration, tuple_list!()) + } + + /// Create an TCP event manager on a port specifying the client id from env + /// + /// If the port is not yet bound, it will act as a broker; otherwise, it + /// will act as a client. + pub fn existing_from_env( + addr: &A, + env_name: &str, + configuration: EventConfig, + ) -> Result { + let this_id = ClientId(str::parse::(&env::var(env_name)?)?); + Self::existing_with_hooks(addr, this_id, configuration, tuple_list!()) + } +} + +impl TcpEventManager +where + EMH: EventManagerHooksTuple, + S: State + HasExecutions + HasMetadata, +{ + /// Create a manager from a raw TCP client specifying the client id with hooks + pub fn existing_with_hooks( + addr: &A, + client_id: ClientId, + configuration: EventConfig, + hooks: EMH, ) -> Result { let mut tcp = TcpStream::connect(addr)?; @@ -479,6 +542,7 @@ where println!("Our client id: {client_id:?}"); Ok(Self { + hooks, tcp, client_id, #[cfg(feature = "tcp_compression")] @@ -489,42 +553,52 @@ where }) } - /// Create a manager from a raw TCP client - pub fn new(addr: &A, configuration: EventConfig) -> Result { - Self::existing(addr, UNDEFINED_CLIENT_ID, configuration) + /// Create a manager from a raw TCP client with hooks + pub fn with_hooks( + addr: &A, + configuration: EventConfig, + hooks: EMH, + ) -> Result { + Self::existing_with_hooks(addr, UNDEFINED_CLIENT_ID, configuration, hooks) } - /// Create an TCP event manager on a port specifying the client id + /// Create an TCP event manager on a port specifying the client id with hooks /// /// If the port is not yet bound, it will act as a broker; otherwise, it /// will act as a client. - pub fn existing_on_port( + pub fn existing_on_port_with_hooks( port: u16, client_id: ClientId, configuration: EventConfig, + hooks: EMH, ) -> Result { - Self::existing(&("127.0.0.1", port), client_id, configuration) + Self::existing_with_hooks(&("127.0.0.1", port), client_id, configuration, hooks) } - /// Create an TCP event manager on a port + /// Create an TCP event manager on a port with hooks /// /// If the port is not yet bound, it will act as a broker; otherwise, it /// will act as a client. - pub fn on_port(port: u16, configuration: EventConfig) -> Result { - Self::new(&("127.0.0.1", port), configuration) + pub fn on_port_with_hooks( + port: u16, + configuration: EventConfig, + hooks: EMH, + ) -> Result { + Self::with_hooks(&("127.0.0.1", port), configuration, hooks) } - /// Create an TCP event manager on a port specifying the client id from env + /// Create an TCP event manager on a port specifying the client id from env with hooks /// /// If the port is not yet bound, it will act as a broker; otherwise, it /// will act as a client. 
- pub fn existing_from_env( + pub fn existing_from_env_with_hooks( addr: &A, env_name: &str, configuration: EventConfig, + hooks: EMH, ) -> Result { let this_id = ClientId(str::parse::(&env::var(env_name)?)?); - Self::existing(addr, this_id, configuration) + Self::existing_with_hooks(addr, this_id, configuration, hooks) } /// Write the client id for a client [`EventManager`] to env vars @@ -547,6 +621,12 @@ where for<'a> E::Observers: Deserialize<'a>, Z: ExecutionProcessor + EvaluatorObservers, { + if !self + .hooks + .pre_exec_all(fuzzer, executor, state, client_id, &event)? + { + return Ok(()); + } match event { Event::NewTestcase { input, @@ -560,9 +640,6 @@ where } => { log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})"); - if let Ok(meta) = state.metadata_mut::() { - meta.set_transferring(true); - } let _res = if client_config.match_with(&self.configuration) && observers_buf.is_some() { @@ -582,13 +659,9 @@ where state, executor, self, input, false, )? }; - if let Ok(meta) = state.metadata_mut::() { - meta.set_transferring(false); - } if let Some(item) = _res.1 { log::info!("Added received Testcase as item #{item}"); } - Ok(()) } Event::CustomBuf { tag, buf } => { for handler in &mut self.custom_buf_handlers { @@ -596,18 +669,23 @@ where break; } } - Ok(()) } - _ => Err(Error::unknown(format!( - "Received illegal message that message should not have arrived: {:?}.", - event.name() - ))), + _ => { + return Err(Error::unknown(format!( + "Received illegal message that message should not have arrived: {:?}.", + event.name() + ))) + } } + self.hooks + .post_exec_all(fuzzer, executor, state, client_id)?; + Ok(()) } } -impl TcpEventManager +impl TcpEventManager where + EMH: EventManagerHooksTuple, S: State, { /// Send information that this client is exiting. @@ -620,15 +698,17 @@ where } } -impl UsesState for TcpEventManager +impl UsesState for TcpEventManager where + EMH: EventManagerHooksTuple, S: State, { type State = S; } -impl EventFirer for TcpEventManager +impl EventFirer for TcpEventManager where + EMH: EventManagerHooksTuple, S: State, { #[cfg(feature = "tcp_compression")] @@ -674,8 +754,9 @@ where } } -impl EventRestarter for TcpEventManager +impl EventRestarter for TcpEventManager where + EMH: EventManagerHooksTuple, S: State, { /// The TCP client needs to wait until a broker has mapped all pages before shutting down. 
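The TCP event manager mirrors the same pattern. A rough sketch, again assuming the hypothetical `CountingHook` from above, a placeholder port and config, and a surrounding function that returns `Result<(), Error>` and already pins down the state type:

// Sketch only: a TcpEventManager client with the hypothetical CountingHook attached.
// `EventConfig::AlwaysUnique` and port 1337 are arbitrary placeholder choices.
use libafl::events::{tcp::TcpEventManager, EventConfig};
use libafl_bolts::tuples::tuple_list;

let mut mgr = TcpEventManager::on_port_with_hooks(
    1337,
    EventConfig::AlwaysUnique,
    tuple_list!(CountingHook::default()),
)?;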
@@ -686,11 +767,12 @@ where } } -impl EventProcessor for TcpEventManager +impl EventProcessor for TcpEventManager where - S: State + HasExecutions + HasMetadata, E: HasObservers + Executor, for<'a> E::Observers: Deserialize<'a>, + EMH: EventManagerHooksTuple, + S: State + HasExecutions + HasMetadata, Z: EvaluatorObservers + ExecutionProcessor, { fn process( @@ -747,17 +829,19 @@ where } } -impl EventManager for TcpEventManager +impl EventManager for TcpEventManager where E: HasObservers + Executor, for<'a> E::Observers: Deserialize<'a>, + EMH: EventManagerHooksTuple, S: State + HasExecutions + HasMetadata + HasLastReportTime, Z: EvaluatorObservers + ExecutionProcessor, { } -impl HasCustomBufHandlers for TcpEventManager +impl HasCustomBufHandlers for TcpEventManager where + EMH: EventManagerHooksTuple, S: State, { fn add_custom_buf_handler( @@ -768,13 +852,16 @@ where } } -impl ProgressReporter for TcpEventManager where - S: State + HasExecutions + HasMetadata + HasLastReportTime +impl ProgressReporter for TcpEventManager +where + EMH: EventManagerHooksTuple, + S: State + HasExecutions + HasMetadata + HasLastReportTime, { } -impl HasEventManagerId for TcpEventManager +impl HasEventManagerId for TcpEventManager where + EMH: EventManagerHooksTuple, S: State, { /// Gets the id assigned to this staterestorer. @@ -786,14 +873,15 @@ where /// A manager that can restart on the fly, storing states in-between (in `on_restart`) #[cfg(feature = "std")] #[derive(Debug)] -pub struct TcpRestartingEventManager +pub struct TcpRestartingEventManager where + EMH: EventManagerHooksTuple, S: State, SP: ShMemProvider + 'static, //CE: CustomEvent, { /// The embedded TCP event manager - tcp_mgr: TcpEventManager, + tcp_mgr: TcpEventManager, /// The staterestorer to serialize the state for the next runner staterestorer: StateRestorer, /// Decide if the state restorer must save the serialized state @@ -801,8 +889,9 @@ where } #[cfg(feature = "std")] -impl UsesState for TcpRestartingEventManager +impl UsesState for TcpRestartingEventManager where + EMH: EventManagerHooksTuple, S: State, SP: ShMemProvider + 'static, { @@ -810,16 +899,18 @@ where } #[cfg(feature = "std")] -impl ProgressReporter for TcpRestartingEventManager +impl ProgressReporter for TcpRestartingEventManager where + EMH: EventManagerHooksTuple, S: State + HasExecutions + HasMetadata + HasLastReportTime, SP: ShMemProvider, { } #[cfg(feature = "std")] -impl EventFirer for TcpRestartingEventManager +impl EventFirer for TcpRestartingEventManager where + EMH: EventManagerHooksTuple, SP: ShMemProvider, S: State, //CE: CustomEvent, @@ -839,8 +930,9 @@ where } #[cfg(feature = "std")] -impl EventRestarter for TcpRestartingEventManager +impl EventRestarter for TcpRestartingEventManager where + EMH: EventManagerHooksTuple, S: State + HasExecutions, SP: ShMemProvider, //CE: CustomEvent, @@ -877,10 +969,11 @@ where } #[cfg(feature = "std")] -impl EventProcessor for TcpRestartingEventManager +impl EventProcessor for TcpRestartingEventManager where - E: HasObservers + Executor, Z>, + E: HasObservers + Executor, Z>, for<'a> E::Observers: Deserialize<'a>, + EMH: EventManagerHooksTuple, S: State + HasExecutions + HasMetadata, SP: ShMemProvider + 'static, Z: EvaluatorObservers + ExecutionProcessor, //CE: CustomEvent, @@ -891,10 +984,11 @@ where } #[cfg(feature = "std")] -impl EventManager for TcpRestartingEventManager +impl EventManager for TcpRestartingEventManager where - E: HasObservers + Executor, Z>, + E: HasObservers + Executor, Z>, for<'a> E::Observers: 
Deserialize<'a>, + EMH: EventManagerHooksTuple, S: State + HasExecutions + HasMetadata + HasLastReportTime, SP: ShMemProvider + 'static, Z: EvaluatorObservers + ExecutionProcessor, //CE: CustomEvent, @@ -902,8 +996,9 @@ where } #[cfg(feature = "std")] -impl HasEventManagerId for TcpRestartingEventManager +impl HasEventManagerId for TcpRestartingEventManager where + EMH: EventManagerHooksTuple, S: State, SP: ShMemProvider + 'static, { @@ -919,14 +1014,15 @@ const _ENV_FUZZER_RECEIVER: &str = "_AFL_ENV_FUZZER_RECEIVER"; const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT"; #[cfg(feature = "std")] -impl TcpRestartingEventManager +impl TcpRestartingEventManager where + EMH: EventManagerHooksTuple, S: State, SP: ShMemProvider + 'static, //CE: CustomEvent, { /// Create a new runner, the executed child doing the actual fuzzing. - pub fn new(tcp_mgr: TcpEventManager, staterestorer: StateRestorer) -> Self { + pub fn new(tcp_mgr: TcpEventManager, staterestorer: StateRestorer) -> Self { Self { tcp_mgr, staterestorer, @@ -936,7 +1032,7 @@ where /// Create a new runner specifying if it must save the serialized state on restart. pub fn with_save_state( - tcp_mgr: TcpEventManager, + tcp_mgr: TcpEventManager, staterestorer: StateRestorer, save_state: bool, ) -> Self { @@ -982,7 +1078,13 @@ pub fn setup_restarting_mgr_tcp( monitor: MT, broker_port: u16, configuration: EventConfig, -) -> Result<(Option, TcpRestartingEventManager), Error> +) -> Result< + ( + Option, + TcpRestartingEventManager<(), S, StdShMemProvider>, + ), + Error, +> where MT: Monitor + Clone, S: State + HasExecutions + HasMetadata, @@ -992,6 +1094,7 @@ where .monitor(Some(monitor)) .broker_port(broker_port) .configuration(configuration) + .hooks(tuple_list!()) .build() .launch() } @@ -1002,7 +1105,7 @@ where #[cfg(feature = "std")] #[allow(clippy::default_trait_access, clippy::ignored_unit_patterns)] #[derive(TypedBuilder, Debug)] -pub struct TcpRestartingMgr +pub struct TcpRestartingMgr where S: UsesInput + DeserializeOwned, SP: ShMemProvider + 'static, @@ -1037,14 +1140,17 @@ where /// Tell the manager to serialize or not the state on restart #[builder(default = true)] serialize_state: bool, + /// The hooks for `handle_in_client` + hooks: EMH, #[builder(setter(skip), default = PhantomData)] phantom_data: PhantomData, } #[cfg(feature = "std")] #[allow(clippy::type_complexity, clippy::too_many_lines)] -impl TcpRestartingMgr +impl TcpRestartingMgr where + EMH: EventManagerHooksTuple + Copy + Clone, SP: ShMemProvider, S: State + HasExecutions + HasMetadata, MT: Monitor + Clone, @@ -1063,7 +1169,7 @@ where } /// Launch the restarting manager - pub fn launch(&mut self) -> Result<(Option, TcpRestartingEventManager), Error> { + pub fn launch(&mut self) -> Result<(Option, TcpRestartingEventManager), Error> { // We start ourself as child process to actually fuzz let (staterestorer, _new_shmem_provider, core_id) = if env::var(_ENV_FUZZER_SENDER).is_err() { @@ -1097,9 +1203,10 @@ where } Err(Error::File(_, _)) => { // port was likely already bound - let mgr = TcpEventManager::::new( + let mgr = TcpEventManager::::with_hooks( &("127.0.0.1", self.broker_port), self.configuration, + self.hooks, )?; (mgr, None) } @@ -1119,7 +1226,11 @@ where } TcpManagerKind::Client { cpu_core } => { // We are a client - let mgr = TcpEventManager::::on_port(self.broker_port, self.configuration)?; + let mgr = TcpEventManager::::on_port_with_hooks( + self.broker_port, + self.configuration, + self.hooks, + )?; (mgr, cpu_core) } @@ -1232,10 
+1343,11 @@ where ( state_opt, TcpRestartingEventManager::with_save_state( - TcpEventManager::existing_on_port( + TcpEventManager::existing_on_port_with_hooks( self.broker_port, this_id, self.configuration, + self.hooks, )?, staterestorer, self.serialize_state, @@ -1244,10 +1356,11 @@ where } else { log::info!("First run. Let's set it all up"); // Mgr to send and receive msgs from/to all other fuzzer instances - let mgr = TcpEventManager::::existing_from_env( + let mgr = TcpEventManager::::existing_from_env_with_hooks( &("127.0.0.1", self.broker_port), _ENV_FUZZER_BROKER_CLIENT_INITIAL, self.configuration, + self.hooks, )?; ( diff --git a/libafl/src/lib.rs b/libafl/src/lib.rs index 28b5c2dc5e..c70469c5b4 100644 --- a/libafl/src/lib.rs +++ b/libafl/src/lib.rs @@ -28,7 +28,8 @@ Welcome to `LibAFL` clippy::module_name_repetitions, clippy::ptr_cast_constness, clippy::unsafe_derive_deserialize, - clippy::similar_names + clippy::similar_names, + clippy::too_many_lines )] #![cfg_attr(not(test), warn( missing_debug_implementations, diff --git a/libafl_sugar/src/forkserver.rs b/libafl_sugar/src/forkserver.rs index 5fbe1fcbf6..c7fa34c451 100644 --- a/libafl_sugar/src/forkserver.rs +++ b/libafl_sugar/src/forkserver.rs @@ -115,7 +115,7 @@ impl<'a> ForkserverBytesCoverageSugar<'a> { let monitor = MultiMonitor::new(|s| log::info!("{s}")); let mut run_client = |state: Option<_>, - mut mgr: LlmpRestartingEventManager<_, _>, + mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| { // Coverage map shared between target and fuzzer let mut shmem = shmem_provider_client.new_shmem(MAP_SIZE).unwrap(); diff --git a/libafl_sugar/src/inmemory.rs b/libafl_sugar/src/inmemory.rs index b70d241121..fc00a0e84a 100644 --- a/libafl_sugar/src/inmemory.rs +++ b/libafl_sugar/src/inmemory.rs @@ -139,7 +139,7 @@ where let monitor = MultiMonitor::new(|s| println!("{s}")); let mut run_client = |state: Option<_>, - mut mgr: LlmpRestartingEventManager<_, _>, + mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| { // Create an observation channel using the coverage map let edges_observer = diff --git a/libafl_sugar/src/qemu.rs b/libafl_sugar/src/qemu.rs index be47e2d154..f3330d7522 100644 --- a/libafl_sugar/src/qemu.rs +++ b/libafl_sugar/src/qemu.rs @@ -146,7 +146,7 @@ where let monitor = MultiMonitor::new(|s| log::info!("{s}")); let mut run_client = |state: Option<_>, - mut mgr: LlmpRestartingEventManager<_, _>, + mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| { // Create an observation channel using the coverage map let edges_observer = unsafe {