diff --git a/fuzzers/libfuzzer_libpng_centralized/src/lib.rs b/fuzzers/libfuzzer_libpng_centralized/src/lib.rs
index 0338b7ad30..a1759c7cd7 100644
--- a/fuzzers/libfuzzer_libpng_centralized/src/lib.rs
+++ b/fuzzers/libfuzzer_libpng_centralized/src/lib.rs
@@ -12,7 +12,7 @@ use std::{env, net::SocketAddr, path::PathBuf};
 use clap::{self, Parser};
 use libafl::{
     corpus::{Corpus, InMemoryCorpus, OnDiskCorpus},
-    events::{launcher::Launcher, CentralizedEventManager, EventConfig},
+    events::{launcher::CentralizedLauncher, EventConfig},
     executors::{inprocess::InProcessExecutor, ExitKind, TimeoutExecutor},
     feedback_or, feedback_or_fast,
     feedbacks::{CrashFeedback, MaxMapFeedback, TimeFeedback, TimeoutFeedback},
@@ -32,11 +32,10 @@ use libafl::{
 use libafl_bolts::{
     core_affinity::{CoreId, Cores},
     current_nanos,
-    llmp::{LlmpReceiver, LlmpSender},
     rands::StdRand,
     shmem::{ShMemProvider, StdShMemProvider},
     tuples::{tuple_list, Merge},
-    AsSlice, ClientId,
+    AsSlice,
 };
 use libafl_targets::{libfuzzer_initialize, libfuzzer_test_one_input, std_edges_map_observer};
 
@@ -129,36 +128,9 @@ pub extern "C" fn libafl_main() {
     let shmem_provider = StdShMemProvider::new().expect("Failed to init shared memory");
 
-    let mut senders = vec![];
-    let mut receivers = vec![];
-    let mut main_core_id = None;
-    let mut core_id_map = std::collections::HashMap::<CoreId, usize>::default();
-    for core_id in &cores.ids {
-        if main_core_id.is_none() {
-            main_core_id = Some(core_id.clone());
-            continue;
-        }
-        let sender =
-            LlmpSender::new(shmem_provider.clone(), ClientId(core_id.0 as u32), false).unwrap();
-        let receiver = LlmpReceiver::on_existing_shmem(
-            shmem_provider.clone(),
-            sender.out_shmems[0].shmem.clone(),
-            None,
-        )
-        .unwrap();
-
-        core_id_map.insert(core_id.clone(), senders.len());
-        senders.push(Some(sender));
-        receivers.push(receiver);
-    }
-
-    eprintln!("Main is {main_core_id:?}");
-
-    let mut receivers = Some(receivers);
-
     let monitor = MultiMonitor::new(|s| println!("{s}"));
 
-    let mut run_client = |state: Option<_>, restarting_mgr, core_id: CoreId| {
+    let mut run_client = |state: Option<_>, mut mgr, _core_id: CoreId| {
         // Create an observation channel using the coverage map
         let edges_observer =
             HitcountsMapObserver::new(unsafe { std_edges_map_observer("edges") });
@@ -196,15 +168,6 @@ pub extern "C" fn libafl_main() {
                 .unwrap()
         });
 
-        let mut mgr = if main_core_id.unwrap() == core_id {
-            CentralizedEventManager::new_main(restarting_mgr, receivers.take().unwrap())
-        } else {
-            let idx = *core_id_map.get(&core_id).unwrap();
-            CentralizedEventManager::new_secondary(restarting_mgr, senders[idx].take().unwrap())
-        };
-
-        // let mut mgr = restarting_mgr;
-
         println!("We're a client, let's fuzz :)");
 
         // Create a PNG dictionary if not existing
@@ -272,7 +235,7 @@ pub extern "C" fn libafl_main() {
         Ok(())
     };
 
-    match Launcher::builder()
+    match CentralizedLauncher::builder()
         .shmem_provider(shmem_provider)
         .configuration(EventConfig::from_name("default"))
         .monitor(monitor)
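Taken together, the fuzzer-side change boils down to: drop the hand-rolled LLMP sender/receiver plumbing and let the launcher wire the channels. A minimal sketch of the resulting call site, assuming a `monitor` and a `run_client` closure as in the hunks above (ports and core list are illustrative):

```rust
use libafl::events::{launcher::CentralizedLauncher, EventConfig};
use libafl_bolts::{core_affinity::Cores, shmem::StdShMemProvider};

// Sketch only: `monitor` and `run_client` are assumed to exist as above.
// `run_client` now receives the already-wrapped manager:
//     |state: Option<_>, mut mgr, _core_id: CoreId| -> Result<(), Error>
let shmem_provider = StdShMemProvider::new().expect("Failed to init shared memory");
let cores = Cores::from_cmdline("0-3").expect("Failed to parse cores");

match CentralizedLauncher::builder()
    .shmem_provider(shmem_provider)
    .configuration(EventConfig::from_name("default"))
    .monitor(monitor)
    .run_client(&mut run_client)
    .cores(&cores)
    .broker_port(1337)             // the regular LLMP broker (default)
    .centralized_broker_port(1338) // the extra main-node broker (default)
    .build()
    .launch()
{
    Ok(()) => (),
    Err(e) => panic!("Launcher failed: {e:?}"),
}
```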
diff --git a/libafl/src/events/centralized.rs b/libafl/src/events/centralized.rs
index 0e9249f704..40fb0b84d4 100644
--- a/libafl/src/events/centralized.rs
+++ b/libafl/src/events/centralized.rs
@@ -1,27 +1,33 @@
 //! A wrapper manager to implement a main-secondary architecture with point-to-point channels
 
 use alloc::{boxed::Box, string::String, vec::Vec};
-#[cfg(feature = "adaptive_serialization")]
-use core::time::Duration;
+use core::{marker::PhantomData, num::NonZeroUsize, time::Duration};
 
 #[cfg(feature = "adaptive_serialization")]
 use libafl_bolts::current_time;
+#[cfg(feature = "llmp_compression")]
 use libafl_bolts::{
-    llmp::{LlmpReceiver, LlmpSender, Tag},
+    compress::GzipCompressor,
+    llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED},
+};
+use libafl_bolts::{
+    llmp::{self, LlmpBroker, LlmpClient, LlmpClientDescription, Tag},
     shmem::ShMemProvider,
     ClientId,
 };
 use serde::{Deserialize, Serialize};
 
 use super::{CustomBufEventResult, HasCustomBufHandlers, ProgressReporter};
+#[cfg(feature = "llmp_compression")]
+use crate::events::llmp::COMPRESS_THRESHOLD;
 use crate::{
     events::{
-        llmp::EventStatsCollector, Event, EventConfig, EventFirer, EventManager, EventManagerId,
-        EventProcessor, EventRestarter, HasEventManagerId, LogSeverity,
+        llmp::EventStatsCollector, BrokerEventResult, Event, EventConfig, EventFirer, EventManager,
+        EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, LogSeverity,
     },
     executors::{Executor, HasObservers},
     fuzzer::{EvaluatorObservers, ExecutionProcessor},
-    inputs::UsesInput,
+    inputs::{Input, UsesInput},
     observers::ObserversTuple,
     state::{HasClientPerfMonitor, HasExecutions, HasLastReportTime, HasMetadata, UsesState},
     Error,
@@ -29,22 +35,192 @@ use crate::{
 
 const _LLMP_TAG_TO_MAIN: Tag = Tag(0x3453453);
 
-/// A wrapper manager to implement a main-secondary architecture with point-to-point channels
+/// An LLMP-backed event manager for scalable multi-processed fuzzing
+pub struct CentralizedLlmpEventBroker<I, SP>
+where
+    I: Input,
+    SP: ShMemProvider + 'static,
+    //CE: CustomEvent<I>,
+{
+    llmp: LlmpBroker<SP>,
+    #[cfg(feature = "llmp_compression")]
+    compressor: GzipCompressor,
+    phantom: PhantomData<I>,
+}
+
+impl<I, SP> core::fmt::Debug for CentralizedLlmpEventBroker<I, SP>
+where
+    SP: ShMemProvider + 'static,
+    I: Input,
+{
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        let mut debug_struct = f.debug_struct("CentralizedLlmpEventBroker");
+        let debug = debug_struct.field("llmp", &self.llmp);
+        //.field("custom_buf_handlers", &self.custom_buf_handlers)
+        #[cfg(feature = "llmp_compression")]
+        let debug = debug.field("compressor", &self.compressor);
+        debug
+            .field("phantom", &self.phantom)
+            .finish_non_exhaustive()
+    }
+}
+
+impl<I, SP> CentralizedLlmpEventBroker<I, SP>
+where
+    I: Input,
+    SP: ShMemProvider + 'static,
+{
+    /// Create an event broker from a raw broker.
+    pub fn new(llmp: LlmpBroker<SP>) -> Result<Self, Error> {
+        Ok(Self {
+            llmp,
+            #[cfg(feature = "llmp_compression")]
+            compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
+            phantom: PhantomData,
+        })
+    }
+
+    /// Create an LLMP broker on a port.
+    ///
+    /// The port must not be bound yet to have a broker.
+ #[cfg(feature = "std")] + pub fn on_port(shmem_provider: SP, port: u16) -> Result { + Ok(Self { + // TODO switch to false after solving the bug + llmp: LlmpBroker::with_keep_pages_attach_to_tcp(shmem_provider, port, true)?, + #[cfg(feature = "llmp_compression")] + compressor: GzipCompressor::new(COMPRESS_THRESHOLD), + phantom: PhantomData, + }) + } + + /// Exit the broker process cleanly after at least `n` clients attached and all of them disconnected again + pub fn set_exit_cleanly_after(&mut self, n_clients: NonZeroUsize) { + self.llmp.set_exit_cleanly_after(n_clients); + } + + /// Run forever in the broker + #[cfg(not(feature = "llmp_broker_timeouts"))] + pub fn broker_loop(&mut self) -> Result<(), Error> { + #[cfg(feature = "llmp_compression")] + let compressor = &self.compressor; + self.llmp.loop_forever( + &mut |client_id, tag, _flags, msg| { + if tag == _LLMP_TAG_TO_MAIN { + #[cfg(not(feature = "llmp_compression"))] + let event_bytes = msg; + #[cfg(feature = "llmp_compression")] + let compressed; + #[cfg(feature = "llmp_compression")] + let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + compressed = compressor.decompress(msg)?; + &compressed + } else { + msg + }; + let event: Event = postcard::from_bytes(event_bytes)?; + match Self::handle_in_broker(client_id, &event)? { + BrokerEventResult::Forward => Ok(llmp::LlmpMsgHookResult::ForwardToClients), + BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled), + } + } else { + Ok(llmp::LlmpMsgHookResult::ForwardToClients) + } + }, + Some(Duration::from_millis(5)), + ); + + #[cfg(all(feature = "std", feature = "llmp_debug"))] + println!("The last client quit. Exiting."); + + Err(Error::shutting_down()) + } + + /// Run in the broker until all clients exit + #[cfg(feature = "llmp_broker_timeouts")] + pub fn broker_loop(&mut self) -> Result<(), Error> { + #[cfg(feature = "llmp_compression")] + let compressor = &self.compressor; + self.llmp.loop_with_timeouts( + &mut |msg_or_timeout| { + if let Some((client_id, tag, _flags, msg)) = msg_or_timeout { + if tag == _LLMP_TAG_TO_MAIN { + #[cfg(not(feature = "llmp_compression"))] + let event_bytes = msg; + #[cfg(feature = "llmp_compression")] + let compressed; + #[cfg(feature = "llmp_compression")] + let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + compressed = compressor.decompress(msg)?; + &compressed + } else { + msg + }; + let event: Event = postcard::from_bytes(event_bytes)?; + match Self::handle_in_broker(client_id, &event)? { + BrokerEventResult::Forward => { + Ok(llmp::LlmpMsgHookResult::ForwardToClients) + } + BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled), + } + } else { + Ok(llmp::LlmpMsgHookResult::ForwardToClients) + } + } else { + Ok(llmp::LlmpMsgHookResult::Handled) + } + }, + Duration::from_secs(30), + Some(Duration::from_millis(5)), + ); + + #[cfg(feature = "llmp_debug")] + println!("The last client quit. 
Exiting."); + + Err(Error::shutting_down()) + } + + /// Handle arriving events in the broker + #[allow(clippy::unnecessary_wraps)] + fn handle_in_broker( + _client_id: ClientId, + event: &Event, + ) -> Result { + match &event { + Event::NewTestcase { + input: _, + client_config: _, + exit_kind: _, + corpus_size: _, + observers_buf: _, + time: _, + executions: _, + forward_id: _, + } => Ok(BrokerEventResult::Forward), + _ => Ok(BrokerEventResult::Handled), + } + } +} + +/// A wrapper manager to implement a main-secondary architecture witgh another broker #[derive(Debug)] pub struct CentralizedEventManager where EM: UsesState, - SP: ShMemProvider, + SP: ShMemProvider + 'static, { inner: EM, - sender_to_main: Option>, - receivers_from_secondary: Option>>, + /// The LLMP client for inter process communication + client: LlmpClient, + #[cfg(feature = "llmp_compression")] + compressor: GzipCompressor, + is_main: bool, } impl UsesState for CentralizedEventManager where EM: UsesState, - SP: ShMemProvider, + SP: ShMemProvider + 'static, { type State = EM::State; } @@ -53,7 +229,7 @@ where impl EventStatsCollector for CentralizedEventManager where EM: EventStatsCollector + UsesState, - SP: ShMemProvider, + SP: ShMemProvider + 'static, { fn serialization_time(&self) -> Duration { self.inner.serialization_time() @@ -86,21 +262,21 @@ where impl EventStatsCollector for CentralizedEventManager where EM: EventStatsCollector + UsesState, - SP: ShMemProvider, + SP: ShMemProvider + 'static, { } impl EventFirer for CentralizedEventManager where EM: EventStatsCollector + EventFirer + HasEventManagerId, - SP: ShMemProvider, + SP: ShMemProvider + 'static, { fn fire( &mut self, state: &mut Self::State, mut event: Event<::Input>, ) -> Result<(), Error> { - if let Some(sender) = self.sender_to_main.as_mut() { + if !self.is_main { // secondary node let is_nt = match &mut event { Event::NewTestcase { @@ -119,9 +295,7 @@ where _ => false, }; if is_nt { - // TODO use copression when llmp_compression is enabled - let serialized = postcard::to_allocvec(&event)?; - return sender.send_buf(_LLMP_TAG_TO_MAIN, &serialized); + return self.forward_to_main(&event); } } self.inner.fire(state, event) @@ -193,19 +367,23 @@ where impl EventRestarter for CentralizedEventManager where EM: EventRestarter, - SP: ShMemProvider, + SP: ShMemProvider + 'static, { #[inline] fn on_restart(&mut self, state: &mut Self::State) -> Result<(), Error> { - self.inner.on_restart(state) + self.client.await_safe_to_unmap_blocking(); + self.inner.on_restart(state)?; + Ok(()) } fn send_exiting(&mut self) -> Result<(), Error> { + self.client.sender.send_exiting()?; self.inner.send_exiting() } #[inline] fn await_restart_safe(&mut self) { + self.client.await_safe_to_unmap_blocking(); self.inner.await_restart_safe(); } } @@ -213,12 +391,12 @@ where impl EventProcessor for CentralizedEventManager where EM: EventStatsCollector + EventProcessor + EventFirer + HasEventManagerId, - SP: ShMemProvider, E: HasObservers + Executor, for<'a> E::Observers: Deserialize<'a>, Z: EvaluatorObservers + ExecutionProcessor, Self::State: HasExecutions + HasMetadata, + SP: ShMemProvider + 'static, { fn process( &mut self, @@ -226,94 +404,9 @@ where state: &mut Self::State, executor: &mut E, ) -> Result { - if self.receivers_from_secondary.is_some() { + if self.is_main { // main node - let mut receivers = self.receivers_from_secondary.take().unwrap(); - // TODO in case of error, this is discarded, that is a bug ATM - - for (idx, receiver) in receivers.iter_mut().enumerate() { - 
-                while let Some((_client_id, tag, _flags, msg)) = receiver.recv_buf_with_flags()? {
-                    assert!(
-                        tag == _LLMP_TAG_TO_MAIN,
-                        "Only the TO_MAIN parcel should have arrived in the main node!"
-                    );
-
-                    // TODO handle compression
-                    let event: Event<<Self::State as UsesInput>::Input> =
-                        postcard::from_bytes(msg)?;
-                    match event {
-                        Event::NewTestcase {
-                            input,
-                            client_config,
-                            exit_kind,
-                            corpus_size,
-                            observers_buf,
-                            time,
-                            executions,
-                            forward_id,
-                        } => {
-                            log::info!(
-                                "Received new Testcase to evaluate from secondary node {idx:?}"
-                            );
-
-                            let res = if client_config.match_with(&self.configuration())
-                                && observers_buf.is_some()
-                            {
-                                #[cfg(feature = "adaptive_serialization")]
-                                let start = current_time();
-                                let observers: E::Observers =
-                                    postcard::from_bytes(observers_buf.as_ref().unwrap())?;
-
-                                #[cfg(feature = "adaptive_serialization")]
-                                {
-                                    *self.inner.deserialization_time_mut() = current_time() - start;
-                                }
-
-                                fuzzer.process_execution(
-                                    state,
-                                    self,
-                                    input.clone(),
-                                    &observers,
-                                    &exit_kind,
-                                    false,
-                                )?
-                            } else {
-                                fuzzer.evaluate_input_with_observers::<E, Self>(
-                                    state,
-                                    executor,
-                                    self,
-                                    input.clone(),
-                                    false,
-                                )?
-                            };
-                            if let Some(item) = res.1 {
-                                log::info!("Added received Testcase as item #{item}");
-
-                                self.inner.fire(
-                                    state,
-                                    Event::NewTestcase {
-                                        input,
-                                        observers_buf,
-                                        exit_kind,
-                                        corpus_size,
-                                        client_config,
-                                        time,
-                                        executions,
-                                        forward_id,
-                                    },
-                                )?;
-                            }
-                        }
-                        _ => panic!(
-                            "Only the NewTestcase event should have arrived to the main node!"
-                        ),
-                    };
-                }
-            }
-
-            self.receivers_from_secondary = Some(receivers);
-
-            Ok(0) // TODO is 0 ok?
+            self.receive_from_secondary(fuzzer, state, executor)
         } else {
             // The main node does not process incoming events from the broker ATM
             self.inner.process(fuzzer, state, executor)
@@ -325,18 +418,18 @@ impl<E, EM, SP, Z> EventManager<E, Z> for CentralizedEventManager<EM, SP>
 where
     EM: EventStatsCollector + EventManager<E, Z>,
     EM::State: HasClientPerfMonitor + HasExecutions + HasMetadata + HasLastReportTime,
-    SP: ShMemProvider,
     E: HasObservers<State = Self::State> + Executor<Self, Z>,
     for<'a> E::Observers: Deserialize<'a>,
     Z: EvaluatorObservers<E::Observers> + ExecutionProcessor<E::Observers>,
+    SP: ShMemProvider + 'static,
 {
 }
 
 impl<EM, SP> HasCustomBufHandlers for CentralizedEventManager<EM, SP>
 where
     EM: HasCustomBufHandlers,
-    SP: ShMemProvider,
+    SP: ShMemProvider + 'static,
 {
     /// Adds a custom buffer handler that will run for each incoming `CustomBuf` event.
     fn add_custom_buf_handler(
@@ -353,14 +446,14 @@ impl<EM, SP> ProgressReporter for CentralizedEventManager<EM, SP>
 where
     EM: EventStatsCollector + ProgressReporter + HasEventManagerId,
     EM::State: HasClientPerfMonitor + HasMetadata + HasExecutions + HasLastReportTime,
-    SP: ShMemProvider,
+    SP: ShMemProvider + 'static,
 {
 }
 
 impl<EM, SP> HasEventManagerId for CentralizedEventManager<EM, SP>
 where
     EM: HasEventManagerId + UsesState,
-    SP: ShMemProvider,
+    SP: ShMemProvider + 'static,
 {
     fn mgr_id(&self) -> EventManagerId {
         self.inner.mgr_id()
@@ -370,23 +463,226 @@ where
 impl<EM, SP> CentralizedEventManager<EM, SP>
 where
     EM: UsesState,
-    SP: ShMemProvider,
+    SP: ShMemProvider + 'static,
 {
     /// Creates a new [`CentralizedEventManager`].
-    pub fn new_main(inner: EM, receivers_from_secondary: Vec<LlmpReceiver<SP>>) -> Self {
-        Self {
+    pub fn new(inner: EM, client: LlmpClient<SP>, is_main: bool) -> Result<Self, Error> {
+        Ok(Self {
             inner,
-            sender_to_main: None,
-            receivers_from_secondary: Some(receivers_from_secondary),
-        }
+            client,
+            #[cfg(feature = "llmp_compression")]
+            compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
+            is_main,
+        })
     }
 
-    /// Creates a new [`CentralizedEventManager`].
-    pub fn new_secondary(inner: EM, sender_to_main: LlmpSender<SP>) -> Self {
-        Self {
+    /// Create a centralized event manager on a port
+    ///
+    /// If the port is not yet bound, it will act as a broker; otherwise, it
+    /// will act as a client.
+    #[cfg(feature = "std")]
+    pub fn on_port(inner: EM, shmem_provider: SP, port: u16, is_main: bool) -> Result<Self, Error> {
+        Ok(Self {
             inner,
-            sender_to_main: Some(sender_to_main),
-            receivers_from_secondary: None,
+            client: LlmpClient::create_attach_to_tcp(shmem_provider, port)?,
+            #[cfg(feature = "llmp_compression")]
+            compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
+            is_main,
+        })
+    }
+
+    /// If a client respawns, it may reuse the existing connection, previously
+    /// stored by [`LlmpClient::to_env()`].
+    #[cfg(feature = "std")]
+    pub fn existing_client_from_env(
+        inner: EM,
+        shmem_provider: SP,
+        env_name: &str,
+        is_main: bool,
+    ) -> Result<Self, Error> {
+        Ok(Self {
+            inner,
+            client: LlmpClient::on_existing_from_env(shmem_provider, env_name)?,
+            #[cfg(feature = "llmp_compression")]
+            compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
+            is_main,
+        })
+    }
+
+    /// Describe the client event manager's LLMP parts in a restorable fashion
+    pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
+        self.client.describe()
+    }
+
+    /// Create an existing client from description
+    pub fn existing_client_from_description(
+        inner: EM,
+        shmem_provider: SP,
+        description: &LlmpClientDescription,
+        is_main: bool,
+    ) -> Result<Self, Error> {
+        Ok(Self {
+            inner,
+            client: LlmpClient::existing_client_from_description(shmem_provider, description)?,
+            #[cfg(feature = "llmp_compression")]
+            compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
+            is_main,
+        })
+    }
+
+    /// Write the config for a client [`EventManager`] to env vars, a new
+    /// client can reattach using [`CentralizedEventManager::existing_client_from_env()`].
+    #[cfg(feature = "std")]
+    pub fn to_env(&self, env_name: &str) {
+        self.client.to_env(env_name).unwrap();
+    }
+
+    /// Know if this instance is main or secondary
+    pub fn is_main(&self) -> bool {
+        self.is_main
+    }
+}
+
+impl<EM, SP> CentralizedEventManager<EM, SP>
+where
+    EM: UsesState + EventFirer + EventStatsCollector + HasEventManagerId,
+    SP: ShMemProvider + 'static,
+{
+    #[cfg(feature = "llmp_compression")]
+    fn forward_to_main<I>(&mut self, event: &Event<I>) -> Result<(), Error>
+    where
+        I: Input,
+    {
+        let serialized = postcard::to_allocvec(event)?;
+        let flags = LLMP_FLAG_INITIALIZED;
+
+        match self.compressor.compress(&serialized)? {
+            Some(comp_buf) => {
+                self.client.send_buf_with_flags(
+                    _LLMP_TAG_TO_MAIN,
+                    flags | LLMP_FLAG_COMPRESSED,
+                    &comp_buf,
+                )?;
+            }
+            None => {
+                self.client.send_buf(_LLMP_TAG_TO_MAIN, &serialized)?;
+            }
+        }
+        Ok(())
+    }
+
+    #[cfg(not(feature = "llmp_compression"))]
+    fn forward_to_main<I>(&mut self, event: &Event<I>) -> Result<(), Error>
+    where
+        I: Input,
+    {
+        let serialized = postcard::to_allocvec(event)?;
+        self.client.send_buf(_LLMP_TAG_TO_MAIN, &serialized)?;
+        Ok(())
+    }
+
+    fn receive_from_secondary<E, Z>(
+        &mut self,
+        fuzzer: &mut Z,
+        state: &mut EM::State,
+        executor: &mut E,
+    ) -> Result<usize, Error>
+    where
+        E: Executor<Self, Z> + HasObservers<State = EM::State>,
+        EM::State: UsesInput + HasExecutions + HasMetadata,
+        for<'a> E::Observers: Deserialize<'a>,
+        Z: ExecutionProcessor<E::Observers> + EvaluatorObservers<E::Observers>,
+    {
+        // TODO: Get around local event copy by moving handle_in_client
+        let self_id = self.client.sender.id;
+        let mut count = 0;
+        while let Some((client_id, tag, _flags, msg)) = self.client.recv_buf_with_flags()? {
+            assert!(
+                tag == _LLMP_TAG_TO_MAIN,
+                "Only _LLMP_TAG_TO_MAIN parcel should have arrived in the main node!"
+            );
+
+            if client_id == self_id {
+                continue;
+            }
+            #[cfg(not(feature = "llmp_compression"))]
+            let event_bytes = msg;
+            #[cfg(feature = "llmp_compression")]
+            let compressed;
+            #[cfg(feature = "llmp_compression")]
+            let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
+                compressed = self.compressor.decompress(msg)?;
+                &compressed
+            } else {
+                msg
+            };
+            let event: Event<<<Self as UsesState>::State as UsesInput>::Input> =
+                postcard::from_bytes(event_bytes)?;
+            self.handle_in_main(fuzzer, executor, state, client_id, event)?;
+            count += 1;
+        }
+        Ok(count)
+    }
+
+    // Handle arriving events in the main node
+    fn handle_in_main<E, Z>(
+        &mut self,
+        fuzzer: &mut Z,
+        executor: &mut E,
+        state: &mut EM::State,
+        client_id: ClientId,
+        event: Event<<EM::State as UsesInput>::Input>,
+    ) -> Result<(), Error>
+    where
+        E: Executor<Self, Z> + HasObservers<State = EM::State>,
+        EM::State: UsesInput + HasExecutions + HasMetadata,
+        for<'a> E::Observers: Deserialize<'a>,
+        Z: ExecutionProcessor<E::Observers> + EvaluatorObservers<E::Observers>,
+    {
+        match event {
+            Event::NewTestcase {
+                input,
+                client_config,
+                exit_kind,
+                corpus_size: _,
+                observers_buf,
+                time: _,
+                executions: _,
+                forward_id,
+            } => {
+                log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})");
+
+                let res = if client_config.match_with(&self.configuration())
+                    && observers_buf.is_some()
+                {
+                    let observers: E::Observers =
+                        postcard::from_bytes(observers_buf.as_ref().unwrap())?;
+                    fuzzer.process_execution(state, self, input, &observers, &exit_kind, true)?
+                } else {
+                    fuzzer.evaluate_input_with_observers::<E, Self>(
+                        state, executor, self, input, true,
+                    )?
+                };
+                if let Some(item) = res.1 {
+                    log::info!("Added received Testcase as item #{item}");
+                }
+                Ok(())
+            }
+            _ => Err(Error::unknown(format!(
+                "Received illegal message that should not have arrived: {:?}.",
+                event.name()
+            ))),
+        }
+    }
+}
+
+/*
+impl<EM, SP> Drop for CentralizedEventManager<EM, SP>
+where
+    EM: UsesState,
+    SP: ShMemProvider + 'static,
+{
+    /// LLMP clients will have to wait until their pages are mapped by somebody.
+    fn drop(&mut self) {
+        self.await_restart_safe();
+    }
+}*/
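To summarize the flow the manager implements: a secondary's `fire` serializes interesting `NewTestcase` events and pushes them to the main node over the extra LLMP channel (`_LLMP_TAG_TO_MAIN`), while the main node drains and re-evaluates them in `process`. Wiring the two roles up by hand, instead of through the launcher below, would look roughly like this; `inner_main`/`inner_secondary` stand in for any wrapped manager, and a `CentralizedLlmpEventBroker` is assumed to be listening on the port already:

```rust
use libafl::events::CentralizedEventManager;
use libafl_bolts::shmem::{ShMemProvider, StdShMemProvider};

const CENTRALIZED_PORT: u16 = 1338; // assumed; matches the launcher default below

let shmem_provider = StdShMemProvider::new()?;

// The main node: drains `_LLMP_TAG_TO_MAIN` messages in `process()`.
let mgr_main =
    CentralizedEventManager::on_port(inner_main, shmem_provider.clone(), CENTRALIZED_PORT, true)?;

// A secondary node: only forwards its `NewTestcase` events from `fire()`.
let mgr_secondary =
    CentralizedEventManager::on_port(inner_secondary, shmem_provider, CENTRALIZED_PORT, false)?;
```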
diff --git a/libafl/src/events/launcher.rs b/libafl/src/events/launcher.rs
index 7d3c9d0abf..4a993ee6b6 100644
--- a/libafl/src/events/launcher.rs
+++ b/libafl/src/events/launcher.rs
@@ -40,6 +40,8 @@ use serde::de::DeserializeOwned;
 #[cfg(feature = "std")]
 use typed_builder::TypedBuilder;
 
+#[cfg(all(unix, feature = "std", feature = "fork"))]
+use crate::events::{CentralizedEventManager, CentralizedLlmpEventBroker};
 use crate::inputs::UsesInput;
 #[cfg(feature = "std")]
 use crate::{
@@ -51,6 +53,7 @@ use crate::{
 
 /// The (internal) `env` that indicates we're running as client.
 const _AFL_LAUNCHER_CLIENT: &str = "AFL_LAUNCHER_CLIENT";
+
 /// Provides a Launcher, which can be used to launch a fuzzing run on a specified list of cores
 #[cfg(feature = "std")]
 #[allow(
@@ -366,3 +369,246 @@ where
         Ok(())
     }
 }
+
+/// Provides a Launcher, which can be used to launch a fuzzing run on a specified list of cores with a single main and multiple secondary nodes
+#[cfg(all(unix, feature = "std", feature = "fork"))]
+#[derive(TypedBuilder)]
+#[allow(clippy::type_complexity, missing_debug_implementations)]
+pub struct CentralizedLauncher<'a, CF, MT, S, SP>
+where
+    CF: FnOnce(
+        Option<S>,
+        CentralizedEventManager<LlmpRestartingEventManager<S, SP>, SP>,
+        CoreId,
+    ) -> Result<(), Error>,
+    S::Input: 'a,
+    MT: Monitor,
+    SP: ShMemProvider + 'static,
+    S: DeserializeOwned + UsesInput + 'a,
+{
+    /// The ShmemProvider to use
+    shmem_provider: SP,
+    /// The monitor instance to use
+    monitor: MT,
+    /// The configuration
+    configuration: EventConfig,
+    /// The 'main' function to run for each client forked. This probably shouldn't return
+    #[builder(default, setter(strip_option))]
+    run_client: Option<CF>,
+    /// The broker port to use (or to attach to, in case [`Self::spawn_broker`] is `false`)
+    #[builder(default = 1337_u16)]
+    broker_port: u16,
+    /// The centralized broker port to use (or to attach to, in case [`Self::spawn_broker`] is `false`)
+    #[builder(default = 1338_u16)]
+    centralized_broker_port: u16,
+    /// The list of cores to run on
+    cores: &'a Cores,
+    /// A file name to write all client output to
+    #[builder(default = None)]
+    stdout_file: Option<&'a str>,
+    /// A file name to write all client stderr output to. If not specified, output is sent to
+    /// `stdout_file`.
+    #[builder(default = None)]
+    stderr_file: Option<&'a str>,
+    /// The `ip:port` address of another broker to connect our new broker to for multi-machine
+    /// clusters.
+    #[builder(default = None)]
+    remote_broker_addr: Option<SocketAddr>,
+    /// If this launcher should spawn a new `broker` on `[Self::broker_port]` (default).
+    /// The reason you may not want this is if you already have a [`Launcher`]
+    /// with a different configuration (for the same target) running on this machine.
+    /// Then, clients launched by this [`Launcher`] can connect to the original `broker`.
+    #[builder(default = true)]
+    spawn_broker: bool,
+    /// Tell the manager to serialize or not the state on restart
+    #[builder(default = true)]
+    serialize_state: bool,
+    #[builder(setter(skip), default = PhantomData)]
+    phantom_data: PhantomData<(&'a S, &'a SP)>,
+}
+
+#[cfg(all(unix, feature = "std", feature = "fork"))]
+impl<CF, MT, S, SP> Debug for CentralizedLauncher<'_, CF, MT, S, SP>
+where
+    CF: FnOnce(
+        Option<S>,
+        CentralizedEventManager<LlmpRestartingEventManager<S, SP>, SP>,
+        CoreId,
+    ) -> Result<(), Error>,
+    MT: Monitor + Clone,
+    SP: ShMemProvider + 'static,
+    S: DeserializeOwned + UsesInput,
+{
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Launcher")
+            .field("configuration", &self.configuration)
+            .field("broker_port", &self.broker_port)
+            .field("core", &self.cores)
+            .field("spawn_broker", &self.spawn_broker)
+            .field("remote_broker_addr", &self.remote_broker_addr)
+            .field("stdout_file", &self.stdout_file)
+            .field("stderr_file", &self.stderr_file)
+            .finish_non_exhaustive()
+    }
+}
+
+#[cfg(all(unix, feature = "std", feature = "fork"))]
+impl<'a, CF, MT, S, SP> CentralizedLauncher<'a, CF, MT, S, SP>
+where
+    CF: FnOnce(
+        Option<S>,
+        CentralizedEventManager<LlmpRestartingEventManager<S, SP>, SP>,
+        CoreId,
+    ) -> Result<(), Error>,
+    MT: Monitor + Clone,
+    S: DeserializeOwned + UsesInput + HasExecutions + HasClientPerfMonitor,
+    SP: ShMemProvider + 'static,
+{
+    /// Launch the broker and the clients and fuzz
+    #[allow(clippy::similar_names)]
+    #[allow(clippy::too_many_lines)]
+    pub fn launch(&mut self) -> Result<(), Error> {
+        if self.cores.ids.is_empty() {
+            return Err(Error::illegal_argument(
+                "No cores to spawn on given, cannot launch anything.",
+            ));
+        }
+
+        if self.run_client.is_none() {
+            return Err(Error::illegal_argument(
+                "No client callback provided".to_string(),
+            ));
+        }
+
+        let core_ids = get_core_ids().unwrap();
+        let num_cores = core_ids.len();
+        let mut handles = vec![];
+
+        log::info!("spawning on cores: {:?}", self.cores);
+
+        let stdout_file = self
+            .stdout_file
+            .map(|filename| File::create(filename).unwrap());
+        let stderr_file = self
+            .stderr_file
+            .map(|filename| File::create(filename).unwrap());
+
+        let debug_output = std::env::var("LIBAFL_DEBUG_OUTPUT").is_ok();
+
+        // Spawn centralized broker
+        self.shmem_provider.pre_fork()?;
+        match unsafe { fork() }? {
+            ForkResult::Parent(child) => {
+                self.shmem_provider.post_fork(false)?;
+                handles.push(child.pid);
+                #[cfg(feature = "std")]
+                log::info!("centralized broker spawned");
+            }
+            ForkResult::Child => {
+                log::info!("{:?} PostFork", unsafe { libc::getpid() });
+                self.shmem_provider.post_fork(true)?;
+
+                let mut broker: CentralizedLlmpEventBroker<S::Input, SP> =
+                    CentralizedLlmpEventBroker::on_port(
+                        self.shmem_provider.clone(),
+                        self.centralized_broker_port,
+                    )?;
+                broker.broker_loop()?;
+            }
+        }
+
+        std::thread::sleep(std::time::Duration::from_millis(10));
+
+        // Spawn clients
+        let mut index = 0_u64;
+        for (id, bind_to) in core_ids.iter().enumerate().take(num_cores) {
+            if self.cores.ids.iter().any(|&x| x == id.into()) {
+                index += 1;
+                self.shmem_provider.pre_fork()?;
+                match unsafe { fork() }? {
+                    ForkResult::Parent(child) => {
+                        self.shmem_provider.post_fork(false)?;
+                        handles.push(child.pid);
+                        #[cfg(feature = "std")]
+                        log::info!("child spawned and bound to core {id}");
+                    }
+                    ForkResult::Child => {
+                        log::info!("{:?} PostFork", unsafe { libc::getpid() });
+                        self.shmem_provider.post_fork(true)?;
+
+                        std::thread::sleep(std::time::Duration::from_millis(index * 10));
+
+                        if !debug_output {
+                            if let Some(file) = stdout_file {
+                                dup2(file.as_raw_fd(), libc::STDOUT_FILENO)?;
+                                if let Some(stderr) = stderr_file {
+                                    dup2(stderr.as_raw_fd(), libc::STDERR_FILENO)?;
+                                } else {
+                                    dup2(file.as_raw_fd(), libc::STDERR_FILENO)?;
+                                }
+                            }
+                        }
+
+                        // Fuzzer client. Keeps retrying the connection to broker till the broker starts
+                        let (state, mgr) = RestartingMgr::<MT, S, SP>::builder()
+                            .shmem_provider(self.shmem_provider.clone())
+                            .broker_port(self.broker_port)
+                            .kind(ManagerKind::Client {
+                                cpu_core: Some(*bind_to),
+                            })
+                            .configuration(self.configuration)
+                            .serialize_state(self.serialize_state)
+                            .build()
+                            .launch()?;
+
+                        let c_mgr = CentralizedEventManager::on_port(
+                            mgr,
+                            self.shmem_provider.clone(),
+                            self.centralized_broker_port,
+                            id == 0,
+                        )?;
+
+                        return (self.run_client.take().unwrap())(state, c_mgr, *bind_to);
+                    }
+                };
+            }
+        }
+
+        if self.spawn_broker {
+            log::info!("I am broker!!.");
+
+            // TODO we don't always want a broker here; think about using a different launcher process to spawn different configurations
+            RestartingMgr::<MT, S, SP>::builder()
+                .shmem_provider(self.shmem_provider.clone())
+                .monitor(Some(self.monitor.clone()))
+                .broker_port(self.broker_port)
+                .kind(ManagerKind::Broker)
+                .remote_broker_addr(self.remote_broker_addr)
+                .exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap()))
+                .configuration(self.configuration)
+                .serialize_state(self.serialize_state)
+                .build()
+                .launch()?;
+
+            // Broker exited. Kill all clients.
+            for handle in &handles {
+                unsafe {
+                    libc::kill(*handle, libc::SIGINT);
+                }
+            }
+        } else {
+            for handle in &handles {
+                let mut status = 0;
+                log::info!("Not spawning broker (spawn_broker is false). Waiting for fuzzer children to exit...");
+                unsafe {
+                    libc::waitpid(*handle, &mut status, 0);
+                    if status != 0 {
+                        log::info!("Client with pid {handle} exited with status {status}");
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
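The launcher forks the centralized broker before any client. Run standalone, the broker side is just this; a sketch, with `BytesInput` standing in for the fuzzer's actual input type:

```rust
use libafl::{events::CentralizedLlmpEventBroker, inputs::BytesInput};
use libafl_bolts::shmem::StdShMemProvider;

let shmem_provider = StdShMemProvider::new()?;
// Binds the port and relays `_LLMP_TAG_TO_MAIN` traffic from secondaries to the main node.
let mut broker: CentralizedLlmpEventBroker<BytesInput, _> =
    CentralizedLlmpEventBroker::on_port(shmem_provider, 1338)?;
// Only returns once all clients are gone, with `Error::ShuttingDown`.
broker.broker_loop()?;
```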
#[cfg(feature = "llmp_compression")] -const COMPRESS_THRESHOLD: usize = 1024; +pub const COMPRESS_THRESHOLD: usize = 1024; /// An LLMP-backed event manager for scalable multi-processed fuzzing #[derive(Debug)] @@ -954,7 +954,11 @@ where self.staterestorer.save(&( if self.save_state { Some(state) } else { None }, &self.llmp_mgr.describe()?, - )) + ))?; + + log::info!("Waiting for broker..."); + self.await_restart_safe(); + Ok(()) } fn send_exiting(&mut self) -> Result<(), Error> { diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs index 506f9adf28..8d3d28a36b 100644 --- a/libafl/src/events/mod.rs +++ b/libafl/src/events/mod.rs @@ -3,7 +3,9 @@ pub mod simple; pub use simple::*; +#[cfg(all(unix, feature = "std"))] pub mod centralized; +#[cfg(all(unix, feature = "std"))] pub use centralized::*; #[cfg(feature = "std")] #[allow(clippy::ignored_unit_patterns)] @@ -273,6 +275,7 @@ where } */ +// TODO remove forward_id as not anymore needed for centralized /// Events sent around in the library #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(bound = "I: serde::de::DeserializeOwned")] @@ -530,6 +533,7 @@ pub trait EventRestarter: UsesState { /// For restarting event managers, implement a way to forward state to their next peers. #[inline] fn on_restart(&mut self, _state: &mut Self::State) -> Result<(), Error> { + self.await_restart_safe(); Ok(()) } @@ -539,7 +543,7 @@ pub trait EventRestarter: UsesState { Ok(()) } - /// Block until we are safe to exit. + /// Block until we are safe to exit, usually called inside `on_restart`. #[inline] fn await_restart_safe(&mut self) {} } diff --git a/libafl/src/events/tcp.rs b/libafl/src/events/tcp.rs index 0edf9ef5b7..fb70404ec1 100644 --- a/libafl/src/events/tcp.rs +++ b/libafl/src/events/tcp.rs @@ -733,7 +733,9 @@ where // First, reset the page to 0 so the next iteration can read read from the beginning of this page self.staterestorer.reset(); self.staterestorer - .save(&if self.save_state { Some(state) } else { None }) + .save(&if self.save_state { Some(state) } else { None })?; + self.await_restart_safe(); + Ok(()) } fn send_exiting(&mut self) -> Result<(), Error> { diff --git a/libafl/src/executors/inprocess.rs b/libafl/src/executors/inprocess.rs index 0f4d8b82c9..20a8597eae 100644 --- a/libafl/src/executors/inprocess.rs +++ b/libafl/src/executors/inprocess.rs @@ -675,11 +675,9 @@ pub fn run_observers_and_save_state( .expect("Could not save state in run_observers_and_save_state"); } - // We will start mutators from scratch after restart. + // Serialize the state and wait safely for the broker to read pending messages event_mgr.on_restart(state).unwrap(); - log::info!("Waiting for broker..."); - event_mgr.await_restart_safe(); log::info!("Bye!"); } diff --git a/libafl_bolts/Cargo.toml b/libafl_bolts/Cargo.toml index 420d36d7f0..3748684111 100644 --- a/libafl_bolts/Cargo.toml +++ b/libafl_bolts/Cargo.toml @@ -23,7 +23,7 @@ cli = ["clap"] # expose libafl_bolts::cli for easy commandline parsing qemu_cli = ["cli"] # Commandline flagr for qemu-based fuzzers frida_cli = ["cli"] # Commandline flags for frida-based fuzzers errors_backtrace = ["backtrace"] -gzip = ["miniz_oxide"] # Enables gzip compression in certain parts of the lib +gzip = ["miniz_oxide", "alloc"] # Enables gzip compression in certain parts of the lib # SerdeAny features serdeany_autoreg = ["ctor"] # Automatically register all `#[derive(SerdeAny)]` types at startup. 
diff --git a/libafl_bolts/src/lib.rs b/libafl_bolts/src/lib.rs
index aa6d88d3e9..837738382b 100644
--- a/libafl_bolts/src/lib.rs
+++ b/libafl_bolts/src/lib.rs
@@ -166,7 +166,7 @@ pub enum Error {
     /// Serialization error
     Serialize(String, ErrorBacktrace),
     /// Compression error
-    #[cfg(feature = "llmp_compression")]
+    #[cfg(feature = "gzip")]
     Compression(ErrorBacktrace),
     /// File related error
     #[cfg(feature = "std")]
@@ -202,7 +202,7 @@ impl Error {
     {
         Error::Serialize(arg.into(), ErrorBacktrace::new())
     }
-    #[cfg(feature = "llmp_compression")]
+    #[cfg(feature = "gzip")]
     /// Compression error
     #[must_use]
     pub fn compression() -> Self {
@@ -300,7 +300,7 @@ impl Display for Error {
                 write!(f, "Error in Serialization: `{0}`", &s)?;
                 display_error_backtrace(f, b)
             }
-            #[cfg(feature = "llmp_compression")]
+            #[cfg(feature = "gzip")]
             Self::Compression(b) => {
                 write!(f, "Error in decompression")?;
                 display_error_backtrace(f, b)
@@ -476,7 +476,7 @@ pub mod build_id;
     feature = "std"
 ))]
 pub mod cli;
-#[cfg(feature = "llmp_compression")]
+#[cfg(feature = "gzip")]
 pub mod compress;
 #[cfg(feature = "std")]
 pub mod core_affinity;
@@ -828,7 +828,7 @@ pub mod bolts_prelude {
         feature = "std"
     ))]
     pub use super::cli::*;
-    #[cfg(feature = "llmp_compression")]
+    #[cfg(feature = "gzip")]
    pub use super::compress::*;
     #[cfg(feature = "std")]
     pub use super::core_affinity::*;
diff --git a/libafl_bolts/src/llmp.rs b/libafl_bolts/src/llmp.rs
index 9d72b02478..b8b352730e 100644
--- a/libafl_bolts/src/llmp.rs
+++ b/libafl_bolts/src/llmp.rs
@@ -1940,7 +1940,17 @@ where
     SP: ShMemProvider + 'static,
 {
     /// Create and initialize a new [`LlmpBroker`]
-    pub fn new(mut shmem_provider: SP) -> Result<Self, Error> {
+    pub fn new(shmem_provider: SP) -> Result<Self, Error> {
+        // Broker never cleans up the pages so that new
+        // clients may join at any time
+        Self::with_keep_pages(shmem_provider, true)
+    }
+
+    /// Create and initialize a new [`LlmpBroker`] telling if it has to keep pages forever
+    pub fn with_keep_pages(
+        mut shmem_provider: SP,
+        keep_pages_forever: bool,
+    ) -> Result<Self, Error> {
         Ok(LlmpBroker {
             llmp_out: LlmpSender {
                 id: ClientId(0),
@@ -1949,9 +1959,7 @@ where
                     ClientId(0),
                     shmem_provider.new_shmem(next_shmem_size(0))?,
                 )],
-                // Broker never cleans up the pages so that new
-                // clients may join at any time
-                keep_pages_forever: true,
+                keep_pages_forever,
                 has_unsent_message: false,
                 shmem_provider: shmem_provider.clone(),
                 unused_shmem_cache: vec![],
@@ -1965,12 +1973,22 @@ where
         })
     }
 
-    /// Create a new [`LlmpBroker`] sttaching to a TCP port
+    /// Create a new [`LlmpBroker`] attaching to a TCP port
     #[cfg(feature = "std")]
     pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result<Self, Error> {
+        Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true)
+    }
+
+    /// Create a new [`LlmpBroker`] attaching to a TCP port and telling if it has to keep pages forever
+    #[cfg(feature = "std")]
+    pub fn with_keep_pages_attach_to_tcp(
+        shmem_provider: SP,
+        port: u16,
+        keep_pages_forever: bool,
+    ) -> Result<Self, Error> {
         match tcp_bind(port) {
             Ok(listener) => {
-                let mut broker = LlmpBroker::new(shmem_provider)?;
+                let mut broker = LlmpBroker::with_keep_pages(shmem_provider, keep_pages_forever)?;
                 let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
                 Ok(broker)
             }
@@ -2773,9 +2791,9 @@ where
 #[derive(Clone, Copy, Debug, Serialize, Deserialize)]
 pub struct LlmpClientDescription {
     /// Description of the sender
-    sender: LlmpDescription,
+    pub sender: LlmpDescription,
     /// Description of the receiver
-    receiver: LlmpDescription,
+    pub receiver: LlmpDescription,
 }
 
 /// Client side of LLMP
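Making `sender` and `receiver` public matters for restarts: the manager's `describe()` (see `centralized.rs` above) hands a serializable description to the parent, and the respawned child reattaches from it. A rough round-trip, with `inner`, `shmem_provider`, and `is_main` assumed from context:

```rust
// Before restarting: snapshot the LLMP connection (a serializable LlmpClientDescription).
let description = mgr.describe()?;

// After respawn: reattach to the same shared-memory maps.
let mgr = CentralizedEventManager::existing_client_from_description(
    inner,
    shmem_provider,
    &description,
    is_main,
)?;
```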
@@ -2923,6 +2941,17 @@ where
         })
     }
 
+    /// Create a point-to-point channel instead of using a broker-client channel
+    pub fn new_p2p(shmem_provider: SP, sender_id: ClientId) -> Result<Self, Error> {
+        let sender = LlmpSender::new(shmem_provider.clone(), sender_id, false)?;
+        let receiver = LlmpReceiver::on_existing_shmem(
+            shmem_provider,
+            sender.out_shmems[0].shmem.clone(),
+            None,
+        )?;
+        Ok(Self { sender, receiver })
+    }
+
     /// Commits a msg to the client's out map
     /// # Safety
     /// Needs to be called with a proper msg pointer
diff --git a/libafl_bolts/src/os/unix_shmem_server.rs b/libafl_bolts/src/os/unix_shmem_server.rs
index 6842503608..58d137bf54 100644
--- a/libafl_bolts/src/os/unix_shmem_server.rs
+++ b/libafl_bolts/src/os/unix_shmem_server.rs
@@ -65,6 +65,7 @@ where
     /// A reference to the [`ShMemService`] backing this provider.
     /// It will be started only once for all processes and providers.
     service: ShMemService<SP>,
+    about_to_restart: bool,
 }
 
 /// [`ShMem`] that got served from a [`ShMemService`] via domain sockets and can now be used in this program.
@@ -142,6 +143,11 @@ where
         let server_fd: i32 = server_id.into();
         Ok((server_fd, fd_buf[0]))
     }
+
+    /// Tell the provider that we are about to restart and the worker should not kill the shared memory
+    pub fn on_restart(&mut self) {
+        self.about_to_restart = true;
+    }
 }
 
 impl<SP> Default for ServedShMemProvider<SP>
 where
@@ -185,6 +191,7 @@ where
             inner: SP::new()?,
             id: -1,
             service,
+            about_to_restart: false,
         };
         let (id, _) = res.send_receive(ServedShMemRequest::Hello())?;
         res.id = id;
@@ -244,6 +251,10 @@ where
     }
 
     fn release_shmem(&mut self, map: &mut Self::ShMem) {
+        if self.about_to_restart {
+            return;
+        }
+
         let (refcount, _) = self
             .send_receive(ServedShMemRequest::Deregister(map.server_fd))
            .expect("Could not communicate with ServedShMem server!");
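The `about_to_restart` flag closes a race on served shared memory: without it, a client tearing down for a restart would `Deregister` maps that its successor still needs. Callers opt in right before restarting; a sketch using the `StdServedShMemProvider` alias added in `shmem.rs` below:

```rust
use libafl_bolts::shmem::{ShMemProvider, StdServedShMemProvider};

let mut shmem_provider = StdServedShMemProvider::new()?;
// ... fuzz until a restart is needed ...
// From here on, `release_shmem()` is a no-op, so the server keeps the
// mappings alive for the respawned process.
shmem_provider.on_restart();
```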
diff --git a/libafl_bolts/src/shmem.rs b/libafl_bolts/src/shmem.rs
index a1c7724ccc..b8346c3afd 100644
--- a/libafl_bolts/src/shmem.rs
+++ b/libafl_bolts/src/shmem.rs
@@ -59,6 +59,22 @@ pub type StdShMemProvider = UnixShMemProvider;
 ))]
 pub type StdShMemService = DummyShMemService;
 
+// for unix only
+/// The standard served shmem provider
+#[cfg(all(target_os = "android", feature = "std"))]
+pub type StdServedShMemProvider =
+    RcShMemProvider<ServedShMemProvider<AshmemShMemProvider>>;
+/// The standard served shmem provider
+#[cfg(all(feature = "std", target_vendor = "apple"))]
+pub type StdServedShMemProvider = RcShMemProvider<ServedShMemProvider<MmapShMemProvider>>;
+/// The standard served shmem provider
+#[cfg(all(
+    feature = "std",
+    unix,
+    not(any(target_os = "android", target_vendor = "apple"))
+))]
+pub type StdServedShMemProvider = RcShMemProvider<ServedShMemProvider<MmapShMemProvider>>;
+
 /// Description of a shared map.
 /// May be used to restore the map by id.
 #[derive(Copy, Clone, Debug, Serialize, Deserialize)]
@@ -210,6 +226,28 @@ pub trait ShMem: Sized + Debug + Clone + AsSlice<Entry = u8> + AsMutSlice<Entry = u8> {
+    /// Convert to a slice of type &\[T\]
+    ///
+    /// # Safety
+    /// This function is not safe as the object may be not initialized.
+    /// The user is responsible to initialize the objects in the slice
+    unsafe fn as_objects_slice<T: Sized + 'static>(&self, len: usize) -> &[T] {
+        assert!(self.len() >= core::mem::size_of::<T>() * len);
+        let ptr = self.as_slice().as_ptr() as *const () as *const T;
+        core::slice::from_raw_parts(ptr, len)
+    }
+
+    /// Convert to a slice of type &mut \[T\]
+    ///
+    /// # Safety
+    /// This function is not safe as the object may be not initialized.
+    /// The user is responsible to initialize the objects in the slice
+    unsafe fn as_objects_slice_mut<T: Sized + 'static>(&mut self, len: usize) -> &mut [T] {
+        assert!(self.len() >= core::mem::size_of::<T>() * len);
+        let ptr = self.as_mut_slice().as_mut_ptr() as *mut () as *mut T;
+        core::slice::from_raw_parts_mut(ptr, len)
+    }
+
     /// Get the description of the shared memory mapping
     fn description(&self) -> ShMemDescription {
         ShMemDescription {
@@ -250,6 +288,14 @@ pub trait ShMemProvider: Clone + Default + Debug {
         self.new_shmem(core::mem::size_of::<T>())
     }
 
+    /// Create a new shared memory mapping to hold an array of objects of the given type
+    fn new_shmem_objects_array<T: Sized + 'static>(
+        &mut self,
+        len: usize,
+    ) -> Result<Self::ShMem, Error> {
+        self.new_shmem(core::mem::size_of::<T>() * len)
+    }
+
     /// Get a mapping given its id to hold an object of the given type
     fn shmem_object_from_id<T: Sized + 'static>(
         &mut self,
@@ -527,6 +573,17 @@ where
     }
 }
 
+#[cfg(all(unix, feature = "std"))]
+impl<SP> RcShMemProvider<ServedShMemProvider<SP>>
+where
+    SP: ShMemProvider + Debug,
+{
+    /// Forward to `ServedShMemProvider::on_restart`
+    pub fn on_restart(&mut self) {
+        self.internal.borrow_mut().on_restart();
+    }
+}
+
 /// A Unix sharedmem implementation.
 ///
 /// On Android, this is partially reused to wrap [`unix_shmem::ashmem::AshmemShMem`],
@@ -856,6 +913,7 @@ pub mod unix_shmem {
 
                 let map = shmat(os_id, ptr::null(), 0) as *mut c_uchar;
                 if map as c_int == -1 || map.is_null() {
+                    perror(b"shmat\0".as_ptr() as *const _);
                     shmctl(os_id, libc::IPC_RMID, ptr::null_mut());
                     return Err(Error::unknown(
                         "Failed to map the shared mapping".to_string(),
@@ -877,9 +935,10 @@ pub mod unix_shmem {
 
                 let map = shmat(id_int, ptr::null(), 0) as *mut c_uchar;
                 if map.is_null() || map == ptr::null_mut::<c_uchar>().wrapping_sub(1) {
-                    return Err(Error::unknown(
-                        "Failed to map the shared mapping".to_string(),
-                    ));
+                    perror(b"shmat\0".as_ptr() as *const _);
+                    return Err(Error::unknown(format!(
+                        "Failed to map the shared mapping with id {id_int}"
+                    )));
                 }
 
                 Ok(Self { id, map, map_size })
diff --git a/scripts/kill_all_ipc.sh b/scripts/kill_all_ipc.sh
new file mode 100755
index 0000000000..6580584144
--- /dev/null
+++ b/scripts/kill_all_ipc.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+PATH="/bin:/usr/bin"
+
+IPCS_S=$(ipcs -s | grep -E "0x[0-9a-f]+ [0-9]+" | grep "${USER}" | cut -f2 -d" ")
+IPCS_M=$(ipcs -m | grep -E "0x[0-9a-f]+ [0-9]+" | grep "${USER}" | cut -f2 -d" ")
+IPCS_Q=$(ipcs -q | grep -E "0x[0-9a-f]+ [0-9]+" | grep "${USER}" | cut -f2 -d" ")
+
+for id in $IPCS_M; do
+    ipcrm -m "$id";
+done
+
+for id in $IPCS_S; do
+    ipcrm -s "$id";
+done
+
+for id in $IPCS_Q; do
+    ipcrm -q "$id";
+done
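Finally, the new typed-slice helpers on `ShMem`/`ShMemProvider` pair up like this (a sketch; `u64` is chosen because any bit pattern is a valid value, which sidesteps the initialization caveat in the safety docs):

```rust
use libafl_bolts::shmem::{ShMem, ShMemProvider, StdShMemProvider};

let mut provider = StdShMemProvider::new()?;
// Allocate room for 16 u64 counters in shared memory...
let mut shmem = provider.new_shmem_objects_array::<u64>(16)?;
// ...and view the raw bytes as a typed slice (fresh SysV/mmap pages are zeroed).
let counters: &mut [u64] = unsafe { shmem.as_objects_slice_mut::<u64>(16) };
counters[0] += 1;
```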