diff --git a/fuzzers/frida_libpng/src/fuzzer.rs b/fuzzers/frida_libpng/src/fuzzer.rs index 05e9afe9c1..d7fea2b388 100644 --- a/fuzzers/frida_libpng/src/fuzzer.rs +++ b/fuzzers/frida_libpng/src/fuzzer.rs @@ -289,16 +289,13 @@ unsafe fn fuzz( stdout_file: Option<&str>, broker_addr: Option, ) -> Result<(), Error> { - let stats_closure = |s| println!("{}", s); // 'While the stats are state, they are usually used in the broker - which is likely never restarted - let stats = MultiStats::new(stats_closure); + let stats = MultiStats::new(|s| println!("{}", s)); #[cfg(target_os = "android")] AshmemService::start().expect("Failed to start Ashmem service"); let shmem_provider = StdShMemProvider::new()?; - let mut client_init_stats = || Ok(MultiStats::new(stats_closure)); - let mut run_client = |state: Option>, mut mgr| { // The restarting state will spawn the same process again as child, then restarted it each time it crashes. @@ -427,7 +424,6 @@ unsafe fn fuzz( Launcher::builder() .shmem_provider(shmem_provider) .stats(stats) - .client_init_stats(&mut client_init_stats) .run_client(&mut run_client) .cores(cores) .broker_port(broker_port) diff --git a/fuzzers/libfuzzer_libpng_launcher/src/lib.rs b/fuzzers/libfuzzer_libpng_launcher/src/lib.rs index b59305a6fc..996ec170bb 100644 --- a/fuzzers/libfuzzer_libpng_launcher/src/lib.rs +++ b/fuzzers/libfuzzer_libpng_launcher/src/lib.rs @@ -58,9 +58,7 @@ pub fn main() { AshmemService::start().expect("Failed to start Ashmem service"); let shmem_provider = StdShMemProvider::new().expect("Failed to init shared memory"); - let stats_closure = |s| println!("{}", s); - let stats = MultiStats::new(stats_closure); - let mut client_init_stats = || Ok(MultiStats::new(stats_closure)); + let stats = MultiStats::new(|s| println!("{}", s)); let mut run_client = |state: Option>, mut restarting_mgr| { let corpus_dirs = &[PathBuf::from("./corpus")]; @@ -170,7 +168,6 @@ pub fn main() { Launcher::builder() .shmem_provider(shmem_provider) .stats(stats) - .client_init_stats(&mut client_init_stats) .run_client(&mut run_client) .cores(&cores) .broker_port(broker_port) diff --git a/libafl/src/bolts/launcher.rs b/libafl/src/bolts/launcher.rs index 5479f5b8ed..4ffe92933b 100644 --- a/libafl/src/bolts/launcher.rs +++ b/libafl/src/bolts/launcher.rs @@ -33,8 +33,8 @@ use typed_builder::TypedBuilder; /// The Launcher client callback type reference #[cfg(feature = "std")] -pub type LauncherClientFnRef<'a, I, OT, S, SP, ST> = - &'a mut dyn FnMut(Option, LlmpRestartingEventManager) -> Result<(), Error>; +pub type LauncherClientFnRef<'a, I, OT, S, SP> = + &'a mut dyn FnMut(Option, LlmpRestartingEventManager) -> Result<(), Error>; /// Provides a Launcher, which can be used to launch a fuzzing run on a specified list of cores #[cfg(feature = "std")] @@ -52,10 +52,8 @@ where shmem_provider: SP, /// The stats instance to use stats: ST, - /// A closure or function which generates stats instances for newly spawned clients - client_init_stats: &'a mut dyn FnMut() -> Result, /// The 'main' function to run for each client forked. This probably shouldn't return - run_client: LauncherClientFnRef<'a, I, OT, S, SP, ST>, + run_client: LauncherClientFnRef<'a, I, OT, S, SP>, /// The broker port to use #[builder(default = 1337_u16)] broker_port: u16, @@ -92,7 +90,7 @@ where .stdout_file .map(|filename| File::create(filename).unwrap()); - //spawn clients + // Spawn clients for (id, bind_to) in core_ids.iter().enumerate().take(num_cores) { if self.cores.iter().any(|&x| x == id) { self.shmem_provider.pre_fork()?; @@ -114,11 +112,9 @@ where dup2(file.as_ref().unwrap().as_raw_fd(), libc::STDOUT_FILENO)?; dup2(file.as_ref().unwrap().as_raw_fd(), libc::STDERR_FILENO)?; } - //fuzzer client. keeps retrying the connection to broker till the broker starts - let stats = (self.client_init_stats)()?; - let (state, mgr) = RestartingMgr::builder() + // Fuzzer client. keeps retrying the connection to broker till the broker starts + let (state, mgr) = RestartingMgr::::builder() .shmem_provider(self.shmem_provider.clone()) - .stats(stats) .broker_port(self.broker_port) .kind(ManagerKind::Client { cpu_core: Some(*bind_to), @@ -135,16 +131,17 @@ where #[cfg(feature = "std")] println!("I am broker!!."); + // TODO we don't want always a broker here, thing about using different laucher process to spawn different configurations RestartingMgr::::builder() .shmem_provider(self.shmem_provider.clone()) - .stats(self.stats.clone()) + .stats(Some(self.stats.clone())) .broker_port(self.broker_port) .kind(ManagerKind::Broker) .remote_broker_addr(self.remote_broker_addr) .build() .launch()?; - //broker exited. kill all clients. + // Broker exited. kill all clients. for handle in &handles { unsafe { libc::kill(*handle, libc::SIGINT); @@ -165,10 +162,8 @@ where //todo: silence stdout and stderr for clients // the actual client. do the fuzzing - let stats = (self.client_init_stats)()?; let (state, mgr) = RestartingMgr::::builder() .shmem_provider(self.shmem_provider.clone()) - .stats(stats) .broker_port(self.broker_port) .kind(ManagerKind::Client { cpu_core: Some(CoreId { @@ -225,7 +220,7 @@ where RestartingMgr::::builder() .shmem_provider(self.shmem_provider.clone()) - .stats(self.stats.clone()) + .stats(Some(self.stats.clone())) .broker_port(self.broker_port) .kind(ManagerKind::Broker) .remote_broker_addr(self.remote_broker_addr) diff --git a/libafl/src/bolts/llmp.rs b/libafl/src/bolts/llmp.rs index 404dff511c..5a5f1ce0ed 100644 --- a/libafl/src/bolts/llmp.rs +++ b/libafl/src/bolts/llmp.rs @@ -604,14 +604,9 @@ where /// Creates a new broker on the given port #[cfg(feature = "std")] pub fn broker_on_port(shmem_provider: SP, port: u16) -> Result { - match tcp_bind(port) { - Ok(listener) => { - let mut broker = LlmpBroker::new(shmem_provider)?; - let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; - Ok(LlmpConnection::IsBroker { broker }) - } - Err(e) => Err(e), - } + Ok(LlmpConnection::IsBroker { + broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port)?, + }) } /// Creates a new client on the given port @@ -1641,6 +1636,19 @@ where }) } + /// Create a new [`LlmpBroker`] sttaching to a TCP port + #[cfg(feature = "std")] + pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result { + match tcp_bind(port) { + Ok(listener) => { + let mut broker = LlmpBroker::new(shmem_provider)?; + let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; + Ok(broker) + } + Err(e) => Err(e), + } + } + /// Allocate the next message on the outgoing map unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error> { self.llmp_out.alloc_next(buf_len) @@ -2302,7 +2310,7 @@ where } /// Describe this client in a way that it can be recreated, for example after crash - fn describe(&self) -> Result { + pub fn describe(&self) -> Result { Ok(LlmpClientDescription { sender: self.sender.describe()?, receiver: self.receiver.describe()?, @@ -2310,7 +2318,7 @@ where } /// Create an existing client from description - fn existing_client_from_description( + pub fn existing_client_from_description( shmem_provider: SP, description: &LlmpClientDescription, ) -> Result { diff --git a/libafl/src/events/llmp.rs b/libafl/src/events/llmp.rs index 711f21e9f9..35dc577669 100644 --- a/libafl/src/events/llmp.rs +++ b/libafl/src/events/llmp.rs @@ -60,174 +60,95 @@ const LLMP_TAG_EVENT_TO_BOTH: llmp::Tag = 0x2B0741; const _LLMP_TAG_RESTART: llmp::Tag = 0x8357A87; const _LLMP_TAG_NO_RESTART: llmp::Tag = 0x57A7EE71; -/// An [`EventManager`] that forwards all events to other attached fuzzers on shared maps or via tcp, -/// using low-level message passing, [`crate::bolts::llmp`]. -#[derive(Debug)] -pub struct LlmpEventManager -where - I: Input, - OT: ObserversTuple, - SP: ShMemProvider + 'static, - ST: Stats, - //CE: CustomEvent, -{ - stats: Option, - llmp: llmp::LlmpConnection, - #[cfg(feature = "llmp_compression")] - compressor: GzipCompressor, - - phantom: PhantomData<(I, OT, S)>, -} - /// The minimum buffer size at which to compress LLMP IPC messages. #[cfg(feature = "llmp_compression")] const COMPRESS_THRESHOLD: usize = 1024; -impl Drop for LlmpEventManager +#[derive(Debug)] +pub struct LlmpEventBroker where I: Input, - OT: ObserversTuple, SP: ShMemProvider + 'static, ST: Stats, + //CE: CustomEvent, { - /// LLMP clients will have to wait until their pages are mapped by somebody. - fn drop(&mut self) { - self.await_restart_safe() - } + stats: ST, + llmp: llmp::LlmpBroker, + #[cfg(feature = "llmp_compression")] + compressor: GzipCompressor, + phantom: PhantomData, } -impl LlmpEventManager +impl LlmpEventBroker where I: Input, - OT: ObserversTuple, SP: ShMemProvider + 'static, ST: Stats, { + /// Create an even broker from a raw broker. + pub fn new(llmp: llmp::LlmpBroker, stats: ST) -> Result { + Ok(Self { + stats, + llmp, + #[cfg(feature = "llmp_compression")] + compressor: GzipCompressor::new(COMPRESS_THRESHOLD), + phantom: PhantomData, + }) + } + /// Create llmp on a port - /// If the port is not yet bound, it will act as broker - /// Else, it will act as client. + /// The port must not be bound yet to have a broker. #[cfg(feature = "std")] pub fn new_on_port(shmem_provider: SP, stats: ST, port: u16) -> Result { Ok(Self { - stats: Some(stats), - llmp: llmp::LlmpConnection::on_port(shmem_provider, port)?, + stats, + llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port)?, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::new(COMPRESS_THRESHOLD), phantom: PhantomData, }) } - /// If a client respawns, it may reuse the existing connection, previously stored by [`LlmpClient::to_env()`]. - #[cfg(feature = "std")] - pub fn existing_client_from_env(shmem_provider: SP, env_name: &str) -> Result { - Ok(Self { - stats: None, - llmp: llmp::LlmpConnection::IsClient { - client: LlmpClient::on_existing_from_env(shmem_provider, env_name)?, - }, - #[cfg(feature = "llmp_compression")] - compressor: GzipCompressor::new(COMPRESS_THRESHOLD), - // Inserting a nop-stats element here so rust won't complain. - // In any case, the client won't currently use it. - phantom: PhantomData, - }) - } - - /// Describe the client event mgr's llmp parts in a restorable fashion - pub fn describe(&self) -> Result { - self.llmp.describe() - } - - /// Create an existing client from description - pub fn existing_client_from_description( - shmem_provider: SP, - description: &LlmpClientDescription, - ) -> Result { - Ok(Self { - stats: None, - llmp: llmp::LlmpConnection::existing_client_from_description( - shmem_provider, - description, - )?, - #[cfg(feature = "llmp_compression")] - compressor: GzipCompressor::new(COMPRESS_THRESHOLD), - // Inserting a nop-stats element here so rust won't complain. - // In any case, the client won't currently use it. - phantom: PhantomData, - }) - } - - /// Write the config for a client [`EventManager`] to env vars, a new client can reattach using [`LlmpEventManager::existing_client_from_env()`]. - #[cfg(feature = "std")] - pub fn to_env(&self, env_name: &str) { - match &self.llmp { - llmp::LlmpConnection::IsBroker { broker: _ } => { - todo!("There is probably no use storing the broker to env. Client only for now") - } - llmp::LlmpConnection::IsClient { client } => client.to_env(env_name).unwrap(), - } - } - - /// Returns if we are the broker - pub fn is_broker(&self) -> bool { - matches!(self.llmp, llmp::LlmpConnection::IsBroker { broker: _ }) - } - #[cfg(feature = "std")] pub fn connect_b2b(&mut self, addr: A) -> Result<(), Error> where A: ToSocketAddrs, { - match &mut self.llmp { - llmp::LlmpConnection::IsBroker { broker } => broker.connect_b2b(addr), - llmp::LlmpConnection::IsClient { client: _ } => Err(Error::IllegalState( - "Called broker loop in the client".into(), - )), - } + self.llmp.connect_b2b(addr) } /// Run forever in the broker pub fn broker_loop(&mut self) -> Result<(), Error> { - match &mut self.llmp { - llmp::LlmpConnection::IsBroker { broker } => { - let stats = self.stats.as_mut().unwrap(); - #[cfg(feature = "llmp_compression")] - let compressor = &self.compressor; - broker.loop_forever( - &mut |sender_id: u32, tag: Tag, _flags: Flags, msg: &[u8]| { - if tag == LLMP_TAG_EVENT_TO_BOTH { - #[cfg(not(feature = "llmp_compression"))] - let event_bytes = msg; - #[cfg(feature = "llmp_compression")] - let compressed; - #[cfg(feature = "llmp_compression")] - let event_bytes = - if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { - compressed = compressor.decompress(msg)?; - &compressed - } else { - msg - }; - let event: Event = postcard::from_bytes(event_bytes)?; - match Self::handle_in_broker(stats, sender_id, &event)? { - BrokerEventResult::Forward => { - Ok(llmp::LlmpMsgHookResult::ForwardToClients) - } - BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled), - } - } else { - Ok(llmp::LlmpMsgHookResult::ForwardToClients) - } - }, - Some(Duration::from_millis(5)), - ); + let stats = &mut self.stats; + #[cfg(feature = "llmp_compression")] + let compressor = &self.compressor; + self.llmp.loop_forever( + &mut |sender_id: u32, tag: Tag, _flags: Flags, msg: &[u8]| { + if tag == LLMP_TAG_EVENT_TO_BOTH { + #[cfg(not(feature = "llmp_compression"))] + let event_bytes = msg; + #[cfg(feature = "llmp_compression")] + let compressed; + #[cfg(feature = "llmp_compression")] + let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + compressed = compressor.decompress(msg)?; + &compressed + } else { + msg + }; + let event: Event = postcard::from_bytes(event_bytes)?; + match Self::handle_in_broker(stats, sender_id, &event)? { + BrokerEventResult::Forward => Ok(llmp::LlmpMsgHookResult::ForwardToClients), + BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled), + } + } else { + Ok(llmp::LlmpMsgHookResult::ForwardToClients) + } + }, + Some(Duration::from_millis(5)), + ); - Ok(()) - } - _ => Err(Error::IllegalState( - "Called broker loop in the client".into(), - )), - } + Ok(()) } /// Handle arriving events in the broker @@ -316,6 +237,93 @@ where } //_ => Ok(BrokerEventResult::Forward), } } +} + +/// An [`EventManager`] that forwards all events to other attached fuzzers on shared maps or via tcp, +/// using low-level message passing, [`crate::bolts::llmp`]. +#[derive(Debug)] +pub struct LlmpEventManager +where + I: Input, + OT: ObserversTuple, + SP: ShMemProvider + 'static, + //CE: CustomEvent, +{ + llmp: llmp::LlmpClient, + #[cfg(feature = "llmp_compression")] + compressor: GzipCompressor, + phantom: PhantomData<(I, OT, S)>, +} + +impl Drop for LlmpEventManager +where + I: Input, + OT: ObserversTuple, + SP: ShMemProvider + 'static, +{ + /// LLMP clients will have to wait until their pages are mapped by somebody. + fn drop(&mut self) { + self.await_restart_safe() + } +} + +impl LlmpEventManager +where + I: Input, + OT: ObserversTuple, + SP: ShMemProvider + 'static, +{ + /// Create llmp on a port + /// If the port is not yet bound, it will act as broker + /// Else, it will act as client. + #[cfg(feature = "std")] + pub fn new_on_port(shmem_provider: SP, port: u16) -> Result { + Ok(Self { + llmp: llmp::LlmpClient::create_attach_to_tcp(shmem_provider, port)?, + #[cfg(feature = "llmp_compression")] + compressor: GzipCompressor::new(COMPRESS_THRESHOLD), + phantom: PhantomData, + }) + } + + /// If a client respawns, it may reuse the existing connection, previously stored by [`LlmpClient::to_env()`]. + #[cfg(feature = "std")] + pub fn existing_client_from_env(shmem_provider: SP, env_name: &str) -> Result { + Ok(Self { + llmp: LlmpClient::on_existing_from_env(shmem_provider, env_name)?, + #[cfg(feature = "llmp_compression")] + compressor: GzipCompressor::new(COMPRESS_THRESHOLD), + // Inserting a nop-stats element here so rust won't complain. + // In any case, the client won't currently use it. + phantom: PhantomData, + }) + } + + /// Describe the client event mgr's llmp parts in a restorable fashion + pub fn describe(&self) -> Result { + self.llmp.describe() + } + + /// Create an existing client from description + pub fn existing_client_from_description( + shmem_provider: SP, + description: &LlmpClientDescription, + ) -> Result { + Ok(Self { + llmp: llmp::LlmpClient::existing_client_from_description(shmem_provider, description)?, + #[cfg(feature = "llmp_compression")] + compressor: GzipCompressor::new(COMPRESS_THRESHOLD), + // Inserting a nop-stats element here so rust won't complain. + // In any case, the client won't currently use it. + phantom: PhantomData, + }) + } + + /// Write the config for a client [`EventManager`] to env vars, a new client can reattach using [`LlmpEventManager::existing_client_from_env()`]. + #[cfg(feature = "std")] + pub fn to_env(&self, env_name: &str) { + self.llmp.to_env(env_name).unwrap() + } // Handle arriving events in the client #[allow(clippy::unused_self)] @@ -367,12 +375,11 @@ where } } -impl EventFirer for LlmpEventManager +impl EventFirer for LlmpEventManager where I: Input, OT: ObserversTuple, SP: ShMemProvider, - ST: Stats, //CE: CustomEvent, { #[cfg(feature = "llmp_compression")] @@ -384,8 +391,8 @@ where Some(comp_buf) => { self.llmp.send_buf_with_flags( LLMP_TAG_EVENT_TO_BOTH, - &comp_buf, flags | LLMP_FLAG_COMPRESSED, + &comp_buf, )?; } None => { @@ -403,28 +410,24 @@ where } } -impl EventRestarter for LlmpEventManager +impl EventRestarter for LlmpEventManager where I: Input, OT: ObserversTuple, SP: ShMemProvider, - ST: Stats, //CE: CustomEvent, { /// The llmp client needs to wait until a broker mapped all pages, before shutting down. /// Otherwise, the OS may already have removed the shared maps, fn await_restart_safe(&mut self) { - if let llmp::LlmpConnection::IsClient { client } = &self.llmp { - // wait until we can drop the message safely. - client.await_save_to_unmap_blocking(); - } + // wait until we can drop the message safely. + self.llmp.await_save_to_unmap_blocking(); } } -impl EventProcessor for LlmpEventManager +impl EventProcessor for LlmpEventManager where SP: ShMemProvider, - ST: Stats, E: Executor, I: Input, OT: ObserversTuple, @@ -433,32 +436,24 @@ where fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { // TODO: Get around local event copy by moving handle_in_client let mut events = vec![]; - match &mut self.llmp { - llmp::LlmpConnection::IsClient { client } => { - while let Some((sender_id, tag, _flags, msg)) = client.recv_buf_with_flags()? { - if tag == _LLMP_TAG_EVENT_TO_BROKER { - panic!("EVENT_TO_BROKER parcel should not have arrived in the client!"); - } - #[cfg(not(feature = "llmp_compression"))] - let event_bytes = msg; - #[cfg(feature = "llmp_compression")] - let compressed; - #[cfg(feature = "llmp_compression")] - let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { - compressed = self.compressor.decompress(msg)?; - &compressed - } else { - msg - }; - let event: Event = postcard::from_bytes(event_bytes)?; - events.push((sender_id, event)); - } + while let Some((sender_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? { + if tag == _LLMP_TAG_EVENT_TO_BROKER { + panic!("EVENT_TO_BROKER parcel should not have arrived in the client!"); } - _ => { - #[cfg(feature = "std")] - dbg!("Skipping process in broker"); - } - }; + #[cfg(not(feature = "llmp_compression"))] + let event_bytes = msg; + #[cfg(feature = "llmp_compression")] + let compressed; + #[cfg(feature = "llmp_compression")] + let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + compressed = self.compressor.decompress(msg)?; + &compressed + } else { + msg + }; + let event: Event = postcard::from_bytes(event_bytes)?; + events.push((sender_id, event)); + } let count = events.len(); events.drain(..).try_for_each(|(sender_id, event)| { self.handle_in_client(fuzzer, executor, state, sender_id, event) @@ -467,10 +462,9 @@ where } } -impl EventManager for LlmpEventManager +impl EventManager for LlmpEventManager where SP: ShMemProvider, - ST: Stats, E: Executor, I: Input, OT: ObserversTuple, @@ -481,32 +475,30 @@ where /// Serialize the current state and corpus during an executiont to bytes. /// On top, add the current llmp event manager instance to be restored /// This method is needed when the fuzzer run crashes and has to restart. -pub fn serialize_state_mgr( +pub fn serialize_state_mgr( state: &S, - mgr: &LlmpEventManager, + mgr: &LlmpEventManager, ) -> Result, Error> where I: Input, OT: ObserversTuple, S: Serialize, SP: ShMemProvider, - ST: Stats, { Ok(postcard::to_allocvec(&(&state, &mgr.describe()?))?) } /// Deserialize the state and corpus tuple, previously serialized with `serialize_state_corpus(...)` #[allow(clippy::type_complexity)] -pub fn deserialize_state_mgr( +pub fn deserialize_state_mgr( shmem_provider: SP, state_corpus_serialized: &[u8], -) -> Result<(S, LlmpEventManager), Error> +) -> Result<(S, LlmpEventManager), Error> where I: Input, OT: ObserversTuple, S: DeserializeOwned, SP: ShMemProvider, - ST: Stats, { let tuple: (S, _) = postcard::from_bytes(&state_corpus_serialized)?; Ok(( @@ -517,27 +509,25 @@ where /// A manager that can restart on the fly, storing states in-between (in `on_resatrt`) #[derive(Debug)] -pub struct LlmpRestartingEventManager +pub struct LlmpRestartingEventManager where I: Input, OT: ObserversTuple, SP: ShMemProvider + 'static, - ST: Stats, //CE: CustomEvent, { /// The embedded llmp event manager - llmp_mgr: LlmpEventManager, + llmp_mgr: LlmpEventManager, /// The sender to serialize the state for the next runner sender: LlmpSender, } -impl EventFirer for LlmpRestartingEventManager +impl EventFirer for LlmpRestartingEventManager where I: Input, OT: ObserversTuple, S: Serialize, SP: ShMemProvider, - ST: Stats, //CE: CustomEvent, { fn fire(&mut self, state: &mut S, event: Event) -> Result<(), Error> { @@ -546,13 +536,12 @@ where } } -impl EventRestarter for LlmpRestartingEventManager +impl EventRestarter for LlmpRestartingEventManager where I: Input, OT: ObserversTuple, S: Serialize, SP: ShMemProvider, - ST: Stats, //CE: CustomEvent, { /// The llmp client needs to wait until a broker mapped all pages, before shutting down. @@ -572,15 +561,13 @@ where } } -impl EventProcessor - for LlmpRestartingEventManager +impl EventProcessor for LlmpRestartingEventManager where - E: Executor, I, S, Z>, + E: Executor, I, S, Z>, I: Input, Z: IfInteresting + IsInteresting, OT: ObserversTuple, SP: ShMemProvider + 'static, - ST: Stats, //CE: CustomEvent, { fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { @@ -588,16 +575,14 @@ where } } -impl EventManager - for LlmpRestartingEventManager +impl EventManager for LlmpRestartingEventManager where - E: Executor, I, S, Z>, + E: Executor, I, S, Z>, I: Input, S: Serialize, Z: IfInteresting + IsInteresting, OT: ObserversTuple, SP: ShMemProvider + 'static, - ST: Stats, //CE: CustomEvent, { } @@ -608,16 +593,15 @@ const _ENV_FUZZER_RECEIVER: &str = &"_AFL_ENV_FUZZER_RECEIVER"; /// The llmp (2 way) connection from a fuzzer to the broker (broadcasting all other fuzzer messages) const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = &"_AFL_ENV_FUZZER_BROKER_CLIENT"; -impl LlmpRestartingEventManager +impl LlmpRestartingEventManager where I: Input, OT: ObserversTuple, SP: ShMemProvider + 'static, - ST: Stats, //CE: CustomEvent, { /// Create a new runner, the executed child doing the actual fuzzing. - pub fn new(llmp_mgr: LlmpEventManager, sender: LlmpSender) -> Self { + pub fn new(llmp_mgr: LlmpEventManager, sender: LlmpSender) -> Self { Self { llmp_mgr, sender } } @@ -654,7 +638,7 @@ pub fn setup_restarting_mgr_std( ) -> Result< ( Option, - LlmpRestartingEventManager, + LlmpRestartingEventManager, ), Error, > @@ -670,7 +654,7 @@ where RestartingMgr::builder() .shmem_provider(StdShMemProvider::new()?) - .stats(stats) + .stats(Some(stats)) .broker_port(broker_port) .build() .launch() @@ -695,7 +679,8 @@ where /// manager. shmem_provider: SP, /// The stats to use - stats: ST, + #[builder(default = None)] + stats: Option, /// The broker port to use #[builder(default = 1337_u16)] broker_port: u16, @@ -723,13 +708,7 @@ where /// Launch the restarting manager pub fn launch( &mut self, - ) -> Result<(Option, LlmpRestartingEventManager), Error> { - let mut mgr = LlmpEventManager::::new_on_port( - self.shmem_provider.clone(), - self.stats.clone(), - self.broker_port, - )?; - + ) -> Result<(Option, LlmpRestartingEventManager), Error> { // We start ourself as child process to actually fuzz let (sender, mut receiver, new_shmem_provider, core_id) = if std::env::var( _ENV_FUZZER_SENDER, @@ -737,40 +716,36 @@ where .is_err() { // We get here if we are on Unix, or we are a broker on Windows. - let core_id = if mgr.is_broker() { - match self.kind { - ManagerKind::Broker | ManagerKind::Any => { - // Yep, broker. Just loop here. - println!( - "Doing broker things. Run this tool again to start fuzzing in a client." - ); + let core_id = match self.kind { + ManagerKind::Broker | ManagerKind::Any => { + let mut broker = LlmpEventBroker::::new_on_port( + self.shmem_provider.clone(), + self.stats.take().unwrap(), + self.broker_port, + )?; - if let Some(remote_broker_addr) = self.remote_broker_addr { - println!("B2b: Connecting to {:?}", &remote_broker_addr); - mgr.connect_b2b(remote_broker_addr)?; - }; + // Yep, broker. Just loop here. + println!( + "Doing broker things. Run this tool again to start fuzzing in a client." + ); - mgr.broker_loop()?; - return Err(Error::ShuttingDown); - } - ManagerKind::Client { cpu_core: _ } => { - return Err(Error::IllegalState( - "Tried to start a client, but got a broker".to_string(), - )); - } - } - } else { - match self.kind { - ManagerKind::Broker => { - return Err(Error::IllegalState( - "Tried to start a broker, but got a client".to_string(), - )); - } - ManagerKind::Client { cpu_core } => cpu_core, - ManagerKind::Any => None, + if let Some(remote_broker_addr) = self.remote_broker_addr { + println!("B2b: Connecting to {:?}", &remote_broker_addr); + broker.connect_b2b(remote_broker_addr)?; + }; + + broker.broker_loop()?; + return Err(Error::ShuttingDown); } + ManagerKind::Client { cpu_core } => cpu_core, }; + // We are a client + let mgr = LlmpEventManager::::new_on_port( + self.shmem_provider.clone(), + self.broker_port, + )?; + if let Some(core_id) = core_id { println!("Setting core affinity to {:?}", core_id); core_affinity::set_for_current(core_id); @@ -858,17 +833,17 @@ where None => { println!("First run. Let's set it all up"); // Mgr to send and receive msgs from/to all other fuzzer instances - let client_mgr = LlmpEventManager::::existing_client_from_env( + let mgr = LlmpEventManager::::existing_client_from_env( new_shmem_provider, _ENV_FUZZER_BROKER_CLIENT_INITIAL, )?; - (None, LlmpRestartingEventManager::new(client_mgr, sender)) + (None, LlmpRestartingEventManager::new(mgr, sender)) } // Restoring from a previous run, deserialize state and corpus. Some((_sender, _tag, msg)) => { println!("Subsequent run. Let's load all data from shmem (received {} bytes from previous instance)", msg.len()); - let (state, mgr): (S, LlmpEventManager) = + let (state, mgr): (S, LlmpEventManager) = deserialize_state_mgr(new_shmem_provider, &msg)?; (Some(state), LlmpRestartingEventManager::new(mgr, sender))