From 2a36b78fd65ee6fdb22d961c15da1b79467394b7 Mon Sep 17 00:00:00 2001 From: Dhanvith Nayak <85876638+BAGUVIX456@users.noreply.github.com> Date: Sun, 19 Jan 2025 21:30:24 +0530 Subject: [PATCH] Share objectives between nodes (#2754) * add support to share new objectives in CentralizedEventManager * handle received Objectives * remove duplicate event fires in centralized event manager * share input on share_objectives feature (broken) * split impl LlmpEventManager based on share_objectives * reduce code duplication in impl LlmpEventManager (broken) * fix traits error (temp) * fix mismatched types * fix cargo format issue * merge duplicated functions into single impl in llmp/mod.rs * merge duplicate impl blocks in stages/sync.rs * fix clippy warnings * deduplicate handle_in_client * cleanup unnecessary code * handle objectives in tcp eventmanager * handle objectives in llmp eventmanager (broken) * handle objectives in llmp eventmanager * fix doc test * format * clippy --------- Co-authored-by: Dongjia "toka" Zhang --- libafl/Cargo.toml | 3 ++ libafl/src/events/centralized.rs | 5 ++- libafl/src/events/llmp/mgr.rs | 24 +++++++++-- libafl/src/events/llmp/mod.rs | 28 ++++++++++++- libafl/src/events/llmp/restarting.rs | 7 +++- libafl/src/events/mod.rs | 3 ++ libafl/src/events/tcp.rs | 58 ++++++++++++++++++++++++--- libafl/src/executors/inprocess/mod.rs | 3 ++ libafl/src/fuzzer/mod.rs | 6 +++ libafl/src/stages/sync.rs | 13 +++++- 10 files changed, 133 insertions(+), 17 deletions(-) diff --git a/libafl/Cargo.toml b/libafl/Cargo.toml index d5ed573eb1..40d22236a2 100644 --- a/libafl/Cargo.toml +++ b/libafl/Cargo.toml @@ -144,6 +144,9 @@ unicode = ["libafl_bolts/alloc", "ahash/std", "serde/rc", "bitvec"] ## Enable multi-part input formats and mutators multipart_inputs = ["arrayvec", "rand_trait"] +## Share objectives across nodes +share_objectives = [] + #! ## LibAFL-Bolts Features ## Provide the `#[derive(SerdeAny)]` macro. diff --git a/libafl/src/events/centralized.rs b/libafl/src/events/centralized.rs index bc636b6bf9..0dbede7466 100644 --- a/libafl/src/events/centralized.rs +++ b/libafl/src/events/centralized.rs @@ -223,14 +223,15 @@ where if !self.is_main { // secondary node let mut is_tc = false; - // Forward to main only if new tc or heartbeat + // Forward to main only if new tc, heartbeat, or optionally, a new objective let should_be_forwarded = match &mut event { Event::NewTestcase { forward_id, .. } => { *forward_id = Some(ClientId(self.inner.mgr_id().0 as u32)); is_tc = true; true } - Event::UpdateExecStats { .. } => true, // send it but this guy won't be handled. the only purpose is to keep this client alive else the broker thinks it is dead and will dc it + Event::UpdateExecStats { .. } => true, // send UpdateExecStats but this guy won't be handled. the only purpose is to keep this client alive else the broker thinks it is dead and will dc it + Event::Objective { .. } => true, Event::Stop => true, _ => false, }; diff --git a/libafl/src/events/llmp/mgr.rs b/libafl/src/events/llmp/mgr.rs index 8cf5379f47..34b0619432 100644 --- a/libafl/src/events/llmp/mgr.rs +++ b/libafl/src/events/llmp/mgr.rs @@ -29,6 +29,8 @@ use libafl_bolts::{ }; use serde::{de::DeserializeOwned, Serialize}; +#[cfg(feature = "share_objectives")] +use crate::corpus::{Corpus, Testcase}; #[cfg(feature = "llmp_compression")] use crate::events::llmp::COMPRESS_THRESHOLD; #[cfg(feature = "std")] @@ -46,8 +48,8 @@ use crate::{ observers::TimeObserver, stages::HasCurrentStageId, state::{ - HasExecutions, HasImported, HasLastReportTime, MaybeHasClientPerfMonitor, NopState, - Stoppable, + HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions, + MaybeHasClientPerfMonitor, NopState, Stoppable, }, Error, HasMetadata, }; @@ -346,7 +348,7 @@ where event: Event, ) -> Result<(), Error> where - S: HasImported + Stoppable, + S: HasImported + HasSolutions + HasCurrentTestcase + Stoppable, EMH: EventManagerHooksTuple, I: Input, E: HasObservers, @@ -391,6 +393,20 @@ where log::debug!("Testcase {evt_name} was discarded"); } } + + #[cfg(feature = "share_objectives")] + Event::Objective { input, .. } => { + log::debug!("Received new Objective"); + let mut testcase = Testcase::from(input); + testcase.set_parent_id_optional(*state.corpus().current()); + + if let Ok(mut tc) = state.current_testcase_mut() { + tc.found_objective(); + } + + state.solutions_mut().add(testcase)?; + log::info!("Added received Objective to Corpus"); + } Event::Stop => { state.request_stop(); } @@ -510,7 +526,7 @@ impl EventProcessor for LlmpEventManager + HasCurrentTestcase + Stoppable, EMH: EventManagerHooksTuple, I: DeserializeOwned + Input, SP: ShMemProvider, diff --git a/libafl/src/events/llmp/mod.rs b/libafl/src/events/llmp/mod.rs index 2d76ba52dd..ffcbb2d06d 100644 --- a/libafl/src/events/llmp/mod.rs +++ b/libafl/src/events/llmp/mod.rs @@ -14,11 +14,13 @@ use libafl_bolts::{ }; use serde::{de::DeserializeOwned, Serialize}; +#[cfg(feature = "share_objectives")] +use crate::corpus::{Corpus, Testcase}; use crate::{ events::{Event, EventFirer}, fuzzer::EvaluatorObservers, inputs::{Input, InputConverter, NopInput, NopInputConverter}, - state::NopState, + state::{HasCurrentTestcase, HasSolutions, NopState}, Error, }; @@ -265,6 +267,7 @@ where ) -> Result<(), Error> where ICB: InputConverter, + S: HasCurrentTestcase + HasSolutions, Z: EvaluatorObservers, { match event { @@ -290,6 +293,28 @@ where } Ok(()) } + + #[cfg(feature = "share_objectives")] + Event::Objective { input, .. } => { + log::debug!("Received new Objective"); + + let Some(converter) = self.converter_back.as_mut() else { + return Ok(()); + }; + + let converted_input = converter.convert(input)?; + let mut testcase = Testcase::from(converted_input); + testcase.set_parent_id_optional(*state.corpus().current()); + + if let Ok(mut tc) = state.current_testcase_mut() { + tc.found_objective(); + } + + state.solutions_mut().add(testcase)?; + log::info!("Added received Objective to Corpus"); + + Ok(()) + } Event::Stop => Ok(()), _ => Err(Error::unknown(format!( "Received illegal message that message should not have arrived: {:?}.", @@ -309,6 +334,7 @@ where where ICB: InputConverter, DI: DeserializeOwned + Input, + S: HasCurrentTestcase + HasSolutions, Z: EvaluatorObservers, { // TODO: Get around local event copy by moving handle_in_client diff --git a/libafl/src/events/llmp/restarting.rs b/libafl/src/events/llmp/restarting.rs index 8c8d5f31b3..bea9ca2789 100644 --- a/libafl/src/events/llmp/restarting.rs +++ b/libafl/src/events/llmp/restarting.rs @@ -46,7 +46,10 @@ use crate::{ monitors::Monitor, observers::TimeObserver, stages::HasCurrentStageId, - state::{HasExecutions, HasImported, HasLastReportTime, MaybeHasClientPerfMonitor, Stoppable}, + state::{ + HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions, + MaybeHasClientPerfMonitor, Stoppable, + }, Error, }; @@ -201,7 +204,7 @@ where EMH: EventManagerHooksTuple, E: HasObservers, E::Observers: DeserializeOwned, - S: HasImported + Stoppable + Serialize, + S: HasImported + HasCurrentTestcase + HasSolutions + Stoppable + Serialize, I: DeserializeOwned + Input, SP: ShMemProvider, Z: ExecutionProcessor, I, E::Observers, S> diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs index c89909d555..f5672fd9f6 100644 --- a/libafl/src/events/mod.rs +++ b/libafl/src/events/mod.rs @@ -301,6 +301,9 @@ pub enum Event { }, /// A new objective was found Objective { + /// Input of newly found Objective + #[cfg(feature = "share_objectives")] + input: I, /// Objective corpus size objective_size: usize, /// The time when this event was created diff --git a/libafl/src/events/tcp.rs b/libafl/src/events/tcp.rs index cbb7b4e932..dcb90fd5cb 100644 --- a/libafl/src/events/tcp.rs +++ b/libafl/src/events/tcp.rs @@ -39,6 +39,8 @@ use tokio::{ use typed_builder::TypedBuilder; use super::{std_maybe_report_progress, std_report_progress, ManagerExit}; +#[cfg(feature = "share_objectives")] +use crate::corpus::{Corpus, Testcase}; #[cfg(all(unix, not(miri)))] use crate::events::EVENTMGR_SIGHANDLER_STATE; use crate::{ @@ -52,7 +54,10 @@ use crate::{ monitors::Monitor, observers::ObserversTuple, stages::HasCurrentStageId, - state::{HasExecutions, HasImported, HasLastReportTime, MaybeHasClientPerfMonitor, Stoppable}, + state::{ + HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions, + MaybeHasClientPerfMonitor, Stoppable, + }, Error, HasMetadata, }; @@ -560,7 +565,12 @@ impl Drop for TcpEventManager { impl TcpEventManager where EMH: EventManagerHooksTuple, - S: HasExecutions + HasMetadata + HasImported + Stoppable, + S: HasExecutions + + HasMetadata + + HasImported + + HasSolutions + + HasCurrentTestcase + + Stoppable, { /// Write the client id for a client `EventManager` to env vars pub fn to_env(&self, env_name: &str) { @@ -610,6 +620,20 @@ where log::info!("Added received Testcase as item #{item}"); } } + + #[cfg(feature = "share_objectives")] + Event::Objective { input, .. } => { + log::debug!("Received new Objective"); + let mut testcase = Testcase::from(input); + testcase.set_parent_id_optional(*state.corpus().current()); + + if let Ok(mut tc) = state.current_testcase_mut() { + tc.found_objective(); + } + + state.solutions_mut().add(testcase)?; + log::info!("Added received Objective to Corpus"); + } Event::Stop => { state.request_stop(); } @@ -684,7 +708,12 @@ where E::Observers: Serialize + ObserversTuple, for<'a> E::Observers: Deserialize<'a>, EMH: EventManagerHooksTuple, - S: HasExecutions + HasMetadata + HasImported + Stoppable, + S: HasExecutions + + HasMetadata + + HasImported + + HasSolutions + + HasCurrentTestcase + + Stoppable, I: DeserializeOwned, Z: ExecutionProcessor + EvaluatorObservers, { @@ -891,7 +920,12 @@ where E::Observers: ObserversTuple + Serialize, EMH: EventManagerHooksTuple, I: DeserializeOwned, - S: HasExecutions + HasMetadata + HasImported + Stoppable, + S: HasExecutions + + HasMetadata + + HasImported + + HasSolutions + + HasCurrentTestcase + + Stoppable, SP: ShMemProvider, Z: ExecutionProcessor, I, E::Observers, S> + EvaluatorObservers, I, S>, @@ -990,7 +1024,13 @@ pub fn setup_restarting_mgr_tcp( > where MT: Monitor + Clone, - S: HasExecutions + HasMetadata + HasImported + DeserializeOwned + Stoppable, + S: HasExecutions + + HasMetadata + + HasImported + + HasSolutions + + HasCurrentTestcase + + DeserializeOwned + + Stoppable, I: Input, { TcpRestartingMgr::builder() @@ -1056,7 +1096,13 @@ where I: Input, MT: Monitor + Clone, SP: ShMemProvider, - S: HasExecutions + HasMetadata + HasImported + DeserializeOwned + Stoppable, + S: HasExecutions + + HasMetadata + + HasImported + + HasSolutions + + HasCurrentTestcase + + DeserializeOwned + + Stoppable, { /// Launch the restarting manager pub fn launch( diff --git a/libafl/src/executors/inprocess/mod.rs b/libafl/src/executors/inprocess/mod.rs index f019fb680d..07e3fa1233 100644 --- a/libafl/src/executors/inprocess/mod.rs +++ b/libafl/src/executors/inprocess/mod.rs @@ -415,6 +415,9 @@ pub fn run_observers_and_save_state( .fire( state, Event::Objective { + #[cfg(feature = "share_objectives")] + input: input.clone(), + objective_size: state.solutions().count(), time: libafl_bolts::current_time(), }, diff --git a/libafl/src/fuzzer/mod.rs b/libafl/src/fuzzer/mod.rs index e8a61afe34..12eba8385b 100644 --- a/libafl/src/fuzzer/mod.rs +++ b/libafl/src/fuzzer/mod.rs @@ -475,6 +475,9 @@ where manager.fire( state, Event::Objective { + #[cfg(feature = "share_objectives")] + input, + objective_size: state.solutions().count(), time: current_time(), }, @@ -667,6 +670,9 @@ where manager.fire( state, Event::Objective { + #[cfg(feature = "share_objectives")] + input, + objective_size: state.solutions().count(), time: current_time(), }, diff --git a/libafl/src/stages/sync.rs b/libafl/src/stages/sync.rs index 9d5c7033d0..9c3c759927 100644 --- a/libafl/src/stages/sync.rs +++ b/libafl/src/stages/sync.rs @@ -17,7 +17,10 @@ use crate::{ fuzzer::{Evaluator, EvaluatorObservers, ExecutionProcessor}, inputs::{Input, InputConverter}, stages::{RetryCountRestartHelper, Stage}, - state::{HasCorpus, HasExecutions, HasRand, MaybeHasClientPerfMonitor, Stoppable}, + state::{ + HasCorpus, HasCurrentTestcase, HasExecutions, HasRand, HasSolutions, + MaybeHasClientPerfMonitor, Stoppable, + }, Error, HasMetadata, HasNamedMetadata, }; @@ -232,7 +235,13 @@ where I: Input + Clone, IC: InputConverter, ICB: InputConverter, - S: HasExecutions + HasCorpus + HasRand + HasMetadata + Stoppable + MaybeHasClientPerfMonitor, + S: HasExecutions + + HasRand + + HasMetadata + + HasSolutions + + HasCurrentTestcase + + Stoppable + + MaybeHasClientPerfMonitor, SP: ShMemProvider, Z: EvaluatorObservers + ExecutionProcessor, {