Share objectives between nodes (#2754)

* add support to share new objectives in CentralizedEventManager

* handle received Objectives

* remove duplicate event fires in centralized event manager

* share input on share_objectives feature (broken)

* split impl LlmpEventManager based on share_objectives

* reduce code duplication in impl LlmpEventManager (broken)

* fix traits error (temp)

* fix mismatched types

* fix cargo format issue

* merge duplicated functions into single impl in llmp/mod.rs

* merge duplicate impl blocks in stages/sync.rs

* fix clippy warnings

* deduplicate handle_in_client

* cleanup unnecessary code

* handle objectives in tcp eventmanager

* handle objectives in llmp eventmanager (broken)

* handle objectives in llmp eventmanager

* fix doc test

* format

* clippy

---------

Co-authored-by: Dongjia "toka" Zhang <tokazerkje@outlook.com>
This commit is contained in:
Dhanvith Nayak 2025-01-19 21:30:24 +05:30 committed by GitHub
parent f2eefeb52a
commit 2a36b78fd6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 133 additions and 17 deletions

View File

@ -144,6 +144,9 @@ unicode = ["libafl_bolts/alloc", "ahash/std", "serde/rc", "bitvec"]
## Enable multi-part input formats and mutators
multipart_inputs = ["arrayvec", "rand_trait"]
## Share objectives across nodes
share_objectives = []
#! ## LibAFL-Bolts Features
## Provide the `#[derive(SerdeAny)]` macro.

View File

@ -223,14 +223,15 @@ where
if !self.is_main {
// secondary node
let mut is_tc = false;
// Forward to main only if new tc or heartbeat
// Forward to main only if new tc, heartbeat, or optionally, a new objective
let should_be_forwarded = match &mut event {
Event::NewTestcase { forward_id, .. } => {
*forward_id = Some(ClientId(self.inner.mgr_id().0 as u32));
is_tc = true;
true
}
Event::UpdateExecStats { .. } => true, // send it but this guy won't be handled. the only purpose is to keep this client alive else the broker thinks it is dead and will dc it
Event::UpdateExecStats { .. } => true, // send UpdateExecStats but this guy won't be handled. the only purpose is to keep this client alive else the broker thinks it is dead and will dc it
Event::Objective { .. } => true,
Event::Stop => true,
_ => false,
};

View File

@ -29,6 +29,8 @@ use libafl_bolts::{
};
use serde::{de::DeserializeOwned, Serialize};
#[cfg(feature = "share_objectives")]
use crate::corpus::{Corpus, Testcase};
#[cfg(feature = "llmp_compression")]
use crate::events::llmp::COMPRESS_THRESHOLD;
#[cfg(feature = "std")]
@ -46,8 +48,8 @@ use crate::{
observers::TimeObserver,
stages::HasCurrentStageId,
state::{
HasExecutions, HasImported, HasLastReportTime, MaybeHasClientPerfMonitor, NopState,
Stoppable,
HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions,
MaybeHasClientPerfMonitor, NopState, Stoppable,
},
Error, HasMetadata,
};
@ -346,7 +348,7 @@ where
event: Event<I>,
) -> Result<(), Error>
where
S: HasImported + Stoppable,
S: HasImported + HasSolutions<I> + HasCurrentTestcase<I> + Stoppable,
EMH: EventManagerHooksTuple<I, S>,
I: Input,
E: HasObservers,
@ -391,6 +393,20 @@ where
log::debug!("Testcase {evt_name} was discarded");
}
}
#[cfg(feature = "share_objectives")]
Event::Objective { input, .. } => {
log::debug!("Received new Objective");
let mut testcase = Testcase::from(input);
testcase.set_parent_id_optional(*state.corpus().current());
if let Ok(mut tc) = state.current_testcase_mut() {
tc.found_objective();
}
state.solutions_mut().add(testcase)?;
log::info!("Added received Objective to Corpus");
}
Event::Stop => {
state.request_stop();
}
@ -510,7 +526,7 @@ impl<E, EMH, I, S, SP, Z> EventProcessor<E, S, Z> for LlmpEventManager<EMH, I, S
where
E: HasObservers,
E::Observers: DeserializeOwned,
S: HasImported + Stoppable,
S: HasImported + HasSolutions<I> + HasCurrentTestcase<I> + Stoppable,
EMH: EventManagerHooksTuple<I, S>,
I: DeserializeOwned + Input,
SP: ShMemProvider,

View File

@ -14,11 +14,13 @@ use libafl_bolts::{
};
use serde::{de::DeserializeOwned, Serialize};
#[cfg(feature = "share_objectives")]
use crate::corpus::{Corpus, Testcase};
use crate::{
events::{Event, EventFirer},
fuzzer::EvaluatorObservers,
inputs::{Input, InputConverter, NopInput, NopInputConverter},
state::NopState,
state::{HasCurrentTestcase, HasSolutions, NopState},
Error,
};
@ -265,6 +267,7 @@ where
) -> Result<(), Error>
where
ICB: InputConverter<To = I, From = DI>,
S: HasCurrentTestcase<I> + HasSolutions<I>,
Z: EvaluatorObservers<E, EM, I, S>,
{
match event {
@ -290,6 +293,28 @@ where
}
Ok(())
}
#[cfg(feature = "share_objectives")]
Event::Objective { input, .. } => {
log::debug!("Received new Objective");
let Some(converter) = self.converter_back.as_mut() else {
return Ok(());
};
let converted_input = converter.convert(input)?;
let mut testcase = Testcase::from(converted_input);
testcase.set_parent_id_optional(*state.corpus().current());
if let Ok(mut tc) = state.current_testcase_mut() {
tc.found_objective();
}
state.solutions_mut().add(testcase)?;
log::info!("Added received Objective to Corpus");
Ok(())
}
Event::Stop => Ok(()),
_ => Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
@ -309,6 +334,7 @@ where
where
ICB: InputConverter<To = I, From = DI>,
DI: DeserializeOwned + Input,
S: HasCurrentTestcase<I> + HasSolutions<I>,
Z: EvaluatorObservers<E, EM, I, S>,
{
// TODO: Get around local event copy by moving handle_in_client

View File

@ -46,7 +46,10 @@ use crate::{
monitors::Monitor,
observers::TimeObserver,
stages::HasCurrentStageId,
state::{HasExecutions, HasImported, HasLastReportTime, MaybeHasClientPerfMonitor, Stoppable},
state::{
HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions,
MaybeHasClientPerfMonitor, Stoppable,
},
Error,
};
@ -201,7 +204,7 @@ where
EMH: EventManagerHooksTuple<I, S>,
E: HasObservers,
E::Observers: DeserializeOwned,
S: HasImported + Stoppable + Serialize,
S: HasImported + HasCurrentTestcase<I> + HasSolutions<I> + Stoppable + Serialize,
I: DeserializeOwned + Input,
SP: ShMemProvider,
Z: ExecutionProcessor<LlmpEventManager<EMH, I, S, SP>, I, E::Observers, S>

View File

@ -301,6 +301,9 @@ pub enum Event<I> {
},
/// A new objective was found
Objective {
/// Input of newly found Objective
#[cfg(feature = "share_objectives")]
input: I,
/// Objective corpus size
objective_size: usize,
/// The time when this event was created

View File

@ -39,6 +39,8 @@ use tokio::{
use typed_builder::TypedBuilder;
use super::{std_maybe_report_progress, std_report_progress, ManagerExit};
#[cfg(feature = "share_objectives")]
use crate::corpus::{Corpus, Testcase};
#[cfg(all(unix, not(miri)))]
use crate::events::EVENTMGR_SIGHANDLER_STATE;
use crate::{
@ -52,7 +54,10 @@ use crate::{
monitors::Monitor,
observers::ObserversTuple,
stages::HasCurrentStageId,
state::{HasExecutions, HasImported, HasLastReportTime, MaybeHasClientPerfMonitor, Stoppable},
state::{
HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions,
MaybeHasClientPerfMonitor, Stoppable,
},
Error, HasMetadata,
};
@ -560,7 +565,12 @@ impl<EMH, I, S> Drop for TcpEventManager<EMH, I, S> {
impl<EMH, I, S> TcpEventManager<EMH, I, S>
where
EMH: EventManagerHooksTuple<I, S>,
S: HasExecutions + HasMetadata + HasImported + Stoppable,
S: HasExecutions
+ HasMetadata
+ HasImported
+ HasSolutions<I>
+ HasCurrentTestcase<I>
+ Stoppable,
{
/// Write the client id for a client `EventManager` to env vars
pub fn to_env(&self, env_name: &str) {
@ -610,6 +620,20 @@ where
log::info!("Added received Testcase as item #{item}");
}
}
#[cfg(feature = "share_objectives")]
Event::Objective { input, .. } => {
log::debug!("Received new Objective");
let mut testcase = Testcase::from(input);
testcase.set_parent_id_optional(*state.corpus().current());
if let Ok(mut tc) = state.current_testcase_mut() {
tc.found_objective();
}
state.solutions_mut().add(testcase)?;
log::info!("Added received Objective to Corpus");
}
Event::Stop => {
state.request_stop();
}
@ -684,7 +708,12 @@ where
E::Observers: Serialize + ObserversTuple<I, S>,
for<'a> E::Observers: Deserialize<'a>,
EMH: EventManagerHooksTuple<I, S>,
S: HasExecutions + HasMetadata + HasImported + Stoppable,
S: HasExecutions
+ HasMetadata
+ HasImported
+ HasSolutions<I>
+ HasCurrentTestcase<I>
+ Stoppable,
I: DeserializeOwned,
Z: ExecutionProcessor<Self, I, E::Observers, S> + EvaluatorObservers<E, Self, I, S>,
{
@ -891,7 +920,12 @@ where
E::Observers: ObserversTuple<I, S> + Serialize,
EMH: EventManagerHooksTuple<I, S>,
I: DeserializeOwned,
S: HasExecutions + HasMetadata + HasImported + Stoppable,
S: HasExecutions
+ HasMetadata
+ HasImported
+ HasSolutions<I>
+ HasCurrentTestcase<I>
+ Stoppable,
SP: ShMemProvider,
Z: ExecutionProcessor<TcpEventManager<EMH, I, S>, I, E::Observers, S>
+ EvaluatorObservers<E, TcpEventManager<EMH, I, S>, I, S>,
@ -990,7 +1024,13 @@ pub fn setup_restarting_mgr_tcp<I, MT, S>(
>
where
MT: Monitor + Clone,
S: HasExecutions + HasMetadata + HasImported + DeserializeOwned + Stoppable,
S: HasExecutions
+ HasMetadata
+ HasImported
+ HasSolutions<I>
+ HasCurrentTestcase<I>
+ DeserializeOwned
+ Stoppable,
I: Input,
{
TcpRestartingMgr::builder()
@ -1056,7 +1096,13 @@ where
I: Input,
MT: Monitor + Clone,
SP: ShMemProvider,
S: HasExecutions + HasMetadata + HasImported + DeserializeOwned + Stoppable,
S: HasExecutions
+ HasMetadata
+ HasImported
+ HasSolutions<I>
+ HasCurrentTestcase<I>
+ DeserializeOwned
+ Stoppable,
{
/// Launch the restarting manager
pub fn launch(

View File

@ -415,6 +415,9 @@ pub fn run_observers_and_save_state<E, EM, I, OF, S, Z>(
.fire(
state,
Event::Objective {
#[cfg(feature = "share_objectives")]
input: input.clone(),
objective_size: state.solutions().count(),
time: libafl_bolts::current_time(),
},

View File

@ -475,6 +475,9 @@ where
manager.fire(
state,
Event::Objective {
#[cfg(feature = "share_objectives")]
input,
objective_size: state.solutions().count(),
time: current_time(),
},
@ -667,6 +670,9 @@ where
manager.fire(
state,
Event::Objective {
#[cfg(feature = "share_objectives")]
input,
objective_size: state.solutions().count(),
time: current_time(),
},

View File

@ -17,7 +17,10 @@ use crate::{
fuzzer::{Evaluator, EvaluatorObservers, ExecutionProcessor},
inputs::{Input, InputConverter},
stages::{RetryCountRestartHelper, Stage},
state::{HasCorpus, HasExecutions, HasRand, MaybeHasClientPerfMonitor, Stoppable},
state::{
HasCorpus, HasCurrentTestcase, HasExecutions, HasRand, HasSolutions,
MaybeHasClientPerfMonitor, Stoppable,
},
Error, HasMetadata, HasNamedMetadata,
};
@ -232,7 +235,13 @@ where
I: Input + Clone,
IC: InputConverter<From = I, To = DI>,
ICB: InputConverter<From = DI, To = I>,
S: HasExecutions + HasCorpus<I> + HasRand + HasMetadata + Stoppable + MaybeHasClientPerfMonitor,
S: HasExecutions
+ HasRand
+ HasMetadata
+ HasSolutions<I>
+ HasCurrentTestcase<I>
+ Stoppable
+ MaybeHasClientPerfMonitor,
SP: ShMemProvider,
Z: EvaluatorObservers<E, EM, I, S> + ExecutionProcessor<EM, I, E::Observers, S>,
{