Event Manager handle_in_client hooks (#1916)

* event manager hooks

* tcp

* fix

* FMT

* fix

* fix
This commit is contained in:
Dongjia "toka" Zhang 2024-03-12 14:39:16 +01:00 committed by GitHub
parent fe1c7a34b7
commit 5f67b9fbc4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 614 additions and 296 deletions

View File

@ -93,13 +93,13 @@ unsafe fn fuzz(
let shmem_provider = StdShMemProvider::new()?; let shmem_provider = StdShMemProvider::new()?;
let mut run_client = |state: Option<_>, mgr: LlmpRestartingEventManager<_, _>, core_id| { let mut run_client = |state: Option<_>, mgr: LlmpRestartingEventManager<_, _, _>, core_id| {
// The restarting state will spawn the same process again as child, then restarted it each time it crashes. // The restarting state will spawn the same process again as child, then restarted it each time it crashes.
// println!("{:?}", mgr.mgr_id()); // println!("{:?}", mgr.mgr_id());
if options.asan && options.asan_cores.contains(core_id) { if options.asan && options.asan_cores.contains(core_id) {
(|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| {
let gum = Gum::obtain(); let gum = Gum::obtain();
let coverage = CoverageRuntime::new(); let coverage = CoverageRuntime::new();
@ -221,7 +221,7 @@ unsafe fn fuzz(
Ok(()) Ok(())
})(state, mgr, core_id) })(state, mgr, core_id)
} else if options.cmplog && options.cmplog_cores.contains(core_id) { } else if options.cmplog && options.cmplog_cores.contains(core_id) {
(|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| {
let gum = Gum::obtain(); let gum = Gum::obtain();
let coverage = CoverageRuntime::new(); let coverage = CoverageRuntime::new();
@ -352,7 +352,7 @@ unsafe fn fuzz(
Ok(()) Ok(())
})(state, mgr, core_id) })(state, mgr, core_id)
} else { } else {
(|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| {
let gum = Gum::obtain(); let gum = Gum::obtain();
let coverage = CoverageRuntime::new(); let coverage = CoverageRuntime::new();

View File

@ -76,7 +76,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> {
let shmem_provider = StdShMemProvider::new()?; let shmem_provider = StdShMemProvider::new()?;
let mut run_client = |state: Option<_>, mgr: LlmpRestartingEventManager<_, _>, core_id| { let mut run_client = |state: Option<_>, mgr: LlmpRestartingEventManager<_, _, _>, core_id| {
// The restarting state will spawn the same process again as child, then restarted it each time it crashes. // The restarting state will spawn the same process again as child, then restarted it each time it crashes.
// println!("{:?}", mgr.mgr_id()); // println!("{:?}", mgr.mgr_id());
@ -94,7 +94,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> {
}; };
if options.asan && options.asan_cores.contains(core_id) { if options.asan && options.asan_cores.contains(core_id) {
(|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| {
let gum = Gum::obtain(); let gum = Gum::obtain();
let coverage = CoverageRuntime::new(); let coverage = CoverageRuntime::new();
@ -215,7 +215,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> {
Ok(()) Ok(())
})(state, mgr, core_id) })(state, mgr, core_id)
} else if options.cmplog && options.cmplog_cores.contains(core_id) { } else if options.cmplog && options.cmplog_cores.contains(core_id) {
(|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| {
let gum = Gum::obtain(); let gum = Gum::obtain();
let coverage = CoverageRuntime::new(); let coverage = CoverageRuntime::new();
@ -347,7 +347,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> {
Ok(()) Ok(())
})(state, mgr, core_id) })(state, mgr, core_id)
} else { } else {
(|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| {
let gum = Gum::obtain(); let gum = Gum::obtain();
let coverage = CoverageRuntime::new(); let coverage = CoverageRuntime::new();

View File

@ -71,7 +71,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> {
let shmem_provider = StdShMemProvider::new()?; let shmem_provider = StdShMemProvider::new()?;
let mut run_client = |state: Option<_>, mgr: LlmpRestartingEventManager<_, _>, core_id| { let mut run_client = |state: Option<_>, mgr: LlmpRestartingEventManager<_, _, _>, core_id| {
// The restarting state will spawn the same process again as child, then restarted it each time it crashes. // The restarting state will spawn the same process again as child, then restarted it each time it crashes.
// println!("{:?}", mgr.mgr_id()); // println!("{:?}", mgr.mgr_id());
@ -89,7 +89,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> {
}; };
if options.asan && options.asan_cores.contains(core_id) { if options.asan && options.asan_cores.contains(core_id) {
(|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| {
let gum = Gum::obtain(); let gum = Gum::obtain();
let coverage = CoverageRuntime::new(); let coverage = CoverageRuntime::new();
@ -211,7 +211,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> {
Ok(()) Ok(())
})(state, mgr, core_id) })(state, mgr, core_id)
} else if options.cmplog && options.cmplog_cores.contains(core_id) { } else if options.cmplog && options.cmplog_cores.contains(core_id) {
(|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| {
let gum = Gum::obtain(); let gum = Gum::obtain();
let coverage = CoverageRuntime::new(); let coverage = CoverageRuntime::new();
@ -343,7 +343,7 @@ unsafe fn fuzz(options: &FuzzerOptions) -> Result<(), Error> {
Ok(()) Ok(())
})(state, mgr, core_id) })(state, mgr, core_id)
} else { } else {
(|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, _core_id| { (|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, _core_id| {
let gum = Gum::obtain(); let gum = Gum::obtain();
let coverage = CoverageRuntime::new(); let coverage = CoverageRuntime::new();

View File

@ -151,7 +151,7 @@ pub extern "C" fn libafl_main() {
); );
let mut run_client = |state: Option<_>, let mut run_client = |state: Option<_>,
mut restarting_mgr: LlmpRestartingEventManager<_, _>, mut restarting_mgr: LlmpRestartingEventManager<_, _, _>,
core_id| { core_id| {
// Create an observation channel using the coverage map // Create an observation channel using the coverage map
let edges_observer = HitcountsMapObserver::new(unsafe { std_edges_map_observer("edges") }); let edges_observer = HitcountsMapObserver::new(unsafe { std_edges_map_observer("edges") });

View File

@ -180,7 +180,8 @@ pub fn fuzz() {
ExitKind::Ok ExitKind::Ok
}; };
let mut run_client = |state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _>, core_id| { let mut run_client =
|state: Option<_>, mut mgr: LlmpRestartingEventManager<_, _, _>, core_id| {
let core_idx = options let core_idx = options
.cores .cores
.position(core_id) .position(core_id)

View File

@ -54,7 +54,7 @@ pub type ClientState =
pub type ClientMgr<M> = SimpleEventManager<M, ClientState>; pub type ClientMgr<M> = SimpleEventManager<M, ClientState>;
#[cfg(not(feature = "simplemgr"))] #[cfg(not(feature = "simplemgr"))]
pub type ClientMgr<M> = pub type ClientMgr<M> =
MonitorTypedEventManager<LlmpRestartingEventManager<ClientState, StdShMemProvider>, M>; MonitorTypedEventManager<LlmpRestartingEventManager<(), ClientState, StdShMemProvider>, M>;
#[derive(TypedBuilder)] #[derive(TypedBuilder)]
pub struct Instance<'a, M: Monitor> { pub struct Instance<'a, M: Monitor> {

View File

@ -1,4 +1,11 @@
//! A wrapper manager to implement a main-secondary architecture with point-to-point channels //! Centralized event manager is a special event manager that will be used to achieve a more efficient message passing architecture.
// Some technical details..
// A very standard multi-process fuzzing using centralized event manager will consist of 4 components
// 1. The "fuzzer clients", the fuzzer that will do the "normal" fuzzing
// 2. The "centralized broker, the broker that gathers all the testcases from all the fuzzer clients
// 3. The "main evaluator", the evaluator node that will evaluate all the testcases pass by the centralized event manager to see if the testcases are worth propagating
// 4. The "main broker", the gathers the stats from the fuzzer clients and broadcast the newly found testcases from the main evaluator.
use alloc::{boxed::Box, string::String, vec::Vec}; use alloc::{boxed::Box, string::String, vec::Vec};
use core::{marker::PhantomData, num::NonZeroUsize, time::Duration}; use core::{marker::PhantomData, num::NonZeroUsize, time::Duration};
@ -28,7 +35,6 @@ use crate::{
EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, LogSeverity, EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, LogSeverity,
}, },
executors::{Executor, HasObservers}, executors::{Executor, HasObservers},
feedbacks::transferred::TransferringMetadata,
fuzzer::{EvaluatorObservers, ExecutionProcessor}, fuzzer::{EvaluatorObservers, ExecutionProcessor},
inputs::{Input, UsesInput}, inputs::{Input, UsesInput},
observers::ObserversTuple, observers::ObserversTuple,
@ -665,9 +671,6 @@ where
} => { } => {
log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})"); log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})");
if let Ok(meta) = state.metadata_mut::<TransferringMetadata>() {
meta.set_transferring(true);
}
let res = let res =
if client_config.match_with(&self.configuration()) && observers_buf.is_some() { if client_config.match_with(&self.configuration()) && observers_buf.is_some() {
let observers: E::Observers = let observers: E::Observers =
@ -697,9 +700,6 @@ where
false, false,
)? )?
}; };
if let Ok(meta) = state.metadata_mut::<TransferringMetadata>() {
meta.set_transferring(false);
}
if let Some(item) = res.1 { if let Some(item) = res.1 {
if res.1.is_some() { if res.1.is_some() {

View File

@ -0,0 +1,118 @@
//! Hooks for event managers, especifically these are used to hook before and and `handle_in_client`.
//! This will allow user to define pre/post-processing code when the event manager receives any message from
//! other clients
use libafl_bolts::ClientId;
use crate::{events::Event, state::State, Error};
/// The hooks that are run before and after the event manager calls `handle_in_client`
pub trait EventManagerHook<S>
where
S: State,
{
/// The hook that runs before `handle_in_client`
/// Return false if you want to cancel the subsequent event handling
fn pre_exec<E, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
client_id: ClientId,
event: &Event<S::Input>,
) -> Result<bool, Error>;
/// The hook that runs after `handle_in_client`
/// Return false if you want to cancel the subsequent event handling
fn post_exec<E, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
client_id: ClientId,
) -> Result<bool, Error>;
}
/// The tuples contains hooks to be executed for `handle_in_client`
pub trait EventManagerHooksTuple<S>
where
S: State,
{
/// The hook that runs before `handle_in_client`
fn pre_exec_all<E, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
client_id: ClientId,
event: &Event<S::Input>,
) -> Result<bool, Error>;
/// The hook that runs after `handle_in_client`
fn post_exec_all<E, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
client_id: ClientId,
) -> Result<bool, Error>;
}
impl<S> EventManagerHooksTuple<S> for ()
where
S: State,
{
/// The hook that runs before `handle_in_client`
fn pre_exec_all<E, Z>(
&mut self,
_fuzzer: &mut Z,
_executor: &mut E,
_state: &mut S,
_client_id: ClientId,
_event: &Event<S::Input>,
) -> Result<bool, Error> {
Ok(true)
}
/// The hook that runs after `handle_in_client`
fn post_exec_all<E, Z>(
&mut self,
_fuzzer: &mut Z,
_executor: &mut E,
_state: &mut S,
_client_id: ClientId,
) -> Result<bool, Error> {
Ok(true)
}
}
impl<Head, Tail, S> EventManagerHooksTuple<S> for (Head, Tail)
where
Head: EventManagerHook<S>,
Tail: EventManagerHooksTuple<S>,
S: State,
{
/// The hook that runs before `handle_in_client`
fn pre_exec_all<E, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
client_id: ClientId,
event: &Event<S::Input>,
) -> Result<bool, Error> {
let first = self.0.pre_exec(fuzzer, executor, state, client_id, event)?;
let second = self
.1
.pre_exec_all(fuzzer, executor, state, client_id, event)?;
Ok(first & second)
}
/// The hook that runs after `handle_in_client`
fn post_exec_all<E, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
client_id: ClientId,
) -> Result<bool, Error> {
let first = self.0.post_exec(fuzzer, executor, state, client_id)?;
let second = self.1.post_exec_all(fuzzer, executor, state, client_id)?;
Ok(first & second)
}
}

View File

@ -38,10 +38,12 @@ use libafl_bolts::{
core_affinity::{CoreId, Cores}, core_affinity::{CoreId, Cores},
llmp::DEFAULT_CLIENT_TIMEOUT_SECS, llmp::DEFAULT_CLIENT_TIMEOUT_SECS,
shmem::ShMemProvider, shmem::ShMemProvider,
tuples::tuple_list,
}; };
#[cfg(feature = "std")] #[cfg(feature = "std")]
use typed_builder::TypedBuilder; use typed_builder::TypedBuilder;
use super::hooks::EventManagerHooksTuple;
#[cfg(all(unix, feature = "std", feature = "fork"))] #[cfg(all(unix, feature = "std", feature = "fork"))]
use crate::events::{CentralizedEventManager, CentralizedLlmpEventBroker}; use crate::events::{CentralizedEventManager, CentralizedLlmpEventBroker};
#[cfg(feature = "std")] #[cfg(feature = "std")]
@ -72,9 +74,10 @@ const LIBAFL_DEBUG_OUTPUT: &str = "LIBAFL_DEBUG_OUTPUT";
clippy::ignored_unit_patterns clippy::ignored_unit_patterns
)] )]
#[derive(TypedBuilder)] #[derive(TypedBuilder)]
pub struct Launcher<'a, CF, MT, S, SP> pub struct Launcher<'a, CF, EMH, MT, S, SP>
where where
CF: FnOnce(Option<S>, LlmpRestartingEventManager<S, SP>, CoreId) -> Result<(), Error>, CF: FnOnce(Option<S>, LlmpRestartingEventManager<EMH, S, SP>, CoreId) -> Result<(), Error>,
EMH: EventManagerHooksTuple<S>,
S::Input: 'a, S::Input: 'a,
MT: Monitor, MT: Monitor,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
@ -126,12 +129,13 @@ where
#[builder(default = true)] #[builder(default = true)]
serialize_state: bool, serialize_state: bool,
#[builder(setter(skip), default = PhantomData)] #[builder(setter(skip), default = PhantomData)]
phantom_data: PhantomData<(&'a S, &'a SP)>, phantom_data: PhantomData<(&'a S, &'a SP, EMH)>,
} }
impl<CF, MT, S, SP> Debug for Launcher<'_, CF, MT, S, SP> impl<CF, EMH, MT, S, SP> Debug for Launcher<'_, CF, EMH, MT, S, SP>
where where
CF: FnOnce(Option<S>, LlmpRestartingEventManager<S, SP>, CoreId) -> Result<(), Error>, CF: FnOnce(Option<S>, LlmpRestartingEventManager<EMH, S, SP>, CoreId) -> Result<(), Error>,
EMH: EventManagerHooksTuple<S>,
MT: Monitor + Clone, MT: Monitor + Clone,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
S: State, S: State,
@ -149,18 +153,41 @@ where
} }
} }
#[cfg(feature = "std")] impl<'a, CF, MT, S, SP> Launcher<'a, CF, (), MT, S, SP>
impl<'a, CF, MT, S, SP> Launcher<'a, CF, MT, S, SP>
where where
CF: FnOnce(Option<S>, LlmpRestartingEventManager<S, SP>, CoreId) -> Result<(), Error>, CF: FnOnce(Option<S>, LlmpRestartingEventManager<(), S, SP>, CoreId) -> Result<(), Error>,
MT: Monitor + Clone, MT: Monitor + Clone,
S: State + HasExecutions, S: State + HasExecutions,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
{ {
/// Launch the broker and the clients and fuzz /// Launch the broker and the clients and fuzz
#[cfg(all(unix, feature = "std", feature = "fork"))] #[cfg(all(unix, feature = "std", feature = "fork"))]
#[allow(clippy::similar_names)]
pub fn launch(&mut self) -> Result<(), Error> { pub fn launch(&mut self) -> Result<(), Error> {
Self::launch_with_hooks(self, tuple_list!())
}
/// Launch the broker and the clients and fuzz
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
#[allow(unused_mut, clippy::match_wild_err_arm)]
pub fn launch(&mut self) -> Result<(), Error> {
Self::launch_with_hooks(self, tuple_list!())
}
}
#[cfg(feature = "std")]
impl<'a, CF, EMH, MT, S, SP> Launcher<'a, CF, EMH, MT, S, SP>
where
CF: FnOnce(Option<S>, LlmpRestartingEventManager<EMH, S, SP>, CoreId) -> Result<(), Error>,
EMH: EventManagerHooksTuple<S> + Clone + Copy,
MT: Monitor + Clone,
S: State + HasExecutions,
SP: ShMemProvider + 'static,
{
/// Launch the broker and the clients and fuzz with a user-supplied hook
#[cfg(all(unix, feature = "std", feature = "fork"))]
#[allow(clippy::similar_names)]
#[allow(clippy::too_many_lines)]
pub fn launch_with_hooks(&mut self, hooks: EMH) -> Result<(), Error> {
if self.cores.ids.is_empty() { if self.cores.ids.is_empty() {
return Err(Error::illegal_argument( return Err(Error::illegal_argument(
"No cores to spawn on given, cannot launch anything.", "No cores to spawn on given, cannot launch anything.",
@ -226,7 +253,7 @@ where
} }
// Fuzzer client. keeps retrying the connection to broker till the broker starts // Fuzzer client. keeps retrying the connection to broker till the broker starts
let (state, mgr) = RestartingMgr::<MT, S, SP>::builder() let (state, mgr) = RestartingMgr::<EMH, MT, S, SP>::builder()
.shmem_provider(self.shmem_provider.clone()) .shmem_provider(self.shmem_provider.clone())
.broker_port(self.broker_port) .broker_port(self.broker_port)
.kind(ManagerKind::Client { .kind(ManagerKind::Client {
@ -235,6 +262,7 @@ where
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout) .client_timeout(self.client_timeout)
.hooks(hooks)
.build() .build()
.launch()?; .launch()?;
@ -249,7 +277,7 @@ where
log::info!("I am broker!!."); log::info!("I am broker!!.");
// TODO we don't want always a broker here, think about using different laucher process to spawn different configurations // TODO we don't want always a broker here, think about using different laucher process to spawn different configurations
RestartingMgr::<MT, S, SP>::builder() RestartingMgr::<EMH, MT, S, SP>::builder()
.shmem_provider(self.shmem_provider.clone()) .shmem_provider(self.shmem_provider.clone())
.monitor(Some(self.monitor.clone())) .monitor(Some(self.monitor.clone()))
.broker_port(self.broker_port) .broker_port(self.broker_port)
@ -259,6 +287,7 @@ where
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout) .client_timeout(self.client_timeout)
.hooks(hooks)
.build() .build()
.launch()?; .launch()?;
@ -289,7 +318,7 @@ where
/// Launch the broker and the clients and fuzz /// Launch the broker and the clients and fuzz
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
#[allow(unused_mut, clippy::match_wild_err_arm)] #[allow(unused_mut, clippy::match_wild_err_arm)]
pub fn launch(&mut self) -> Result<(), Error> { pub fn launch_with_hooks(&mut self, hooks: EMH) -> Result<(), Error> {
use libafl_bolts::core_affinity; use libafl_bolts::core_affinity;
let is_client = std::env::var(_AFL_LAUNCHER_CLIENT); let is_client = std::env::var(_AFL_LAUNCHER_CLIENT);
@ -302,7 +331,7 @@ where
// let debug_output = std::env::var(LIBAFL_DEBUG_OUTPUT).is_ok(); // let debug_output = std::env::var(LIBAFL_DEBUG_OUTPUT).is_ok();
// the actual client. do the fuzzing // the actual client. do the fuzzing
let (state, mgr) = RestartingMgr::<MT, S, SP>::builder() let (state, mgr) = RestartingMgr::<EMH, MT, S, SP>::builder()
.shmem_provider(self.shmem_provider.clone()) .shmem_provider(self.shmem_provider.clone())
.broker_port(self.broker_port) .broker_port(self.broker_port)
.kind(ManagerKind::Client { .kind(ManagerKind::Client {
@ -311,6 +340,7 @@ where
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout) .client_timeout(self.client_timeout)
.hooks(hooks)
.build() .build()
.launch()?; .launch()?;
@ -371,7 +401,7 @@ where
#[cfg(feature = "std")] #[cfg(feature = "std")]
log::info!("I am broker!!."); log::info!("I am broker!!.");
RestartingMgr::<MT, S, SP>::builder() RestartingMgr::<EMH, MT, S, SP>::builder()
.shmem_provider(self.shmem_provider.clone()) .shmem_provider(self.shmem_provider.clone())
.monitor(Some(self.monitor.clone())) .monitor(Some(self.monitor.clone()))
.broker_port(self.broker_port) .broker_port(self.broker_port)
@ -381,6 +411,7 @@ where
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout) .client_timeout(self.client_timeout)
.hooks(hooks)
.build() .build()
.launch()?; .launch()?;
@ -410,7 +441,7 @@ pub struct CentralizedLauncher<'a, CF, MT, S, SP>
where where
CF: FnOnce( CF: FnOnce(
Option<S>, Option<S>,
CentralizedEventManager<LlmpRestartingEventManager<S, SP>, SP>, CentralizedEventManager<LlmpRestartingEventManager<(), S, SP>, SP>, // No hooks for centralized EM
CoreId, CoreId,
) -> Result<(), Error>, ) -> Result<(), Error>,
S::Input: 'a, S::Input: 'a,
@ -476,7 +507,7 @@ impl<CF, MT, S, SP> Debug for CentralizedLauncher<'_, CF, MT, S, SP>
where where
CF: FnOnce( CF: FnOnce(
Option<S>, Option<S>,
CentralizedEventManager<LlmpRestartingEventManager<S, SP>, SP>, CentralizedEventManager<LlmpRestartingEventManager<(), S, SP>, SP>,
CoreId, CoreId,
) -> Result<(), Error>, ) -> Result<(), Error>,
MT: Monitor + Clone, MT: Monitor + Clone,
@ -501,7 +532,7 @@ impl<'a, CF, MT, S, SP> CentralizedLauncher<'a, CF, MT, S, SP>
where where
CF: FnOnce( CF: FnOnce(
Option<S>, Option<S>,
CentralizedEventManager<LlmpRestartingEventManager<S, SP>, SP>, CentralizedEventManager<LlmpRestartingEventManager<(), S, SP>, SP>,
CoreId, CoreId,
) -> Result<(), Error>, ) -> Result<(), Error>,
MT: Monitor + Clone, MT: Monitor + Clone,
@ -597,7 +628,7 @@ where
} }
// Fuzzer client. keeps retrying the connection to broker till the broker starts // Fuzzer client. keeps retrying the connection to broker till the broker starts
let (state, mgr) = RestartingMgr::<MT, S, SP>::builder() let (state, mgr) = RestartingMgr::<(), MT, S, SP>::builder()
.shmem_provider(self.shmem_provider.clone()) .shmem_provider(self.shmem_provider.clone())
.broker_port(self.broker_port) .broker_port(self.broker_port)
.kind(ManagerKind::Client { .kind(ManagerKind::Client {
@ -606,6 +637,7 @@ where
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout) .client_timeout(self.client_timeout)
.hooks(tuple_list!())
.build() .build()
.launch()?; .launch()?;
@ -626,7 +658,7 @@ where
log::info!("I am broker!!."); log::info!("I am broker!!.");
// TODO we don't want always a broker here, think about using different laucher process to spawn different configurations // TODO we don't want always a broker here, think about using different laucher process to spawn different configurations
RestartingMgr::<MT, S, SP>::builder() RestartingMgr::<(), MT, S, SP>::builder()
.shmem_provider(self.shmem_provider.clone()) .shmem_provider(self.shmem_provider.clone())
.monitor(Some(self.monitor.clone())) .monitor(Some(self.monitor.clone()))
.broker_port(self.broker_port) .broker_port(self.broker_port)
@ -636,6 +668,7 @@ where
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout) .client_timeout(self.client_timeout)
.hooks(tuple_list!())
.build() .build()
.launch()?; .launch()?;

View File

@ -31,15 +31,14 @@ use libafl_bolts::{llmp::LlmpConnection, shmem::StdShMemProvider, staterestore::
use libafl_bolts::{ use libafl_bolts::{
llmp::{self, LlmpClient, LlmpClientDescription, Tag}, llmp::{self, LlmpClient, LlmpClientDescription, Tag},
shmem::ShMemProvider, shmem::ShMemProvider,
tuples::tuple_list,
ClientId, ClientId,
}; };
#[cfg(feature = "std")]
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[cfg(feature = "std")] #[cfg(feature = "std")]
use typed_builder::TypedBuilder; use typed_builder::TypedBuilder;
use super::{CustomBufEventResult, CustomBufHandlerFn}; use super::{hooks::EventManagerHooksTuple, CustomBufEventResult, CustomBufHandlerFn};
#[cfg(all(unix, feature = "std"))] #[cfg(all(unix, feature = "std"))]
use crate::events::EVENTMGR_SIGHANDLER_STATE; use crate::events::EVENTMGR_SIGHANDLER_STATE;
use crate::{ use crate::{
@ -48,7 +47,6 @@ use crate::{
EventProcessor, EventRestarter, HasCustomBufHandlers, HasEventManagerId, ProgressReporter, EventProcessor, EventRestarter, HasCustomBufHandlers, HasEventManagerId, ProgressReporter,
}, },
executors::{Executor, HasObservers}, executors::{Executor, HasObservers},
feedbacks::transferred::TransferringMetadata,
fuzzer::{EvaluatorObservers, ExecutionProcessor}, fuzzer::{EvaluatorObservers, ExecutionProcessor},
inputs::{Input, InputConverter, UsesInput}, inputs::{Input, InputConverter, UsesInput},
monitors::Monitor, monitors::Monitor,
@ -356,11 +354,12 @@ pub trait EventStatsCollector {}
/// An [`EventManager`] that forwards all events to other attached fuzzers on shared maps or via tcp, /// An [`EventManager`] that forwards all events to other attached fuzzers on shared maps or via tcp,
/// using low-level message passing, [`libafl_bolts::llmp`]. /// using low-level message passing, [`libafl_bolts::llmp`].
pub struct LlmpEventManager<S, SP> pub struct LlmpEventManager<EMH, S, SP>
where where
S: State, S: State,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
{ {
hooks: EMH,
/// The LLMP client for inter process communication /// The LLMP client for inter process communication
llmp: LlmpClient<SP>, llmp: LlmpClient<SP>,
/// The custom buf handler /// The custom buf handler
@ -383,7 +382,7 @@ where
} }
#[cfg(feature = "adaptive_serialization")] #[cfg(feature = "adaptive_serialization")]
impl<S, SP> EventStatsCollector for LlmpEventManager<S, SP> impl<EMH, S, SP> EventStatsCollector for LlmpEventManager<EMH, S, SP>
where where
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
S: State, S: State,
@ -415,7 +414,7 @@ where
} }
} }
impl<S, SP> core::fmt::Debug for LlmpEventManager<S, SP> impl<EMH, S, SP> core::fmt::Debug for LlmpEventManager<EMH, S, SP>
where where
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
S: State, S: State,
@ -433,7 +432,7 @@ where
} }
} }
impl<S, SP> Drop for LlmpEventManager<S, SP> impl<EMH, S, SP> Drop for LlmpEventManager<EMH, S, SP>
where where
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
S: State, S: State,
@ -444,14 +443,15 @@ where
} }
} }
impl<S, SP> LlmpEventManager<S, SP> impl<S, SP> LlmpEventManager<(), S, SP>
where where
S: State, S: State,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
{ {
/// Create a manager from a raw LLMP client /// Create a manager from a raw LLMP client
pub fn new(llmp: LlmpClient<SP>, configuration: EventConfig) -> Result<Self, Error> { pub fn new(llmp: LlmpClient<SP>, configuration: EventConfig) -> Result<Self, Error> {
Ok(Self { Ok(LlmpEventManager {
hooks: tuple_list!(),
llmp, llmp,
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD), compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
@ -478,23 +478,9 @@ where
shmem_provider: SP, shmem_provider: SP,
port: u16, port: u16,
configuration: EventConfig, configuration: EventConfig,
) -> Result<Self, Error> { ) -> Result<LlmpEventManager<(), S, SP>, Error> {
Ok(Self { let llmp = LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
llmp: LlmpClient::create_attach_to_tcp(shmem_provider, port)?, Self::new(llmp, configuration)
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
configuration,
#[cfg(feature = "adaptive_serialization")]
serialization_time: Duration::ZERO,
#[cfg(feature = "adaptive_serialization")]
deserialization_time: Duration::ZERO,
#[cfg(feature = "adaptive_serialization")]
serializations_cnt: 0,
#[cfg(feature = "adaptive_serialization")]
should_serialize_cnt: 0,
phantom: PhantomData,
custom_buf_handlers: vec![],
})
} }
/// If a client respawns, it may reuse the existing connection, previously /// If a client respawns, it may reuse the existing connection, previously
@ -504,9 +490,36 @@ where
shmem_provider: SP, shmem_provider: SP,
env_name: &str, env_name: &str,
configuration: EventConfig, configuration: EventConfig,
) -> Result<LlmpEventManager<(), S, SP>, Error> {
let llmp = LlmpClient::on_existing_from_env(shmem_provider, env_name)?;
Self::new(llmp, configuration)
}
/// Create an existing client from description
pub fn existing_client_from_description(
shmem_provider: SP,
description: &LlmpClientDescription,
configuration: EventConfig,
) -> Result<LlmpEventManager<(), S, SP>, Error> {
let llmp = LlmpClient::existing_client_from_description(shmem_provider, description)?;
Self::new(llmp, configuration)
}
}
impl<EMH, S, SP> LlmpEventManager<EMH, S, SP>
where
S: State,
SP: ShMemProvider + 'static,
{
/// Create a manager from a raw LLMP client with hooks
pub fn with_hooks(
llmp: LlmpClient<SP>,
configuration: EventConfig,
hooks: EMH,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Ok(Self { Ok(Self {
llmp: LlmpClient::on_existing_from_env(shmem_provider, env_name)?, hooks,
llmp,
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD), compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
configuration, configuration,
@ -523,33 +536,49 @@ where
}) })
} }
/// Create an LLMP event manager on a port with hook
///
/// If the port is not yet bound, it will act as a broker; otherwise, it
/// will act as a client.
#[cfg(feature = "std")]
pub fn on_port_with_hooks(
shmem_provider: SP,
port: u16,
configuration: EventConfig,
hooks: EMH,
) -> Result<Self, Error> {
let llmp = LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
Self::with_hooks(llmp, configuration, hooks)
}
/// If a client respawns, it may reuse the existing connection, previously
/// stored by [`LlmpClient::to_env()`].
/// create a event manager from env with hooks
#[cfg(feature = "std")]
pub fn existing_client_from_env_with_hooks(
shmem_provider: SP,
env_name: &str,
configuration: EventConfig,
hooks: EMH,
) -> Result<Self, Error> {
let llmp = LlmpClient::on_existing_from_env(shmem_provider, env_name)?;
Self::with_hooks(llmp, configuration, hooks)
}
/// Describe the client event manager's LLMP parts in a restorable fashion /// Describe the client event manager's LLMP parts in a restorable fashion
pub fn describe(&self) -> Result<LlmpClientDescription, Error> { pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
self.llmp.describe() self.llmp.describe()
} }
/// Create an existing client from description /// Create an existing client from description
pub fn existing_client_from_description( pub fn existing_client_from_description_with_hooks(
shmem_provider: SP, shmem_provider: SP,
description: &LlmpClientDescription, description: &LlmpClientDescription,
configuration: EventConfig, configuration: EventConfig,
hooks: EMH,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Ok(Self { let llmp = LlmpClient::existing_client_from_description(shmem_provider, description)?;
llmp: LlmpClient::existing_client_from_description(shmem_provider, description)?, Self::with_hooks(llmp, configuration, hooks)
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
configuration,
#[cfg(feature = "adaptive_serialization")]
serialization_time: Duration::ZERO,
#[cfg(feature = "adaptive_serialization")]
deserialization_time: Duration::ZERO,
#[cfg(feature = "adaptive_serialization")]
serializations_cnt: 0,
#[cfg(feature = "adaptive_serialization")]
should_serialize_cnt: 0,
phantom: PhantomData,
custom_buf_handlers: vec![],
})
} }
/// Write the config for a client [`EventManager`] to env vars, a new /// Write the config for a client [`EventManager`] to env vars, a new
@ -560,8 +589,9 @@ where
} }
} }
impl<S, SP> LlmpEventManager<S, SP> impl<EMH, S, SP> LlmpEventManager<EMH, S, SP>
where where
EMH: EventManagerHooksTuple<S>,
S: State + HasExecutions + HasMetadata, S: State + HasExecutions + HasMetadata,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
{ {
@ -580,6 +610,12 @@ where
for<'a> E::Observers: Deserialize<'a>, for<'a> E::Observers: Deserialize<'a>,
Z: ExecutionProcessor<E::Observers, State = S> + EvaluatorObservers<E::Observers>, Z: ExecutionProcessor<E::Observers, State = S> + EvaluatorObservers<E::Observers>,
{ {
if self
.hooks
.pre_exec_all(fuzzer, executor, state, client_id, &event)?
{
return Ok(());
}
match event { match event {
Event::NewTestcase { Event::NewTestcase {
input, input,
@ -593,9 +629,6 @@ where
} => { } => {
log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})"); log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})");
if let Ok(meta) = state.metadata_mut::<TransferringMetadata>() {
meta.set_transferring(true);
}
let res = if client_config.match_with(&self.configuration) let res = if client_config.match_with(&self.configuration)
&& observers_buf.is_some() && observers_buf.is_some()
{ {
@ -621,13 +654,9 @@ where
state, executor, self, input, false, state, executor, self, input, false,
)? )?
}; };
if let Ok(meta) = state.metadata_mut::<TransferringMetadata>() {
meta.set_transferring(false);
}
if let Some(item) = res.1 { if let Some(item) = res.1 {
log::info!("Added received Testcase as item #{item}"); log::info!("Added received Testcase as item #{item}");
} }
Ok(())
} }
Event::CustomBuf { tag, buf } => { Event::CustomBuf { tag, buf } => {
for handler in &mut self.custom_buf_handlers { for handler in &mut self.custom_buf_handlers {
@ -635,17 +664,21 @@ where
break; break;
} }
} }
Ok(())
} }
_ => Err(Error::unknown(format!( _ => {
return Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.", "Received illegal message that message should not have arrived: {:?}.",
event.name() event.name()
))), )));
} }
} }
self.hooks
.post_exec_all(fuzzer, executor, state, client_id)?;
Ok(())
}
} }
impl<S: State, SP: ShMemProvider> LlmpEventManager<S, SP> { impl<EMH, S: State, SP: ShMemProvider> LlmpEventManager<EMH, S, SP> {
/// Send information that this client is exiting. /// Send information that this client is exiting.
/// The other side may free up all allocated memory. /// The other side may free up all allocated memory.
/// We are no longer allowed to send anything afterwards. /// We are no longer allowed to send anything afterwards.
@ -654,7 +687,7 @@ impl<S: State, SP: ShMemProvider> LlmpEventManager<S, SP> {
} }
} }
impl<S, SP> UsesState for LlmpEventManager<S, SP> impl<EMH, S, SP> UsesState for LlmpEventManager<EMH, S, SP>
where where
S: State, S: State,
SP: ShMemProvider, SP: ShMemProvider,
@ -662,7 +695,7 @@ where
type State = S; type State = S;
} }
impl<S, SP> EventFirer for LlmpEventManager<S, SP> impl<EMH, S, SP> EventFirer for LlmpEventManager<EMH, S, SP>
where where
S: State, S: State,
SP: ShMemProvider, SP: ShMemProvider,
@ -756,7 +789,7 @@ where
} }
} }
impl<S, SP> EventRestarter for LlmpEventManager<S, SP> impl<EMH, S, SP> EventRestarter for LlmpEventManager<EMH, S, SP>
where where
S: State, S: State,
SP: ShMemProvider, SP: ShMemProvider,
@ -769,8 +802,9 @@ where
} }
} }
impl<E, S, SP, Z> EventProcessor<E, Z> for LlmpEventManager<S, SP> impl<E, EMH, S, SP, Z> EventProcessor<E, Z> for LlmpEventManager<EMH, S, SP>
where where
EMH: EventManagerHooksTuple<S>,
S: State + HasExecutions + HasMetadata, S: State + HasExecutions + HasMetadata,
SP: ShMemProvider, SP: ShMemProvider,
E: HasObservers<State = S> + Executor<Self, Z>, E: HasObservers<State = S> + Executor<Self, Z>,
@ -814,17 +848,18 @@ where
} }
} }
impl<E, S, SP, Z> EventManager<E, Z> for LlmpEventManager<S, SP> impl<E, EMH, S, SP, Z> EventManager<E, Z> for LlmpEventManager<EMH, S, SP>
where where
E: HasObservers<State = S> + Executor<Self, Z>, E: HasObservers<State = S> + Executor<Self, Z>,
for<'a> E::Observers: Deserialize<'a>, for<'a> E::Observers: Deserialize<'a>,
EMH: EventManagerHooksTuple<S>,
S: State + HasExecutions + HasMetadata + HasLastReportTime, S: State + HasExecutions + HasMetadata + HasLastReportTime,
SP: ShMemProvider, SP: ShMemProvider,
Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers, State = S>, Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers, State = S>,
{ {
} }
impl<S, SP> HasCustomBufHandlers for LlmpEventManager<S, SP> impl<EMH, S, SP> HasCustomBufHandlers for LlmpEventManager<EMH, S, SP>
where where
S: State, S: State,
SP: ShMemProvider, SP: ShMemProvider,
@ -837,14 +872,14 @@ where
} }
} }
impl<S, SP> ProgressReporter for LlmpEventManager<S, SP> impl<EMH, S, SP> ProgressReporter for LlmpEventManager<EMH, S, SP>
where where
S: State + HasExecutions + HasMetadata + HasLastReportTime, S: State + HasExecutions + HasMetadata + HasLastReportTime,
SP: ShMemProvider, SP: ShMemProvider,
{ {
} }
impl<S, SP> HasEventManagerId for LlmpEventManager<S, SP> impl<EMH, S, SP> HasEventManagerId for LlmpEventManager<EMH, S, SP>
where where
S: State, S: State,
SP: ShMemProvider, SP: ShMemProvider,
@ -858,14 +893,14 @@ where
/// A manager that can restart on the fly, storing states in-between (in `on_restart`) /// A manager that can restart on the fly, storing states in-between (in `on_restart`)
#[cfg(feature = "std")] #[cfg(feature = "std")]
#[derive(Debug)] #[derive(Debug)]
pub struct LlmpRestartingEventManager<S, SP> pub struct LlmpRestartingEventManager<EMH, S, SP>
where where
S: State, S: State,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
{ {
/// The embedded LLMP event manager /// The embedded LLMP event manager
llmp_mgr: LlmpEventManager<S, SP>, llmp_mgr: LlmpEventManager<EMH, S, SP>,
/// The staterestorer to serialize the state for the next runner /// The staterestorer to serialize the state for the next runner
staterestorer: StateRestorer<SP>, staterestorer: StateRestorer<SP>,
/// Decide if the state restorer must save the serialized state /// Decide if the state restorer must save the serialized state
@ -873,7 +908,7 @@ where
} }
#[cfg(all(feature = "std", feature = "adaptive_serialization"))] #[cfg(all(feature = "std", feature = "adaptive_serialization"))]
impl<S, SP> EventStatsCollector for LlmpRestartingEventManager<S, SP> impl<EMH, S, SP> EventStatsCollector for LlmpRestartingEventManager<EMH, S, SP>
where where
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
S: State, S: State,
@ -906,7 +941,7 @@ where
} }
#[cfg(all(feature = "std", not(feature = "adaptive_serialization")))] #[cfg(all(feature = "std", not(feature = "adaptive_serialization")))]
impl<S, SP> EventStatsCollector for LlmpRestartingEventManager<S, SP> impl<EMH, S, SP> EventStatsCollector for LlmpRestartingEventManager<EMH, S, SP>
where where
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
S: State, S: State,
@ -914,7 +949,7 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<S, SP> UsesState for LlmpRestartingEventManager<S, SP> impl<EMH, S, SP> UsesState for LlmpRestartingEventManager<EMH, S, SP>
where where
S: State, S: State,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
@ -923,7 +958,7 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<S, SP> ProgressReporter for LlmpRestartingEventManager<S, SP> impl<EMH, S, SP> ProgressReporter for LlmpRestartingEventManager<EMH, S, SP>
where where
S: State + HasExecutions + HasMetadata + HasLastReportTime, S: State + HasExecutions + HasMetadata + HasLastReportTime,
SP: ShMemProvider, SP: ShMemProvider,
@ -931,7 +966,7 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<S, SP> EventFirer for LlmpRestartingEventManager<S, SP> impl<EMH, S, SP> EventFirer for LlmpRestartingEventManager<EMH, S, SP>
where where
SP: ShMemProvider, SP: ShMemProvider,
S: State, S: State,
@ -959,7 +994,7 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<S, SP> EventRestarter for LlmpRestartingEventManager<S, SP> impl<EMH, S, SP> EventRestarter for LlmpRestartingEventManager<EMH, S, SP>
where where
S: State + HasExecutions, S: State + HasExecutions,
SP: ShMemProvider, SP: ShMemProvider,
@ -997,10 +1032,11 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<E, S, SP, Z> EventProcessor<E, Z> for LlmpRestartingEventManager<S, SP> impl<E, EMH, S, SP, Z> EventProcessor<E, Z> for LlmpRestartingEventManager<EMH, S, SP>
where where
E: HasObservers<State = S> + Executor<LlmpEventManager<S, SP>, Z>, E: HasObservers<State = S> + Executor<LlmpEventManager<EMH, S, SP>, Z>,
for<'a> E::Observers: Deserialize<'a>, for<'a> E::Observers: Deserialize<'a>,
EMH: EventManagerHooksTuple<S>,
S: State + HasExecutions + HasMetadata, S: State + HasExecutions + HasMetadata,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers>, //CE: CustomEvent<I>, Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers>, //CE: CustomEvent<I>,
@ -1011,10 +1047,11 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<E, S, SP, Z> EventManager<E, Z> for LlmpRestartingEventManager<S, SP> impl<E, EMH, S, SP, Z> EventManager<E, Z> for LlmpRestartingEventManager<EMH, S, SP>
where where
E: HasObservers<State = S> + Executor<LlmpEventManager<S, SP>, Z>, E: HasObservers<State = S> + Executor<LlmpEventManager<EMH, S, SP>, Z>,
for<'a> E::Observers: Deserialize<'a>, for<'a> E::Observers: Deserialize<'a>,
EMH: EventManagerHooksTuple<S>,
S: State + HasExecutions + HasMetadata + HasLastReportTime, S: State + HasExecutions + HasMetadata + HasLastReportTime,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers>, //CE: CustomEvent<I>, Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers>, //CE: CustomEvent<I>,
@ -1022,7 +1059,7 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<S, SP> HasEventManagerId for LlmpRestartingEventManager<S, SP> impl<EMH, S, SP> HasEventManagerId for LlmpRestartingEventManager<EMH, S, SP>
where where
S: State, S: State,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
@ -1039,14 +1076,14 @@ const _ENV_FUZZER_RECEIVER: &str = "_AFL_ENV_FUZZER_RECEIVER";
const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT"; const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT";
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<S, SP> LlmpRestartingEventManager<S, SP> impl<EMH, S, SP> LlmpRestartingEventManager<EMH, S, SP>
where where
S: State, S: State,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
{ {
/// Create a new runner, the executed child doing the actual fuzzing. /// Create a new runner, the executed child doing the actual fuzzing.
pub fn new(llmp_mgr: LlmpEventManager<S, SP>, staterestorer: StateRestorer<SP>) -> Self { pub fn new(llmp_mgr: LlmpEventManager<EMH, S, SP>, staterestorer: StateRestorer<SP>) -> Self {
Self { Self {
llmp_mgr, llmp_mgr,
staterestorer, staterestorer,
@ -1056,7 +1093,7 @@ where
/// Create a new runner specifying if it must save the serialized state on restart. /// Create a new runner specifying if it must save the serialized state on restart.
pub fn with_save_state( pub fn with_save_state(
llmp_mgr: LlmpEventManager<S, SP>, llmp_mgr: LlmpEventManager<EMH, S, SP>,
staterestorer: StateRestorer<SP>, staterestorer: StateRestorer<SP>,
save_state: bool, save_state: bool,
) -> Self { ) -> Self {
@ -1102,7 +1139,13 @@ pub fn setup_restarting_mgr_std<MT, S>(
monitor: MT, monitor: MT,
broker_port: u16, broker_port: u16,
configuration: EventConfig, configuration: EventConfig,
) -> Result<(Option<S>, LlmpRestartingEventManager<S, StdShMemProvider>), Error> ) -> Result<
(
Option<S>,
LlmpRestartingEventManager<(), S, StdShMemProvider>,
),
Error,
>
where where
MT: Monitor + Clone, MT: Monitor + Clone,
S: State + HasExecutions, S: State + HasExecutions,
@ -1112,6 +1155,7 @@ where
.monitor(Some(monitor)) .monitor(Some(monitor))
.broker_port(broker_port) .broker_port(broker_port)
.configuration(configuration) .configuration(configuration)
.hooks(tuple_list!())
.build() .build()
.launch() .launch()
} }
@ -1122,9 +1166,10 @@ where
#[cfg(feature = "std")] #[cfg(feature = "std")]
#[allow(clippy::default_trait_access, clippy::ignored_unit_patterns)] #[allow(clippy::default_trait_access, clippy::ignored_unit_patterns)]
#[derive(TypedBuilder, Debug)] #[derive(TypedBuilder, Debug)]
pub struct RestartingMgr<MT, S, SP> pub struct RestartingMgr<EMH, MT, S, SP>
where where
S: UsesInput + DeserializeOwned, EMH: EventManagerHooksTuple<S>,
S: State,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
MT: Monitor, MT: Monitor,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
@ -1160,14 +1205,17 @@ where
/// The timeout duration used for llmp client timeout /// The timeout duration used for llmp client timeout
#[builder(default = DEFAULT_CLIENT_TIMEOUT_SECS)] #[builder(default = DEFAULT_CLIENT_TIMEOUT_SECS)]
client_timeout: Duration, client_timeout: Duration,
/// The hooks passed to event manager:
hooks: EMH,
#[builder(setter(skip), default = PhantomData)] #[builder(setter(skip), default = PhantomData)]
phantom_data: PhantomData<S>, phantom_data: PhantomData<(EMH, S)>,
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
#[allow(clippy::type_complexity, clippy::too_many_lines)] #[allow(clippy::type_complexity, clippy::too_many_lines)]
impl<MT, S, SP> RestartingMgr<MT, S, SP> impl<EMH, MT, S, SP> RestartingMgr<EMH, MT, S, SP>
where where
EMH: EventManagerHooksTuple<S> + Copy + Clone,
SP: ShMemProvider, SP: ShMemProvider,
S: State + HasExecutions, S: State + HasExecutions,
MT: Monitor + Clone, MT: Monitor + Clone,
@ -1186,7 +1234,7 @@ where
} }
/// Launch the broker and the clients and fuzz /// Launch the broker and the clients and fuzz
pub fn launch(&mut self) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> { pub fn launch(&mut self) -> Result<(Option<S>, LlmpRestartingEventManager<EMH, S, SP>), Error> {
// We start ourself as child process to actually fuzz // We start ourself as child process to actually fuzz
let (staterestorer, new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER) let (staterestorer, new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER)
.is_err() .is_err()
@ -1230,7 +1278,11 @@ where
return Err(Error::shutting_down()); return Err(Error::shutting_down());
} }
LlmpConnection::IsClient { client } => { LlmpConnection::IsClient { client } => {
let mgr = LlmpEventManager::<S, SP>::new(client, self.configuration)?; let mgr = LlmpEventManager::<EMH, S, SP>::with_hooks(
client,
self.configuration,
self.hooks,
)?;
(mgr, None) (mgr, None)
} }
} }
@ -1248,10 +1300,11 @@ where
} }
ManagerKind::Client { cpu_core } => { ManagerKind::Client { cpu_core } => {
// We are a client // We are a client
let mgr = LlmpEventManager::<S, SP>::on_port( let mgr = LlmpEventManager::<EMH, S, SP>::on_port_with_hooks(
self.shmem_provider.clone(), self.shmem_provider.clone(),
self.broker_port, self.broker_port,
self.configuration, self.configuration,
self.hooks,
)?; )?;
(mgr, cpu_core) (mgr, cpu_core)
@ -1365,10 +1418,11 @@ where
( (
state_opt, state_opt,
LlmpRestartingEventManager::with_save_state( LlmpRestartingEventManager::with_save_state(
LlmpEventManager::existing_client_from_description( LlmpEventManager::existing_client_from_description_with_hooks(
new_shmem_provider, new_shmem_provider,
&mgr_description, &mgr_description,
self.configuration, self.configuration,
self.hooks,
)?, )?,
staterestorer, staterestorer,
self.serialize_state, self.serialize_state,
@ -1377,10 +1431,11 @@ where
} else { } else {
log::info!("First run. Let's set it all up"); log::info!("First run. Let's set it all up");
// Mgr to send and receive msgs from/to all other fuzzer instances // Mgr to send and receive msgs from/to all other fuzzer instances
let mgr = LlmpEventManager::<S, SP>::existing_client_from_env( let mgr = LlmpEventManager::<EMH, S, SP>::existing_client_from_env_with_hooks(
new_shmem_provider, new_shmem_provider,
_ENV_FUZZER_BROKER_CLIENT_INITIAL, _ENV_FUZZER_BROKER_CLIENT_INITIAL,
self.configuration, self.configuration,
self.hooks,
)?; )?;
( (
@ -1539,7 +1594,7 @@ where
executor: &mut E, executor: &mut E,
state: &mut S, state: &mut S,
manager: &mut EM, manager: &mut EM,
_client_id: ClientId, client_id: ClientId,
event: Event<DI>, event: Event<DI>,
) -> Result<(), Error> ) -> Result<(), Error>
where where
@ -1559,15 +1614,12 @@ where
executions: _, executions: _,
forward_id, forward_id,
} => { } => {
log::info!("Received new Testcase to convert from {_client_id:?} (forward {forward_id:?}, forward {forward_id:?})"); log::info!("Received new Testcase to convert from {client_id:?} (forward {forward_id:?}, forward {forward_id:?})");
let Some(converter) = self.converter_back.as_mut() else { let Some(converter) = self.converter_back.as_mut() else {
return Ok(()); return Ok(());
}; };
if let Ok(meta) = state.metadata_mut::<TransferringMetadata>() {
meta.set_transferring(true);
}
let res = fuzzer.evaluate_input_with_observers::<E, EM>( let res = fuzzer.evaluate_input_with_observers::<E, EM>(
state, state,
executor, executor,
@ -1575,9 +1627,6 @@ where
converter.convert(input)?, converter.convert(input)?,
false, false,
)?; )?;
if let Ok(meta) = state.metadata_mut::<TransferringMetadata>() {
meta.set_transferring(false);
}
if let Some(item) = res.1 { if let Some(item) = res.1 {
log::info!("Added received Testcase as item #{item}"); log::info!("Added received Testcase as item #{item}");

View File

@ -1,6 +1,8 @@
//! An [`EventManager`] manages all events that go to other instances of the fuzzer. //! An [`EventManager`] manages all events that go to other instances of the fuzzer.
//! The messages are commonly information about new Testcases as well as stats and other [`Event`]s. //! The messages are commonly information about new Testcases as well as stats and other [`Event`]s.
pub mod hooks;
pub mod simple; pub mod simple;
pub use simple::*; pub use simple::*;
#[cfg(all(unix, feature = "std"))] #[cfg(all(unix, feature = "std"))]
@ -12,6 +14,8 @@ pub use centralized::*;
pub mod launcher; pub mod launcher;
#[allow(clippy::ignored_unit_patterns)] #[allow(clippy::ignored_unit_patterns)]
pub mod llmp; pub mod llmp;
pub use llmp::*;
#[cfg(feature = "tcp_manager")] #[cfg(feature = "tcp_manager")]
#[allow(clippy::ignored_unit_patterns)] #[allow(clippy::ignored_unit_patterns)]
pub mod tcp; pub mod tcp;
@ -31,7 +35,6 @@ pub use launcher::*;
#[cfg(all(unix, feature = "std"))] #[cfg(all(unix, feature = "std"))]
use libafl_bolts::os::unix_signals::{siginfo_t, ucontext_t, Handler, Signal}; use libafl_bolts::os::unix_signals::{siginfo_t, ucontext_t, Handler, Signal};
use libafl_bolts::{current_time, ClientId}; use libafl_bolts::{current_time, ClientId};
pub use llmp::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[cfg(feature = "std")] #[cfg(feature = "std")]
use uuid::Uuid; use uuid::Uuid;

View File

@ -23,7 +23,7 @@ use libafl_bolts::os::startable_self;
use libafl_bolts::os::unix_signals::setup_signal_handler; use libafl_bolts::os::unix_signals::setup_signal_handler;
#[cfg(all(feature = "std", feature = "fork", unix))] #[cfg(all(feature = "std", feature = "fork", unix))]
use libafl_bolts::os::{fork, ForkResult}; use libafl_bolts::os::{fork, ForkResult};
use libafl_bolts::{shmem::ShMemProvider, ClientId}; use libafl_bolts::{shmem::ShMemProvider, tuples::tuple_list, ClientId};
#[cfg(feature = "std")] #[cfg(feature = "std")]
use libafl_bolts::{shmem::StdShMemProvider, staterestore::StateRestorer}; use libafl_bolts::{shmem::StdShMemProvider, staterestore::StateRestorer};
use serde::{de::DeserializeOwned, Deserialize}; use serde::{de::DeserializeOwned, Deserialize};
@ -40,11 +40,11 @@ use super::{CustomBufEventResult, CustomBufHandlerFn};
use crate::events::EVENTMGR_SIGHANDLER_STATE; use crate::events::EVENTMGR_SIGHANDLER_STATE;
use crate::{ use crate::{
events::{ events::{
BrokerEventResult, Event, EventConfig, EventFirer, EventManager, EventManagerId, hooks::EventManagerHooksTuple, BrokerEventResult, Event, EventConfig, EventFirer,
EventProcessor, EventRestarter, HasCustomBufHandlers, HasEventManagerId, ProgressReporter, EventManager, EventManagerId, EventProcessor, EventRestarter, HasCustomBufHandlers,
HasEventManagerId, ProgressReporter,
}, },
executors::{Executor, HasObservers}, executors::{Executor, HasObservers},
feedbacks::transferred::TransferringMetadata,
fuzzer::{EvaluatorObservers, ExecutionProcessor}, fuzzer::{EvaluatorObservers, ExecutionProcessor},
inputs::{Input, UsesInput}, inputs::{Input, UsesInput},
monitors::Monitor, monitors::Monitor,
@ -410,10 +410,12 @@ where
} }
/// An [`EventManager`] that forwards all events to other attached via tcp. /// An [`EventManager`] that forwards all events to other attached via tcp.
pub struct TcpEventManager<S> pub struct TcpEventManager<EMH, S>
where where
EMH: EventManagerHooksTuple<S>,
S: State, S: State,
{ {
hooks: EMH,
/// The TCP stream for inter process communication /// The TCP stream for inter process communication
tcp: TcpStream, tcp: TcpStream,
/// Our `CientId` /// Our `CientId`
@ -429,8 +431,9 @@ where
phantom: PhantomData<S>, phantom: PhantomData<S>,
} }
impl<S> core::fmt::Debug for TcpEventManager<S> impl<EMH, S> core::fmt::Debug for TcpEventManager<EMH, S>
where where
EMH: EventManagerHooksTuple<S>,
S: State, S: State,
{ {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
@ -446,8 +449,9 @@ where
} }
} }
impl<S> Drop for TcpEventManager<S> impl<EMH, S> Drop for TcpEventManager<EMH, S>
where where
EMH: EventManagerHooksTuple<S>,
S: State, S: State,
{ {
/// TCP clients will have to wait until their pages are mapped by somebody. /// TCP clients will have to wait until their pages are mapped by somebody.
@ -456,7 +460,7 @@ where
} }
} }
impl<S> TcpEventManager<S> impl<S> TcpEventManager<(), S>
where where
S: State + HasExecutions + HasMetadata, S: State + HasExecutions + HasMetadata,
{ {
@ -465,6 +469,65 @@ where
addr: &A, addr: &A,
client_id: ClientId, client_id: ClientId,
configuration: EventConfig, configuration: EventConfig,
) -> Result<Self, Error> {
Self::existing_with_hooks(addr, client_id, configuration, tuple_list!())
}
/// Create a manager from a raw TCP client
pub fn new<A: ToSocketAddrs>(addr: &A, configuration: EventConfig) -> Result<Self, Error> {
Self::existing_with_hooks(addr, UNDEFINED_CLIENT_ID, configuration, tuple_list!())
}
/// Create an TCP event manager on a port specifying the client id
///
/// If the port is not yet bound, it will act as a broker; otherwise, it
/// will act as a client.
pub fn existing_on_port(
port: u16,
client_id: ClientId,
configuration: EventConfig,
) -> Result<Self, Error> {
Self::existing_with_hooks(
&("127.0.0.1", port),
client_id,
configuration,
tuple_list!(),
)
}
/// Create an TCP event manager on a port with hooks
///
/// If the port is not yet bound, it will act as a broker; otherwise, it
/// will act as a client.
pub fn on_port(port: u16, configuration: EventConfig) -> Result<Self, Error> {
Self::with_hooks(&("127.0.0.1", port), configuration, tuple_list!())
}
/// Create an TCP event manager on a port specifying the client id from env
///
/// If the port is not yet bound, it will act as a broker; otherwise, it
/// will act as a client.
pub fn existing_from_env<A: ToSocketAddrs>(
addr: &A,
env_name: &str,
configuration: EventConfig,
) -> Result<Self, Error> {
let this_id = ClientId(str::parse::<u32>(&env::var(env_name)?)?);
Self::existing_with_hooks(addr, this_id, configuration, tuple_list!())
}
}
impl<EMH, S> TcpEventManager<EMH, S>
where
EMH: EventManagerHooksTuple<S>,
S: State + HasExecutions + HasMetadata,
{
/// Create a manager from a raw TCP client specifying the client id with hooks
pub fn existing_with_hooks<A: ToSocketAddrs>(
addr: &A,
client_id: ClientId,
configuration: EventConfig,
hooks: EMH,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let mut tcp = TcpStream::connect(addr)?; let mut tcp = TcpStream::connect(addr)?;
@ -479,6 +542,7 @@ where
println!("Our client id: {client_id:?}"); println!("Our client id: {client_id:?}");
Ok(Self { Ok(Self {
hooks,
tcp, tcp,
client_id, client_id,
#[cfg(feature = "tcp_compression")] #[cfg(feature = "tcp_compression")]
@ -489,42 +553,52 @@ where
}) })
} }
/// Create a manager from a raw TCP client /// Create a manager from a raw TCP client with hooks
pub fn new<A: ToSocketAddrs>(addr: &A, configuration: EventConfig) -> Result<Self, Error> { pub fn with_hooks<A: ToSocketAddrs>(
Self::existing(addr, UNDEFINED_CLIENT_ID, configuration) addr: &A,
configuration: EventConfig,
hooks: EMH,
) -> Result<Self, Error> {
Self::existing_with_hooks(addr, UNDEFINED_CLIENT_ID, configuration, hooks)
} }
/// Create an TCP event manager on a port specifying the client id /// Create an TCP event manager on a port specifying the client id with hooks
/// ///
/// If the port is not yet bound, it will act as a broker; otherwise, it /// If the port is not yet bound, it will act as a broker; otherwise, it
/// will act as a client. /// will act as a client.
pub fn existing_on_port( pub fn existing_on_port_with_hooks(
port: u16, port: u16,
client_id: ClientId, client_id: ClientId,
configuration: EventConfig, configuration: EventConfig,
hooks: EMH,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Self::existing(&("127.0.0.1", port), client_id, configuration) Self::existing_with_hooks(&("127.0.0.1", port), client_id, configuration, hooks)
} }
/// Create an TCP event manager on a port /// Create an TCP event manager on a port with hooks
/// ///
/// If the port is not yet bound, it will act as a broker; otherwise, it /// If the port is not yet bound, it will act as a broker; otherwise, it
/// will act as a client. /// will act as a client.
pub fn on_port(port: u16, configuration: EventConfig) -> Result<Self, Error> { pub fn on_port_with_hooks(
Self::new(&("127.0.0.1", port), configuration) port: u16,
configuration: EventConfig,
hooks: EMH,
) -> Result<Self, Error> {
Self::with_hooks(&("127.0.0.1", port), configuration, hooks)
} }
/// Create an TCP event manager on a port specifying the client id from env /// Create an TCP event manager on a port specifying the client id from env with hooks
/// ///
/// If the port is not yet bound, it will act as a broker; otherwise, it /// If the port is not yet bound, it will act as a broker; otherwise, it
/// will act as a client. /// will act as a client.
pub fn existing_from_env<A: ToSocketAddrs>( pub fn existing_from_env_with_hooks<A: ToSocketAddrs>(
addr: &A, addr: &A,
env_name: &str, env_name: &str,
configuration: EventConfig, configuration: EventConfig,
hooks: EMH,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let this_id = ClientId(str::parse::<u32>(&env::var(env_name)?)?); let this_id = ClientId(str::parse::<u32>(&env::var(env_name)?)?);
Self::existing(addr, this_id, configuration) Self::existing_with_hooks(addr, this_id, configuration, hooks)
} }
/// Write the client id for a client [`EventManager`] to env vars /// Write the client id for a client [`EventManager`] to env vars
@ -547,6 +621,12 @@ where
for<'a> E::Observers: Deserialize<'a>, for<'a> E::Observers: Deserialize<'a>,
Z: ExecutionProcessor<E::Observers, State = S> + EvaluatorObservers<E::Observers>, Z: ExecutionProcessor<E::Observers, State = S> + EvaluatorObservers<E::Observers>,
{ {
if !self
.hooks
.pre_exec_all(fuzzer, executor, state, client_id, &event)?
{
return Ok(());
}
match event { match event {
Event::NewTestcase { Event::NewTestcase {
input, input,
@ -560,9 +640,6 @@ where
} => { } => {
log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})"); log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})");
if let Ok(meta) = state.metadata_mut::<TransferringMetadata>() {
meta.set_transferring(true);
}
let _res = if client_config.match_with(&self.configuration) let _res = if client_config.match_with(&self.configuration)
&& observers_buf.is_some() && observers_buf.is_some()
{ {
@ -582,13 +659,9 @@ where
state, executor, self, input, false, state, executor, self, input, false,
)? )?
}; };
if let Ok(meta) = state.metadata_mut::<TransferringMetadata>() {
meta.set_transferring(false);
}
if let Some(item) = _res.1 { if let Some(item) = _res.1 {
log::info!("Added received Testcase as item #{item}"); log::info!("Added received Testcase as item #{item}");
} }
Ok(())
} }
Event::CustomBuf { tag, buf } => { Event::CustomBuf { tag, buf } => {
for handler in &mut self.custom_buf_handlers { for handler in &mut self.custom_buf_handlers {
@ -596,18 +669,23 @@ where
break; break;
} }
} }
Ok(())
} }
_ => Err(Error::unknown(format!( _ => {
return Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.", "Received illegal message that message should not have arrived: {:?}.",
event.name() event.name()
))), )))
} }
} }
self.hooks
.post_exec_all(fuzzer, executor, state, client_id)?;
Ok(())
}
} }
impl<S> TcpEventManager<S> impl<EMH, S> TcpEventManager<EMH, S>
where where
EMH: EventManagerHooksTuple<S>,
S: State, S: State,
{ {
/// Send information that this client is exiting. /// Send information that this client is exiting.
@ -620,15 +698,17 @@ where
} }
} }
impl<S> UsesState for TcpEventManager<S> impl<EMH, S> UsesState for TcpEventManager<EMH, S>
where where
EMH: EventManagerHooksTuple<S>,
S: State, S: State,
{ {
type State = S; type State = S;
} }
impl<S> EventFirer for TcpEventManager<S> impl<EMH, S> EventFirer for TcpEventManager<EMH, S>
where where
EMH: EventManagerHooksTuple<S>,
S: State, S: State,
{ {
#[cfg(feature = "tcp_compression")] #[cfg(feature = "tcp_compression")]
@ -674,8 +754,9 @@ where
} }
} }
impl<S> EventRestarter for TcpEventManager<S> impl<EMH, S> EventRestarter for TcpEventManager<EMH, S>
where where
EMH: EventManagerHooksTuple<S>,
S: State, S: State,
{ {
/// The TCP client needs to wait until a broker has mapped all pages before shutting down. /// The TCP client needs to wait until a broker has mapped all pages before shutting down.
@ -686,11 +767,12 @@ where
} }
} }
impl<E, S, Z> EventProcessor<E, Z> for TcpEventManager<S> impl<E, EMH, S, Z> EventProcessor<E, Z> for TcpEventManager<EMH, S>
where where
S: State + HasExecutions + HasMetadata,
E: HasObservers<State = S> + Executor<Self, Z>, E: HasObservers<State = S> + Executor<Self, Z>,
for<'a> E::Observers: Deserialize<'a>, for<'a> E::Observers: Deserialize<'a>,
EMH: EventManagerHooksTuple<S>,
S: State + HasExecutions + HasMetadata,
Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers, State = S>, Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers, State = S>,
{ {
fn process( fn process(
@ -747,17 +829,19 @@ where
} }
} }
impl<E, S, Z> EventManager<E, Z> for TcpEventManager<S> impl<E, EMH, S, Z> EventManager<E, Z> for TcpEventManager<EMH, S>
where where
E: HasObservers<State = S> + Executor<Self, Z>, E: HasObservers<State = S> + Executor<Self, Z>,
for<'a> E::Observers: Deserialize<'a>, for<'a> E::Observers: Deserialize<'a>,
EMH: EventManagerHooksTuple<S>,
S: State + HasExecutions + HasMetadata + HasLastReportTime, S: State + HasExecutions + HasMetadata + HasLastReportTime,
Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers, State = S>, Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers, State = S>,
{ {
} }
impl<S> HasCustomBufHandlers for TcpEventManager<S> impl<EMH, S> HasCustomBufHandlers for TcpEventManager<EMH, S>
where where
EMH: EventManagerHooksTuple<S>,
S: State, S: State,
{ {
fn add_custom_buf_handler( fn add_custom_buf_handler(
@ -768,13 +852,16 @@ where
} }
} }
impl<S> ProgressReporter for TcpEventManager<S> where impl<EMH, S> ProgressReporter for TcpEventManager<EMH, S>
S: State + HasExecutions + HasMetadata + HasLastReportTime where
EMH: EventManagerHooksTuple<S>,
S: State + HasExecutions + HasMetadata + HasLastReportTime,
{ {
} }
impl<S> HasEventManagerId for TcpEventManager<S> impl<EMH, S> HasEventManagerId for TcpEventManager<EMH, S>
where where
EMH: EventManagerHooksTuple<S>,
S: State, S: State,
{ {
/// Gets the id assigned to this staterestorer. /// Gets the id assigned to this staterestorer.
@ -786,14 +873,15 @@ where
/// A manager that can restart on the fly, storing states in-between (in `on_restart`) /// A manager that can restart on the fly, storing states in-between (in `on_restart`)
#[cfg(feature = "std")] #[cfg(feature = "std")]
#[derive(Debug)] #[derive(Debug)]
pub struct TcpRestartingEventManager<S, SP> pub struct TcpRestartingEventManager<EMH, S, SP>
where where
EMH: EventManagerHooksTuple<S>,
S: State, S: State,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
{ {
/// The embedded TCP event manager /// The embedded TCP event manager
tcp_mgr: TcpEventManager<S>, tcp_mgr: TcpEventManager<EMH, S>,
/// The staterestorer to serialize the state for the next runner /// The staterestorer to serialize the state for the next runner
staterestorer: StateRestorer<SP>, staterestorer: StateRestorer<SP>,
/// Decide if the state restorer must save the serialized state /// Decide if the state restorer must save the serialized state
@ -801,8 +889,9 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<S, SP> UsesState for TcpRestartingEventManager<S, SP> impl<EMH, S, SP> UsesState for TcpRestartingEventManager<EMH, S, SP>
where where
EMH: EventManagerHooksTuple<S>,
S: State, S: State,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
{ {
@ -810,16 +899,18 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<S, SP> ProgressReporter for TcpRestartingEventManager<S, SP> impl<EMH, S, SP> ProgressReporter for TcpRestartingEventManager<EMH, S, SP>
where where
EMH: EventManagerHooksTuple<S>,
S: State + HasExecutions + HasMetadata + HasLastReportTime, S: State + HasExecutions + HasMetadata + HasLastReportTime,
SP: ShMemProvider, SP: ShMemProvider,
{ {
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<S, SP> EventFirer for TcpRestartingEventManager<S, SP> impl<EMH, S, SP> EventFirer for TcpRestartingEventManager<EMH, S, SP>
where where
EMH: EventManagerHooksTuple<S>,
SP: ShMemProvider, SP: ShMemProvider,
S: State, S: State,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
@ -839,8 +930,9 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<S, SP> EventRestarter for TcpRestartingEventManager<S, SP> impl<EMH, S, SP> EventRestarter for TcpRestartingEventManager<EMH, S, SP>
where where
EMH: EventManagerHooksTuple<S>,
S: State + HasExecutions, S: State + HasExecutions,
SP: ShMemProvider, SP: ShMemProvider,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
@ -877,10 +969,11 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<E, S, SP, Z> EventProcessor<E, Z> for TcpRestartingEventManager<S, SP> impl<E, EMH, S, SP, Z> EventProcessor<E, Z> for TcpRestartingEventManager<EMH, S, SP>
where where
E: HasObservers<State = S> + Executor<TcpEventManager<S>, Z>, E: HasObservers<State = S> + Executor<TcpEventManager<EMH, S>, Z>,
for<'a> E::Observers: Deserialize<'a>, for<'a> E::Observers: Deserialize<'a>,
EMH: EventManagerHooksTuple<S>,
S: State + HasExecutions + HasMetadata, S: State + HasExecutions + HasMetadata,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers>, //CE: CustomEvent<I>, Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers>, //CE: CustomEvent<I>,
@ -891,10 +984,11 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<E, S, SP, Z> EventManager<E, Z> for TcpRestartingEventManager<S, SP> impl<E, EMH, S, SP, Z> EventManager<E, Z> for TcpRestartingEventManager<EMH, S, SP>
where where
E: HasObservers<State = S> + Executor<TcpEventManager<S>, Z>, E: HasObservers<State = S> + Executor<TcpEventManager<EMH, S>, Z>,
for<'a> E::Observers: Deserialize<'a>, for<'a> E::Observers: Deserialize<'a>,
EMH: EventManagerHooksTuple<S>,
S: State + HasExecutions + HasMetadata + HasLastReportTime, S: State + HasExecutions + HasMetadata + HasLastReportTime,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers>, //CE: CustomEvent<I>, Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers>, //CE: CustomEvent<I>,
@ -902,8 +996,9 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<S, SP> HasEventManagerId for TcpRestartingEventManager<S, SP> impl<EMH, S, SP> HasEventManagerId for TcpRestartingEventManager<EMH, S, SP>
where where
EMH: EventManagerHooksTuple<S>,
S: State, S: State,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
{ {
@ -919,14 +1014,15 @@ const _ENV_FUZZER_RECEIVER: &str = "_AFL_ENV_FUZZER_RECEIVER";
const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT"; const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT";
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<S, SP> TcpRestartingEventManager<S, SP> impl<EMH, S, SP> TcpRestartingEventManager<EMH, S, SP>
where where
EMH: EventManagerHooksTuple<S>,
S: State, S: State,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
{ {
/// Create a new runner, the executed child doing the actual fuzzing. /// Create a new runner, the executed child doing the actual fuzzing.
pub fn new(tcp_mgr: TcpEventManager<S>, staterestorer: StateRestorer<SP>) -> Self { pub fn new(tcp_mgr: TcpEventManager<EMH, S>, staterestorer: StateRestorer<SP>) -> Self {
Self { Self {
tcp_mgr, tcp_mgr,
staterestorer, staterestorer,
@ -936,7 +1032,7 @@ where
/// Create a new runner specifying if it must save the serialized state on restart. /// Create a new runner specifying if it must save the serialized state on restart.
pub fn with_save_state( pub fn with_save_state(
tcp_mgr: TcpEventManager<S>, tcp_mgr: TcpEventManager<EMH, S>,
staterestorer: StateRestorer<SP>, staterestorer: StateRestorer<SP>,
save_state: bool, save_state: bool,
) -> Self { ) -> Self {
@ -982,7 +1078,13 @@ pub fn setup_restarting_mgr_tcp<MT, S>(
monitor: MT, monitor: MT,
broker_port: u16, broker_port: u16,
configuration: EventConfig, configuration: EventConfig,
) -> Result<(Option<S>, TcpRestartingEventManager<S, StdShMemProvider>), Error> ) -> Result<
(
Option<S>,
TcpRestartingEventManager<(), S, StdShMemProvider>,
),
Error,
>
where where
MT: Monitor + Clone, MT: Monitor + Clone,
S: State + HasExecutions + HasMetadata, S: State + HasExecutions + HasMetadata,
@ -992,6 +1094,7 @@ where
.monitor(Some(monitor)) .monitor(Some(monitor))
.broker_port(broker_port) .broker_port(broker_port)
.configuration(configuration) .configuration(configuration)
.hooks(tuple_list!())
.build() .build()
.launch() .launch()
} }
@ -1002,7 +1105,7 @@ where
#[cfg(feature = "std")] #[cfg(feature = "std")]
#[allow(clippy::default_trait_access, clippy::ignored_unit_patterns)] #[allow(clippy::default_trait_access, clippy::ignored_unit_patterns)]
#[derive(TypedBuilder, Debug)] #[derive(TypedBuilder, Debug)]
pub struct TcpRestartingMgr<MT, S, SP> pub struct TcpRestartingMgr<EMH, MT, S, SP>
where where
S: UsesInput + DeserializeOwned, S: UsesInput + DeserializeOwned,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
@ -1037,14 +1140,17 @@ where
/// Tell the manager to serialize or not the state on restart /// Tell the manager to serialize or not the state on restart
#[builder(default = true)] #[builder(default = true)]
serialize_state: bool, serialize_state: bool,
/// The hooks for `handle_in_client`
hooks: EMH,
#[builder(setter(skip), default = PhantomData)] #[builder(setter(skip), default = PhantomData)]
phantom_data: PhantomData<S>, phantom_data: PhantomData<S>,
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
#[allow(clippy::type_complexity, clippy::too_many_lines)] #[allow(clippy::type_complexity, clippy::too_many_lines)]
impl<MT, S, SP> TcpRestartingMgr<MT, S, SP> impl<EMH, MT, S, SP> TcpRestartingMgr<EMH, MT, S, SP>
where where
EMH: EventManagerHooksTuple<S> + Copy + Clone,
SP: ShMemProvider, SP: ShMemProvider,
S: State + HasExecutions + HasMetadata, S: State + HasExecutions + HasMetadata,
MT: Monitor + Clone, MT: Monitor + Clone,
@ -1063,7 +1169,7 @@ where
} }
/// Launch the restarting manager /// Launch the restarting manager
pub fn launch(&mut self) -> Result<(Option<S>, TcpRestartingEventManager<S, SP>), Error> { pub fn launch(&mut self) -> Result<(Option<S>, TcpRestartingEventManager<EMH, S, SP>), Error> {
// We start ourself as child process to actually fuzz // We start ourself as child process to actually fuzz
let (staterestorer, _new_shmem_provider, core_id) = if env::var(_ENV_FUZZER_SENDER).is_err() let (staterestorer, _new_shmem_provider, core_id) = if env::var(_ENV_FUZZER_SENDER).is_err()
{ {
@ -1097,9 +1203,10 @@ where
} }
Err(Error::File(_, _)) => { Err(Error::File(_, _)) => {
// port was likely already bound // port was likely already bound
let mgr = TcpEventManager::<S>::new( let mgr = TcpEventManager::<EMH, S>::with_hooks(
&("127.0.0.1", self.broker_port), &("127.0.0.1", self.broker_port),
self.configuration, self.configuration,
self.hooks,
)?; )?;
(mgr, None) (mgr, None)
} }
@ -1119,7 +1226,11 @@ where
} }
TcpManagerKind::Client { cpu_core } => { TcpManagerKind::Client { cpu_core } => {
// We are a client // We are a client
let mgr = TcpEventManager::<S>::on_port(self.broker_port, self.configuration)?; let mgr = TcpEventManager::<EMH, S>::on_port_with_hooks(
self.broker_port,
self.configuration,
self.hooks,
)?;
(mgr, cpu_core) (mgr, cpu_core)
} }
@ -1232,10 +1343,11 @@ where
( (
state_opt, state_opt,
TcpRestartingEventManager::with_save_state( TcpRestartingEventManager::with_save_state(
TcpEventManager::existing_on_port( TcpEventManager::existing_on_port_with_hooks(
self.broker_port, self.broker_port,
this_id, this_id,
self.configuration, self.configuration,
self.hooks,
)?, )?,
staterestorer, staterestorer,
self.serialize_state, self.serialize_state,
@ -1244,10 +1356,11 @@ where
} else { } else {
log::info!("First run. Let's set it all up"); log::info!("First run. Let's set it all up");
// Mgr to send and receive msgs from/to all other fuzzer instances // Mgr to send and receive msgs from/to all other fuzzer instances
let mgr = TcpEventManager::<S>::existing_from_env( let mgr = TcpEventManager::<EMH, S>::existing_from_env_with_hooks(
&("127.0.0.1", self.broker_port), &("127.0.0.1", self.broker_port),
_ENV_FUZZER_BROKER_CLIENT_INITIAL, _ENV_FUZZER_BROKER_CLIENT_INITIAL,
self.configuration, self.configuration,
self.hooks,
)?; )?;
( (

View File

@ -28,7 +28,8 @@ Welcome to `LibAFL`
clippy::module_name_repetitions, clippy::module_name_repetitions,
clippy::ptr_cast_constness, clippy::ptr_cast_constness,
clippy::unsafe_derive_deserialize, clippy::unsafe_derive_deserialize,
clippy::similar_names clippy::similar_names,
clippy::too_many_lines
)] )]
#![cfg_attr(not(test), warn( #![cfg_attr(not(test), warn(
missing_debug_implementations, missing_debug_implementations,

View File

@ -115,7 +115,7 @@ impl<'a> ForkserverBytesCoverageSugar<'a> {
let monitor = MultiMonitor::new(|s| log::info!("{s}")); let monitor = MultiMonitor::new(|s| log::info!("{s}"));
let mut run_client = |state: Option<_>, let mut run_client = |state: Option<_>,
mut mgr: LlmpRestartingEventManager<_, _>, mut mgr: LlmpRestartingEventManager<_, _, _>,
_core_id| { _core_id| {
// Coverage map shared between target and fuzzer // Coverage map shared between target and fuzzer
let mut shmem = shmem_provider_client.new_shmem(MAP_SIZE).unwrap(); let mut shmem = shmem_provider_client.new_shmem(MAP_SIZE).unwrap();

View File

@ -139,7 +139,7 @@ where
let monitor = MultiMonitor::new(|s| println!("{s}")); let monitor = MultiMonitor::new(|s| println!("{s}"));
let mut run_client = |state: Option<_>, let mut run_client = |state: Option<_>,
mut mgr: LlmpRestartingEventManager<_, _>, mut mgr: LlmpRestartingEventManager<_, _, _>,
_core_id| { _core_id| {
// Create an observation channel using the coverage map // Create an observation channel using the coverage map
let edges_observer = let edges_observer =

View File

@ -146,7 +146,7 @@ where
let monitor = MultiMonitor::new(|s| log::info!("{s}")); let monitor = MultiMonitor::new(|s| log::info!("{s}"));
let mut run_client = |state: Option<_>, let mut run_client = |state: Option<_>,
mut mgr: LlmpRestartingEventManager<_, _>, mut mgr: LlmpRestartingEventManager<_, _, _>,
_core_id| { _core_id| {
// Create an observation channel using the coverage map // Create an observation channel using the coverage map
let edges_observer = unsafe { let edges_observer = unsafe {