Decouple fuzzer functions from event manager (#2915)

* decouple fuzzer from em

* lol

* 3

* fix tcp

* fix

* fix

* fix

* fixer

* std

* fixer

* plz

* plzplzplz

* plzplzplzplz

* mm

* more

* symbol

* a

* a

* mm

* mmm

* mmmm

* mmmmm

* ff
This commit is contained in:
Dongjia "toka" Zhang 2025-02-01 07:52:42 +01:00 committed by GitHub
parent ace2a76ece
commit 6cd97e7105
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 488 additions and 604 deletions

View File

@ -1,3 +1,10 @@
# 0.15.0 -> 0.16.0
- `EventManager` is refactored to avoid calling function from `Fuzzer`, thus we do not evaluate testcases in `EventManager` anymore.
- Now we have `EventReceiver` in `events` module, and `EventProessor` in `fuzzer` module.
- `EventReceiver` is responsible for receiving testcases and delegates its evaluation to `EventProcessor`.
- `EventProcessor` is responsible for evaluating the testcases passed by the `EventReceiver`.
- Since we don't evaluate testcases in the `EventManager` anymore. `on_fire` and `post_exec` have been deleted from `EventManagerHook`.
- Similarly `pre_exec` has been renamed to `pre_receive`.
# 0.14.1 -> 0.15.0 # 0.14.1 -> 0.15.0
- `MmapShMem::new` and `MmapShMemProvider::new_shmem_with_id` now take `AsRef<Path>` instead of a byte array for the filename/id. - `MmapShMem::new` and `MmapShMemProvider::new_shmem_with_id` now take `AsRef<Path>` instead of a byte array for the filename/id.

View File

@ -7,7 +7,7 @@ use libafl::monitors::tui::TuiMonitor;
#[cfg(not(feature = "tui"))] #[cfg(not(feature = "tui"))]
use libafl::monitors::SimpleMonitor; use libafl::monitors::SimpleMonitor;
use libafl::{ use libafl::{
corpus::{Corpus, InMemoryCorpus, OnDiskCorpus}, corpus::{InMemoryCorpus, OnDiskCorpus},
events::SimpleEventManager, events::SimpleEventManager,
executors::{Executor, ExitKind, WithObservers}, executors::{Executor, ExitKind, WithObservers},
feedback_and_fast, feedback_and_fast,

View File

@ -123,7 +123,7 @@ pub fn main() {
&mut state, &mut state,
&mut executor, &mut executor,
&mut mgr, &mut mgr,
BytesInput::new(vec![b'a']), &BytesInput::new(vec![b'a']),
) )
.unwrap(); .unwrap();

View File

@ -78,7 +78,6 @@ pub type LibaflFuzzState =
#[cfg(not(feature = "fuzzbench"))] #[cfg(not(feature = "fuzzbench"))]
type LibaflFuzzManager = CentralizedEventManager< type LibaflFuzzManager = CentralizedEventManager<
LlmpRestartingEventManager<(), BytesInput, LibaflFuzzState, StdShMem, StdShMemProvider>, LlmpRestartingEventManager<(), BytesInput, LibaflFuzzState, StdShMem, StdShMemProvider>,
(),
BytesInput, BytesInput,
LibaflFuzzState, LibaflFuzzState,
StdShMem, StdShMem,

View File

@ -14,7 +14,7 @@ impl<I, S> EventManagerHook<I, S> for LibAflFuzzEventHook
where where
S: Stoppable, S: Stoppable,
{ {
fn pre_exec( fn pre_receive(
&mut self, &mut self,
state: &mut S, state: &mut S,
_client_id: ClientId, _client_id: ClientId,
@ -26,7 +26,4 @@ where
} }
Ok(true) Ok(true)
} }
fn post_exec(&mut self, _state: &mut S, _client_id: ClientId) -> Result<bool, Error> {
Ok(true)
}
} }

View File

@ -141,7 +141,7 @@ pub extern "C" fn libafl_main() {
let mut secondary_run_client = let mut secondary_run_client =
|state: Option<_>, |state: Option<_>,
mut mgr: CentralizedEventManager<_, _, _, _, _, _>, mut mgr: CentralizedEventManager<_, _, _, _, _>,
_client_description: ClientDescription| { _client_description: ClientDescription| {
// Create an observation channel using the coverage map // Create an observation channel using the coverage map
let edges_observer = let edges_observer =

View File

@ -157,7 +157,7 @@ pub extern "C" fn libafl_main() {
let mut secondary_run_client = let mut secondary_run_client =
|state: Option<_>, |state: Option<_>,
mut mgr: CentralizedEventManager<_, _, _, _, _, _>, mut mgr: CentralizedEventManager<_, _, _, _, _>,
_client_description: ClientDescription| { _client_description: ClientDescription| {
// Create an observation channel using the coverage map // Create an observation channel using the coverage map
let edges_observer = let edges_observer =

View File

@ -164,7 +164,7 @@ pub fn main() {
for input in initial_inputs { for input in initial_inputs {
fuzzer fuzzer
.evaluate_input(&mut state, &mut executor, &mut mgr, input) .evaluate_input(&mut state, &mut executor, &mut mgr, &input)
.unwrap(); .unwrap();
} }

View File

@ -151,7 +151,7 @@ pub fn main() {
]); ]);
fuzzer fuzzer
.evaluate_input(&mut state, &mut executor, &mut mgr, initial) .evaluate_input(&mut state, &mut executor, &mut mgr, &initial)
.unwrap(); .unwrap();
// Setup a mutational stage with a basic bytes mutator // Setup a mutational stage with a basic bytes mutator

View File

@ -22,21 +22,19 @@ use libafl_bolts::{
tuples::{Handle, MatchNameRef}, tuples::{Handle, MatchNameRef},
ClientId, ClientId,
}; };
use serde::{de::DeserializeOwned, Serialize}; use serde::Serialize;
use super::AwaitRestartSafe; use super::{AwaitRestartSafe, RecordSerializationTime};
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
use crate::events::llmp::COMPRESS_THRESHOLD; use crate::events::llmp::COMPRESS_THRESHOLD;
use crate::{ use crate::{
common::HasMetadata, common::HasMetadata,
events::{ events::{
serialize_observers_adaptive, std_maybe_report_progress, std_report_progress, serialize_observers_adaptive, std_maybe_report_progress, std_report_progress,
AdaptiveSerializer, CanSerializeObserver, Event, EventConfig, EventFirer, AdaptiveSerializer, CanSerializeObserver, Event, EventConfig, EventFirer, EventManagerId,
EventManagerHooksTuple, EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, EventReceiver, EventRestarter, HasEventManagerId, LogSeverity, ProgressReporter,
LogSeverity, ProgressReporter, SendExiting, SendExiting,
}, },
executors::HasObservers,
fuzzer::{EvaluatorObservers, ExecutionProcessor},
inputs::Input, inputs::Input,
observers::TimeObserver, observers::TimeObserver,
state::{HasExecutions, HasLastReportTime, MaybeHasClientPerfMonitor, Stoppable}, state::{HasExecutions, HasLastReportTime, MaybeHasClientPerfMonitor, Stoppable},
@ -47,19 +45,18 @@ pub(crate) const _LLMP_TAG_TO_MAIN: Tag = Tag(0x3453453);
/// A wrapper manager to implement a main-secondary architecture with another broker /// A wrapper manager to implement a main-secondary architecture with another broker
#[derive(Debug)] #[derive(Debug)]
pub struct CentralizedEventManager<EM, EMH, I, S, SHM, SP> { pub struct CentralizedEventManager<EM, I, S, SHM, SP> {
inner: EM, inner: EM,
/// The centralized LLMP client for inter process communication /// The centralized LLMP client for inter process communication
client: LlmpClient<SHM, SP>, client: LlmpClient<SHM, SP>,
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor, compressor: GzipCompressor,
time_ref: Option<Handle<TimeObserver>>, time_ref: Option<Handle<TimeObserver>>,
hooks: EMH,
is_main: bool, is_main: bool,
phantom: PhantomData<(I, S)>, phantom: PhantomData<(I, S)>,
} }
impl CentralizedEventManager<(), (), (), (), (), ()> { impl CentralizedEventManager<(), (), (), (), ()> {
/// Creates a builder for [`CentralizedEventManager`] /// Creates a builder for [`CentralizedEventManager`]
#[must_use] #[must_use]
pub fn builder() -> CentralizedEventManagerBuilder { pub fn builder() -> CentralizedEventManagerBuilder {
@ -93,20 +90,17 @@ impl CentralizedEventManagerBuilder {
} }
/// Creates a new [`CentralizedEventManager`]. /// Creates a new [`CentralizedEventManager`].
#[expect(clippy::type_complexity)] pub fn build_from_client<EM, I, S, SP>(
pub fn build_from_client<EM, EMH, I, S, SP>(
self, self,
inner: EM, inner: EM,
hooks: EMH,
client: LlmpClient<SP::ShMem, SP>, client: LlmpClient<SP::ShMem, SP>,
time_obs: Option<Handle<TimeObserver>>, time_obs: Option<Handle<TimeObserver>>,
) -> Result<CentralizedEventManager<EM, EMH, I, S, SP::ShMem, SP>, Error> ) -> Result<CentralizedEventManager<EM, I, S, SP::ShMem, SP>, Error>
where where
SP: ShMemProvider, SP: ShMemProvider,
{ {
Ok(CentralizedEventManager { Ok(CentralizedEventManager {
inner, inner,
hooks,
client, client,
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD),
@ -120,59 +114,66 @@ impl CentralizedEventManagerBuilder {
/// ///
/// If the port is not yet bound, it will act as a broker; otherwise, it /// If the port is not yet bound, it will act as a broker; otherwise, it
/// will act as a client. /// will act as a client.
pub fn build_on_port<EM, EMH, I, S, SHM, SP>( pub fn build_on_port<EM, I, S, SHM, SP>(
self, self,
inner: EM, inner: EM,
hooks: EMH,
shmem_provider: SP, shmem_provider: SP,
port: u16, port: u16,
time_obs: Option<Handle<TimeObserver>>, time_obs: Option<Handle<TimeObserver>>,
) -> Result<CentralizedEventManager<EM, EMH, I, S, SHM, SP>, Error> ) -> Result<CentralizedEventManager<EM, I, S, SHM, SP>, Error>
where where
SHM: ShMem, SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
{ {
let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?; let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
Self::build_from_client(self, inner, hooks, client, time_obs) Self::build_from_client(self, inner, client, time_obs)
} }
/// If a client respawns, it may reuse the existing connection, previously /// If a client respawns, it may reuse the existing connection, previously
/// stored by [`LlmpClient::to_env()`]. /// stored by [`LlmpClient::to_env()`].
pub fn build_existing_client_from_env<EM, EMH, I, S, SHM, SP>( pub fn build_existing_client_from_env<EM, I, S, SHM, SP>(
self, self,
inner: EM, inner: EM,
hooks: EMH,
shmem_provider: SP, shmem_provider: SP,
env_name: &str, env_name: &str,
time_obs: Option<Handle<TimeObserver>>, time_obs: Option<Handle<TimeObserver>>,
) -> Result<CentralizedEventManager<EM, EMH, I, S, SHM, SP>, Error> ) -> Result<CentralizedEventManager<EM, I, S, SHM, SP>, Error>
where where
SHM: ShMem, SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
{ {
let client = LlmpClient::on_existing_from_env(shmem_provider, env_name)?; let client = LlmpClient::on_existing_from_env(shmem_provider, env_name)?;
Self::build_from_client(self, inner, hooks, client, time_obs) Self::build_from_client(self, inner, client, time_obs)
} }
/// Create an existing client from description /// Create an existing client from description
pub fn existing_client_from_description<EM, EMH, I, S, SHM, SP>( pub fn existing_client_from_description<EM, I, S, SHM, SP>(
self, self,
inner: EM, inner: EM,
hooks: EMH,
shmem_provider: SP, shmem_provider: SP,
description: &LlmpClientDescription, description: &LlmpClientDescription,
time_obs: Option<Handle<TimeObserver>>, time_obs: Option<Handle<TimeObserver>>,
) -> Result<CentralizedEventManager<EM, EMH, I, S, SHM, SP>, Error> ) -> Result<CentralizedEventManager<EM, I, S, SHM, SP>, Error>
where where
SHM: ShMem, SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
{ {
let client = LlmpClient::existing_client_from_description(shmem_provider, description)?; let client = LlmpClient::existing_client_from_description(shmem_provider, description)?;
Self::build_from_client(self, inner, hooks, client, time_obs) Self::build_from_client(self, inner, client, time_obs)
} }
} }
impl<EM, EMH, I, S, SHM, SP> AdaptiveSerializer for CentralizedEventManager<EM, EMH, I, S, SHM, SP> impl<EM, I, S, SHM, SP> RecordSerializationTime for CentralizedEventManager<EM, I, S, SHM, SP>
where
EM: RecordSerializationTime,
{
/// Set the deserialization time (mut)
fn set_deserialization_time(&mut self, dur: Duration) {
self.inner.set_deserialization_time(dur);
}
}
impl<EM, I, S, SHM, SP> AdaptiveSerializer for CentralizedEventManager<EM, I, S, SHM, SP>
where where
EM: AdaptiveSerializer, EM: AdaptiveSerializer,
{ {
@ -207,10 +208,9 @@ where
} }
} }
impl<EM, EMH, I, S, SHM, SP> EventFirer<I, S> for CentralizedEventManager<EM, EMH, I, S, SHM, SP> impl<EM, I, S, SHM, SP> EventFirer<I, S> for CentralizedEventManager<EM, I, S, SHM, SP>
where where
EM: HasEventManagerId + EventFirer<I, S>, EM: HasEventManagerId + EventFirer<I, S>,
EMH: EventManagerHooksTuple<I, S>,
S: Stoppable, S: Stoppable,
I: Input, I: Input,
SHM: ShMem, SHM: ShMem,
@ -265,7 +265,7 @@ where
} }
} }
impl<EM, EMH, I, S, SHM, SP> EventRestarter<S> for CentralizedEventManager<EM, EMH, I, S, SHM, SP> impl<EM, I, S, SHM, SP> EventRestarter<S> for CentralizedEventManager<EM, I, S, SHM, SP>
where where
EM: EventRestarter<S>, EM: EventRestarter<S>,
SHM: ShMem, SHM: ShMem,
@ -279,8 +279,7 @@ where
} }
} }
impl<EM, EMH, I, OT, S, SHM, SP> CanSerializeObserver<OT> impl<EM, I, OT, S, SHM, SP> CanSerializeObserver<OT> for CentralizedEventManager<EM, I, S, SHM, SP>
for CentralizedEventManager<EM, EMH, I, S, SHM, SP>
where where
EM: AdaptiveSerializer, EM: AdaptiveSerializer,
OT: MatchNameRef + Serialize, OT: MatchNameRef + Serialize,
@ -295,7 +294,7 @@ where
} }
} }
impl<EM, EMH, I, S, SHM, SP> SendExiting for CentralizedEventManager<EM, EMH, I, S, SHM, SP> impl<EM, I, S, SHM, SP> SendExiting for CentralizedEventManager<EM, I, S, SHM, SP>
where where
EM: SendExiting, EM: SendExiting,
SHM: ShMem, SHM: ShMem,
@ -305,9 +304,14 @@ where
self.client.sender_mut().send_exiting()?; self.client.sender_mut().send_exiting()?;
self.inner.send_exiting() self.inner.send_exiting()
} }
fn on_shutdown(&mut self) -> Result<(), Error> {
self.inner.on_shutdown()?;
self.client.sender_mut().send_exiting()
}
} }
impl<EM, EMH, I, S, SHM, SP> AwaitRestartSafe for CentralizedEventManager<EM, EMH, I, S, SHM, SP> impl<EM, I, S, SHM, SP> AwaitRestartSafe for CentralizedEventManager<EM, I, S, SHM, SP>
where where
SHM: ShMem, SHM: ShMem,
EM: AwaitRestartSafe, EM: AwaitRestartSafe,
@ -319,40 +323,33 @@ where
} }
} }
impl<E, EM, EMH, I, S, SHM, SP, Z> EventProcessor<E, S, Z> impl<EM, I, S, SHM, SP> EventReceiver<I, S> for CentralizedEventManager<EM, I, S, SHM, SP>
for CentralizedEventManager<EM, EMH, I, S, SHM, SP>
where where
E: HasObservers, EM: EventReceiver<I, S> + HasEventManagerId + EventFirer<I, S>,
E::Observers: DeserializeOwned,
EM: EventProcessor<E, S, Z> + HasEventManagerId + EventFirer<I, S>,
EMH: EventManagerHooksTuple<I, S>,
I: Input, I: Input,
S: Stoppable, S: Stoppable,
SHM: ShMem, SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
Z: ExecutionProcessor<Self, I, E::Observers, S> + EvaluatorObservers<E, Self, I, S>,
{ {
fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> { fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> {
if self.is_main { if self.is_main {
// main node // main node
self.receive_from_secondary(fuzzer, state, executor) self.receive_from_secondary(state)
// self.inner.process(fuzzer, state, executor) // self.inner.process(fuzzer, state, executor)
} else { } else {
// The main node does not process incoming events from the broker ATM // The main node does not process incoming events from the broker ATM
self.inner.process(fuzzer, state, executor) self.inner.try_receive(state)
} }
} }
fn on_shutdown(&mut self) -> Result<(), Error> { fn on_interesting(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error> {
self.inner.on_shutdown()?; self.inner.fire(state, event)
self.client.sender_mut().send_exiting()
} }
} }
impl<EM, EMH, I, S, SHM, SP> ProgressReporter<S> for CentralizedEventManager<EM, EMH, I, S, SHM, SP> impl<EM, I, S, SHM, SP> ProgressReporter<S> for CentralizedEventManager<EM, I, S, SHM, SP>
where where
EM: EventFirer<I, S> + HasEventManagerId, EM: EventFirer<I, S> + HasEventManagerId,
EMH: EventManagerHooksTuple<I, S>,
I: Input, I: Input,
S: HasExecutions + HasMetadata + HasLastReportTime + Stoppable + MaybeHasClientPerfMonitor, S: HasExecutions + HasMetadata + HasLastReportTime + Stoppable + MaybeHasClientPerfMonitor,
SHM: ShMem, SHM: ShMem,
@ -371,7 +368,7 @@ where
} }
} }
impl<EM, EMH, I, S, SHM, SP> HasEventManagerId for CentralizedEventManager<EM, EMH, I, S, SHM, SP> impl<EM, I, S, SHM, SP> HasEventManagerId for CentralizedEventManager<EM, I, S, SHM, SP>
where where
EM: HasEventManagerId, EM: HasEventManagerId,
{ {
@ -380,7 +377,7 @@ where
} }
} }
impl<EM, EMH, I, S, SHM, SP> CentralizedEventManager<EM, EMH, I, S, SHM, SP> impl<EM, I, S, SHM, SP> CentralizedEventManager<EM, I, S, SHM, SP>
where where
SHM: ShMem, SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
@ -402,10 +399,9 @@ where
} }
} }
impl<EM, EMH, I, S, SHM, SP> CentralizedEventManager<EM, EMH, I, S, SHM, SP> impl<EM, I, S, SHM, SP> CentralizedEventManager<EM, I, S, SHM, SP>
where where
EM: HasEventManagerId + EventFirer<I, S>, EM: HasEventManagerId + EventFirer<I, S>,
EMH: EventManagerHooksTuple<I, S>,
I: Input, I: Input,
S: Stoppable, S: Stoppable,
SHM: ShMem, SHM: ShMem,
@ -438,20 +434,9 @@ where
Ok(()) Ok(())
} }
fn receive_from_secondary<E, Z>( fn receive_from_secondary(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> {
&mut self,
fuzzer: &mut Z,
state: &mut S,
executor: &mut E,
) -> Result<usize, Error>
where
E: HasObservers,
E::Observers: DeserializeOwned,
Z: ExecutionProcessor<Self, I, E::Observers, S> + EvaluatorObservers<E, Self, I, S>,
{
// TODO: Get around local event copy by moving handle_in_client // TODO: Get around local event copy by moving handle_in_client
let self_id = self.client.sender().id(); let self_id = self.client.sender().id();
let mut count = 0;
while let Some((client_id, tag, _flags, msg)) = self.client.recv_buf_with_flags()? { while let Some((client_id, tag, _flags, msg)) = self.client.recv_buf_with_flags()? {
assert!( assert!(
tag == _LLMP_TAG_TO_MAIN, tag == _LLMP_TAG_TO_MAIN,
@ -474,116 +459,43 @@ where
}; };
let event: Event<I> = postcard::from_bytes(event_bytes)?; let event: Event<I> = postcard::from_bytes(event_bytes)?;
log::debug!("Processor received message {}", event.name_detailed()); log::debug!("Processor received message {}", event.name_detailed());
self.handle_in_main(fuzzer, executor, state, client_id, event)?;
count += 1;
}
Ok(count)
}
// Handle arriving events in the main node let event_name = event.name_detailed();
fn handle_in_main<E, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
client_id: ClientId,
event: Event<I>,
) -> Result<(), Error>
where
E: HasObservers,
E::Observers: DeserializeOwned,
Z: ExecutionProcessor<Self, I, E::Observers, S> + EvaluatorObservers<E, Self, I, S>,
{
log::debug!("handle_in_main!");
let event_name = event.name_detailed(); match event {
Event::NewTestcase {
match event { client_config,
Event::NewTestcase { ref observers_buf,
input, forward_id,
client_config, ..
exit_kind, } => {
corpus_size, log::debug!(
observers_buf, "Received {} from {client_id:?} ({client_config:?}, forward {forward_id:?})",
time, event_name
forward_id, );
#[cfg(feature = "multi_machine")]
node_id,
} => {
log::debug!(
"Received {} from {client_id:?} ({client_config:?}, forward {forward_id:?})",
event_name
);
let res =
if client_config.match_with(&self.configuration()) && observers_buf.is_some() {
let observers: E::Observers =
postcard::from_bytes(observers_buf.as_ref().unwrap())?;
log::debug!(
"[{}] Running fuzzer with event {}",
process::id(),
event_name
);
fuzzer.evaluate_execution(
state,
self,
input.clone(),
&observers,
&exit_kind,
false,
)?
} else {
log::debug!(
"[{}] Running fuzzer with event {}",
process::id(),
event_name
);
fuzzer.evaluate_input_with_observers(
state,
executor,
self,
input.clone(),
false,
)?
};
if let Some(item) = res.1 {
let event = Event::NewTestcase {
input,
client_config,
exit_kind,
corpus_size,
observers_buf,
time,
forward_id,
#[cfg(feature = "multi_machine")]
node_id,
};
self.hooks.on_fire_all(state, client_id, &event)?;
log::debug!( log::debug!(
"[{}] Adding received Testcase {} as item #{item}...", "[{}] Running fuzzer with event {}",
process::id(), process::id(),
event_name event_name
); );
self.inner.fire(state, event)?; if client_config.match_with(&self.configuration()) && observers_buf.is_some() {
} else { return Ok(Some((event, true)));
log::debug!("[{}] {} was discarded...)", process::id(), event_name); }
return Ok(Some((event, false)));
}
Event::Stop => {
state.request_stop();
}
_ => {
return Err(Error::illegal_state(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
)));
} }
} }
Event::Stop => {
state.request_stop();
}
_ => {
return Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
)));
}
} }
Ok(None)
Ok(())
} }
} }

View File

@ -1,4 +1,4 @@
//! Hooks for event managers, especifically these are used to hook before `handle_in_client`. //! Hooks for event managers, especifically these are used to hook before `try_receive`.
//! //!
//! This will allow user to define pre/post-processing code when the event manager receives any message from //! This will allow user to define pre/post-processing code when the event manager receives any message from
//! other clients //! other clients
@ -6,59 +6,32 @@ use libafl_bolts::ClientId;
use crate::{events::Event, Error}; use crate::{events::Event, Error};
/// The `broker_hooks` that are run before and after the event manager calls `handle_in_client` /// The `broker_hooks` that are run before and after the event manager calls `try_receive`
pub trait EventManagerHook<I, S> { pub trait EventManagerHook<I, S> {
/// The hook that runs before `handle_in_client` /// The hook that runs before `try_receive`
/// Return false if you want to cancel the subsequent event handling /// Return false if you want to cancel the subsequent event handling
fn pre_exec( fn pre_receive(
&mut self, &mut self,
state: &mut S, state: &mut S,
client_id: ClientId, client_id: ClientId,
event: &Event<I>, event: &Event<I>,
) -> Result<bool, Error>; ) -> Result<bool, Error>;
/// Triggered when the even manager decides to fire the event after processing
fn on_fire(
&mut self,
_state: &mut S,
_client_id: ClientId,
_event: &Event<I>,
) -> Result<(), Error> {
Ok(())
}
/// The hook that runs after `handle_in_client`
/// Return false if you want to cancel the subsequent event handling
fn post_exec(&mut self, _state: &mut S, _client_id: ClientId) -> Result<bool, Error> {
Ok(true)
}
} }
/// The tuples contains `broker_hooks` to be executed for `handle_in_client` /// The tuples contains `broker_hooks` to be executed for `try_receive`
pub trait EventManagerHooksTuple<I, S> { pub trait EventManagerHooksTuple<I, S> {
/// The hook that runs before `handle_in_client` /// The hook that runs before `try_receive`
fn pre_exec_all( fn pre_receive_all(
&mut self, &mut self,
state: &mut S, state: &mut S,
client_id: ClientId, client_id: ClientId,
event: &Event<I>, event: &Event<I>,
) -> Result<bool, Error>; ) -> Result<bool, Error>;
/// Ran when the Event Manager decides to accept an event and propagates it
fn on_fire_all(
&mut self,
state: &mut S,
client_id: ClientId,
event: &Event<I>,
) -> Result<(), Error>;
/// The hook that runs after `handle_in_client`
fn post_exec_all(&mut self, state: &mut S, client_id: ClientId) -> Result<bool, Error>;
} }
impl<I, S> EventManagerHooksTuple<I, S> for () { impl<I, S> EventManagerHooksTuple<I, S> for () {
/// The hook that runs before `handle_in_client` /// The hook that runs before `try_receive`
fn pre_exec_all( fn pre_receive_all(
&mut self, &mut self,
_state: &mut S, _state: &mut S,
_client_id: ClientId, _client_id: ClientId,
@ -66,20 +39,6 @@ impl<I, S> EventManagerHooksTuple<I, S> for () {
) -> Result<bool, Error> { ) -> Result<bool, Error> {
Ok(true) Ok(true)
} }
fn on_fire_all(
&mut self,
_state: &mut S,
_client_id: ClientId,
_event: &Event<I>,
) -> Result<(), Error> {
Ok(())
}
/// The hook that runs after `handle_in_client`
fn post_exec_all(&mut self, _state: &mut S, _client_id: ClientId) -> Result<bool, Error> {
Ok(true)
}
} }
impl<Head, Tail, I, S> EventManagerHooksTuple<I, S> for (Head, Tail) impl<Head, Tail, I, S> EventManagerHooksTuple<I, S> for (Head, Tail)
@ -87,32 +46,15 @@ where
Head: EventManagerHook<I, S>, Head: EventManagerHook<I, S>,
Tail: EventManagerHooksTuple<I, S>, Tail: EventManagerHooksTuple<I, S>,
{ {
/// The hook that runs before `handle_in_client` /// The hook that runs before `try_receive`
fn pre_exec_all( fn pre_receive_all(
&mut self, &mut self,
state: &mut S, state: &mut S,
client_id: ClientId, client_id: ClientId,
event: &Event<I>, event: &Event<I>,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
let first = self.0.pre_exec(state, client_id, event)?; let first = self.0.pre_receive(state, client_id, event)?;
let second = self.1.pre_exec_all(state, client_id, event)?; let second = self.1.pre_receive_all(state, client_id, event)?;
Ok(first & second)
}
fn on_fire_all(
&mut self,
state: &mut S,
client_id: ClientId,
event: &Event<I>,
) -> Result<(), Error> {
self.0.on_fire(state, client_id, event)?;
self.1.on_fire_all(state, client_id, event)
}
/// The hook that runs after `handle_in_client`
fn post_exec_all(&mut self, state: &mut S, client_id: ClientId) -> Result<bool, Error> {
let first = self.0.post_exec(state, client_id)?;
let second = self.1.post_exec_all(state, client_id)?;
Ok(first & second) Ok(first & second)
} }
} }

View File

@ -637,7 +637,6 @@ where
Option<S>, Option<S>,
CentralizedEventManager< CentralizedEventManager<
StdCentralizedInnerMgr<I, S, SP::ShMem, SP>, StdCentralizedInnerMgr<I, S, SP::ShMem, SP>,
(),
I, I,
S, S,
SP::ShMem, SP::ShMem,
@ -649,7 +648,6 @@ where
Option<S>, Option<S>,
CentralizedEventManager< CentralizedEventManager<
StdCentralizedInnerMgr<I, S, SP::ShMem, SP>, StdCentralizedInnerMgr<I, S, SP::ShMem, SP>,
(),
I, I,
S, S,
SP::ShMem, SP::ShMem,
@ -696,13 +694,13 @@ where
I: Input + Send + Sync + 'static, I: Input + Send + Sync + 'static,
CF: FnOnce( CF: FnOnce(
Option<S>, Option<S>,
CentralizedEventManager<EM, (), I, S, SP::ShMem, SP>, CentralizedEventManager<EM, I, S, SP::ShMem, SP>,
ClientDescription, ClientDescription,
) -> Result<(), Error>, ) -> Result<(), Error>,
EMB: FnOnce(&Self, ClientDescription) -> Result<(Option<S>, EM), Error>, EMB: FnOnce(&Self, ClientDescription) -> Result<(Option<S>, EM), Error>,
MF: FnOnce( MF: FnOnce(
Option<S>, Option<S>,
CentralizedEventManager<EM, (), I, S, SP::ShMem, SP>, // No broker_hooks for centralized EM CentralizedEventManager<EM, I, S, SP::ShMem, SP>, // No broker_hooks for centralized EM
ClientDescription, ClientDescription,
) -> Result<(), Error>, ) -> Result<(), Error>,
{ {
@ -788,7 +786,6 @@ where
let c_mgr = centralized_event_manager_builder.build_on_port( let c_mgr = centralized_event_manager_builder.build_on_port(
mgr, mgr,
// tuple_list!(multi_machine_event_manager_hook.take().unwrap()), // tuple_list!(multi_machine_event_manager_hook.take().unwrap()),
tuple_list!(),
self.shmem_provider.clone(), self.shmem_provider.clone(),
self.centralized_broker_port, self.centralized_broker_port,
self.time_obs.clone(), self.time_obs.clone(),
@ -815,7 +812,6 @@ where
let c_mgr = centralized_builder.build_on_port( let c_mgr = centralized_builder.build_on_port(
mgr, mgr,
tuple_list!(),
self.shmem_provider.clone(), self.shmem_provider.clone(),
self.centralized_broker_port, self.centralized_broker_port,
self.time_obs.clone(), self.time_obs.clone(),

View File

@ -276,7 +276,7 @@ where
state, state,
executor, executor,
manager, manager,
converter.convert(input)?, &converter.convert(input)?,
false, false,
)?; )?;
@ -298,7 +298,7 @@ where
state, state,
executor, executor,
manager, manager,
converter.convert(input)?, &converter.convert(input)?,
false, false,
)?; )?;

View File

@ -37,7 +37,6 @@ use libafl_bolts::{
shmem::{ShMem, ShMemProvider, StdShMem, StdShMemProvider}, shmem::{ShMem, ShMemProvider, StdShMem, StdShMemProvider},
staterestore::StateRestorer, staterestore::StateRestorer,
tuples::{tuple_list, Handle, MatchNameRef}, tuples::{tuple_list, Handle, MatchNameRef},
ClientId,
}; };
#[cfg(feature = "std")] #[cfg(feature = "std")]
use libafl_bolts::{ use libafl_bolts::{
@ -56,12 +55,11 @@ use crate::{
events::{ events::{
launcher::ClientDescription, serialize_observers_adaptive, std_maybe_report_progress, launcher::ClientDescription, serialize_observers_adaptive, std_maybe_report_progress,
std_report_progress, AdaptiveSerializer, AwaitRestartSafe, CanSerializeObserver, Event, std_report_progress, AdaptiveSerializer, AwaitRestartSafe, CanSerializeObserver, Event,
EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, EventProcessor, EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, EventReceiver,
EventRestarter, HasEventManagerId, LlmpShouldSaveState, ProgressReporter, SendExiting, EventRestarter, HasEventManagerId, LlmpShouldSaveState, ProgressReporter,
StdLlmpEventHook, LLMP_TAG_EVENT_TO_BOTH, _LLMP_TAG_EVENT_TO_BROKER, RecordSerializationTime, SendExiting, StdLlmpEventHook, LLMP_TAG_EVENT_TO_BOTH,
_LLMP_TAG_EVENT_TO_BROKER,
}, },
executors::HasObservers,
fuzzer::{EvaluatorObservers, ExecutionProcessor},
inputs::Input, inputs::Input,
monitors::Monitor, monitors::Monitor,
observers::TimeObserver, observers::TimeObserver,
@ -104,6 +102,15 @@ pub struct LlmpRestartingEventManager<EMH, I, S, SHM, SP> {
phantom: PhantomData<(I, S)>, phantom: PhantomData<(I, S)>,
} }
impl<EMH, I, S, SHM, SP> RecordSerializationTime for LlmpRestartingEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
fn set_deserialization_time(&mut self, dur: Duration) {
self.deserialization_time = dur;
}
}
impl<EMH, I, S, SHM, SP> AdaptiveSerializer for LlmpRestartingEventManager<EMH, I, S, SHM, SP> impl<EMH, I, S, SHM, SP> AdaptiveSerializer for LlmpRestartingEventManager<EMH, I, S, SHM, SP>
where where
SHM: ShMem, SHM: ShMem,
@ -286,6 +293,10 @@ where
// This way, the broker can clean up the pages, and eventually exit. // This way, the broker can clean up the pages, and eventually exit.
self.llmp.sender_mut().send_exiting() self.llmp.sender_mut().send_exiting()
} }
fn on_shutdown(&mut self) -> Result<(), Error> {
self.send_exiting()
}
} }
impl<EMH, I, S, SHM, SP> AwaitRestartSafe for LlmpRestartingEventManager<EMH, I, S, SHM, SP> impl<EMH, I, S, SHM, SP> AwaitRestartSafe for LlmpRestartingEventManager<EMH, I, S, SHM, SP>
@ -300,67 +311,94 @@ where
} }
} }
impl<E, EMH, I, S, SHM, SP, Z> EventProcessor<E, S, Z> impl<EMH, I, S, SHM, SP> EventReceiver<I, S> for LlmpRestartingEventManager<EMH, I, S, SHM, SP>
for LlmpRestartingEventManager<EMH, I, S, SHM, SP>
where where
E: HasObservers,
E::Observers: DeserializeOwned,
EMH: EventManagerHooksTuple<I, S>, EMH: EventManagerHooksTuple<I, S>,
I: DeserializeOwned + Input, I: DeserializeOwned + Input,
S: HasImported + HasCurrentTestcase<I> + HasSolutions<I> + Stoppable + Serialize, S: HasImported + HasCurrentTestcase<I> + HasSolutions<I> + Stoppable + Serialize,
SHM: ShMem, SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
Z: ExecutionProcessor<Self, I, E::Observers, S> + EvaluatorObservers<E, Self, I, S>,
{ {
fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> { fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> {
let res = { // TODO: Get around local event copy by moving handle_in_client
// TODO: Get around local event copy by moving handle_in_client let self_id = self.llmp.sender().id();
let self_id = self.llmp.sender().id(); while let Some((client_id, tag, flags, msg)) = self.llmp.recv_buf_with_flags()? {
let mut count = 0; assert_ne!(
while let Some((client_id, tag, flags, msg)) = self.llmp.recv_buf_with_flags()? { tag, _LLMP_TAG_EVENT_TO_BROKER,
assert_ne!( "EVENT_TO_BROKER parcel should not have arrived in the client!"
tag, _LLMP_TAG_EVENT_TO_BROKER, );
"EVENT_TO_BROKER parcel should not have arrived in the client!"
);
if client_id == self_id { if client_id == self_id {
continue; continue;
} }
#[cfg(not(feature = "llmp_compression"))] #[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg; let event_bytes = msg;
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
let compressed; let compressed;
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
let event_bytes = if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { let event_bytes = if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = self.compressor.decompress(msg)?; compressed = self.compressor.decompress(msg)?;
&compressed &compressed
} else { } else {
msg msg
}; };
let event: Event<I> = postcard::from_bytes(event_bytes)?; let event: Event<I> = postcard::from_bytes(event_bytes)?;
log::debug!("Received event in normal llmp {}", event.name_detailed()); log::debug!("Received event in normal llmp {}", event.name_detailed());
// If the message comes from another machine, do not // If the message comes from another machine, do not
// consider other events than new testcase. // consider other events than new testcase.
if !event.is_new_testcase() && (flags & LLMP_FLAG_FROM_MM == LLMP_FLAG_FROM_MM) { if !event.is_new_testcase() && (flags & LLMP_FLAG_FROM_MM == LLMP_FLAG_FROM_MM) {
continue; continue;
} }
self.handle_in_client(fuzzer, executor, state, client_id, event)?; log::trace!("Got event in client: {} from {client_id:?}", event.name());
count += 1; if !self.hooks.pre_receive_all(state, client_id, &event)? {
continue;
}
let evt_name = event.name_detailed();
match event {
Event::NewTestcase {
client_config,
ref observers_buf,
#[cfg(feature = "std")]
forward_id,
..
} => {
#[cfg(feature = "std")]
log::debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id());
if client_config.match_with(&self.configuration) && observers_buf.is_some() {
return Ok(Some((event, true)));
}
return Ok(Some((event, false)));
}
#[cfg(feature = "share_objectives")]
Event::Objective { .. } => {
#[cfg(feature = "std")]
log::debug!("[{}] Received new Objective", std::process::id());
return Ok(Some((event, false)));
}
Event::Stop => {
state.request_stop();
}
_ => {
return Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
)));
}
} }
count
};
if self.staterestorer.is_some() {
self.intermediate_save()?;
} }
Ok(res) Ok(None)
} }
fn on_shutdown(&mut self) -> Result<(), Error> { fn on_interesting(&mut self, _state: &mut S, _event_vec: Event<I>) -> Result<(), Error> {
self.send_exiting() Ok(())
} }
} }
@ -563,91 +601,6 @@ where
Ok(()) Ok(())
} }
// Handle arriving events in the client
fn handle_in_client<E, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
client_id: ClientId,
event: Event<I>,
) -> Result<(), Error>
where
S: HasImported + Stoppable,
EMH: EventManagerHooksTuple<I, S>,
I: Input,
E: HasObservers,
E::Observers: DeserializeOwned,
Z: ExecutionProcessor<Self, I, E::Observers, S> + EvaluatorObservers<E, Self, I, S>,
{
log::trace!("Got event in client: {} from {client_id:?}", event.name());
if !self.hooks.pre_exec_all(state, client_id, &event)? {
return Ok(());
}
let evt_name = event.name_detailed();
match event {
Event::NewTestcase {
input,
client_config,
exit_kind,
observers_buf,
#[cfg(feature = "std")]
forward_id,
..
} => {
#[cfg(feature = "std")]
log::debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id());
let res = if client_config.match_with(&self.configuration)
&& observers_buf.is_some()
{
let start = current_time();
let observers: E::Observers =
postcard::from_bytes(observers_buf.as_ref().unwrap())?;
{
self.deserialization_time = current_time() - start;
}
fuzzer.evaluate_execution(state, self, input, &observers, &exit_kind, false)?
} else {
fuzzer.evaluate_input_with_observers(state, executor, self, input, false)?
};
if let Some(item) = res.1 {
*state.imported_mut() += 1;
log::debug!("Added received Testcase {evt_name} as item #{item}");
} else {
log::debug!("Testcase {evt_name} was discarded");
}
}
#[cfg(feature = "share_objectives")]
Event::Objective { input, .. } => {
#[cfg(feature = "std")]
log::debug!("[{}] Received new Objective", std::process::id());
let res =
fuzzer.evaluate_input_with_observers(state, executor, self, input, false)?;
if let Some(item) = res.1 {
*state.imported_mut() += 1;
log::debug!("Added received Objective {evt_name} as item #{item}");
} else {
log::debug!("Objective {evt_name} was discarded");
}
}
Event::Stop => {
state.request_stop();
}
_ => {
return Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
)));
}
}
self.hooks.post_exec_all(state, client_id)?;
Ok(())
}
/// Calling this function will tell the llmp broker that this client is exiting /// Calling this function will tell the llmp broker that this client is exiting
/// This should be called from the restarter not from the actual fuzzer client /// This should be called from the restarter not from the actual fuzzer client
/// This function serves the same roll as the `LlmpClient.send_exiting()` /// This function serves the same roll as the `LlmpClient.send_exiting()`

View File

@ -585,6 +585,10 @@ pub trait SendExiting {
/// Send information that this client is exiting. /// Send information that this client is exiting.
/// No need to restart us any longer, and no need to print an error, either. /// No need to restart us any longer, and no need to print an error, either.
fn send_exiting(&mut self) -> Result<(), Error>; fn send_exiting(&mut self) -> Result<(), Error>;
/// Shutdown gracefully; typically without saving state.
/// This is usually called from `fuzz_loop`.
fn on_shutdown(&mut self) -> Result<(), Error>;
} }
/// Wait until it's safe to restart /// Wait until it's safe to restart
@ -593,14 +597,15 @@ pub trait AwaitRestartSafe {
fn await_restart_safe(&mut self); fn await_restart_safe(&mut self);
} }
/// [`EventProcessor`] process all the incoming messages /// [`EventReceiver`] process all the incoming messages
pub trait EventProcessor<E, S, Z> { pub trait EventReceiver<I, S> {
/// Lookup for incoming events and process them. /// Lookup for incoming events and process them.
/// Return the number of processes events or an error /// Return the event, if any, that needs to be evaluated
fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error>; fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error>;
/// Shutdown gracefully; typically without saving state. /// Run the post processing routine after the fuzzer deemed this event as interesting
fn on_shutdown(&mut self) -> Result<(), Error>; /// For example, in centralized manager you wanna send this an event.
fn on_interesting(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error>;
} }
/// The id of this `EventManager`. /// The id of this `EventManager`.
/// For multi processed `EventManagers`, /// For multi processed `EventManagers`,
@ -623,6 +628,8 @@ impl NopEventManager {
} }
} }
impl RecordSerializationTime for NopEventManager {}
impl<I, S> EventFirer<I, S> for NopEventManager { impl<I, S> EventFirer<I, S> for NopEventManager {
fn should_send(&self) -> bool { fn should_send(&self) -> bool {
true true
@ -648,6 +655,10 @@ impl SendExiting for NopEventManager {
fn send_exiting(&mut self) -> Result<(), Error> { fn send_exiting(&mut self) -> Result<(), Error> {
Ok(()) Ok(())
} }
fn on_shutdown(&mut self) -> Result<(), Error> {
Ok(())
}
} }
impl AwaitRestartSafe for NopEventManager { impl AwaitRestartSafe for NopEventManager {
@ -655,17 +666,12 @@ impl AwaitRestartSafe for NopEventManager {
fn await_restart_safe(&mut self) {} fn await_restart_safe(&mut self) {}
} }
impl<E, S, Z> EventProcessor<E, S, Z> for NopEventManager { impl<I, S> EventReceiver<I, S> for NopEventManager {
fn process( fn try_receive(&mut self, _state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> {
&mut self, Ok(None)
_fuzzer: &mut Z,
_state: &mut S,
_executor: &mut E,
) -> Result<usize, Error> {
Ok(0)
} }
fn on_shutdown(&mut self) -> Result<(), Error> { fn on_interesting(&mut self, _state: &mut S, _event_vec: Event<I>) -> Result<(), Error> {
Ok(()) Ok(())
} }
} }
@ -707,6 +713,8 @@ pub struct MonitorTypedEventManager<EM, M> {
phantom: PhantomData<M>, phantom: PhantomData<M>,
} }
impl<EM, M> RecordSerializationTime for MonitorTypedEventManager<EM, M> {}
impl<EM, M> MonitorTypedEventManager<EM, M> { impl<EM, M> MonitorTypedEventManager<EM, M> {
/// Creates a new `EventManager` that wraps another manager, but captures a `monitor` type as well. /// Creates a new `EventManager` that wraps another manager, but captures a `monitor` type as well.
#[must_use] #[must_use]
@ -774,6 +782,10 @@ where
fn send_exiting(&mut self) -> Result<(), Error> { fn send_exiting(&mut self) -> Result<(), Error> {
self.inner.send_exiting() self.inner.send_exiting()
} }
fn on_shutdown(&mut self) -> Result<(), Error> {
self.inner.on_shutdown()
}
} }
impl<EM, M> AwaitRestartSafe for MonitorTypedEventManager<EM, M> impl<EM, M> AwaitRestartSafe for MonitorTypedEventManager<EM, M>
@ -786,17 +798,16 @@ where
} }
} }
impl<E, EM, M, S, Z> EventProcessor<E, S, Z> for MonitorTypedEventManager<EM, M> impl<EM, I, M, S> EventReceiver<I, S> for MonitorTypedEventManager<EM, M>
where where
EM: EventProcessor<E, S, Z>, EM: EventReceiver<I, S>,
{ {
#[inline] #[inline]
fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> { fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> {
self.inner.process(fuzzer, state, executor) self.inner.try_receive(state)
} }
fn on_interesting(&mut self, _state: &mut S, _event_vec: Event<I>) -> Result<(), Error> {
fn on_shutdown(&mut self) -> Result<(), Error> { Ok(())
self.inner.on_shutdown()
} }
} }
@ -829,6 +840,12 @@ where
} }
} }
/// Record the deserialization time for this event manager
pub trait RecordSerializationTime {
/// Set the deserialization time (mut)
fn set_deserialization_time(&mut self, _dur: Duration) {}
}
/// Collected stats to decide if observers must be serialized or not /// Collected stats to decide if observers must be serialized or not
pub trait AdaptiveSerializer { pub trait AdaptiveSerializer {
/// Expose the collected observers serialization time /// Expose the collected observers serialization time

View File

@ -22,13 +22,13 @@ use libafl_bolts::{
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use super::{std_on_restart, AwaitRestartSafe, ProgressReporter}; use super::{std_on_restart, AwaitRestartSafe, ProgressReporter, RecordSerializationTime};
#[cfg(all(unix, feature = "std", not(miri)))] #[cfg(all(unix, feature = "std", not(miri)))]
use crate::events::EVENTMGR_SIGHANDLER_STATE; use crate::events::EVENTMGR_SIGHANDLER_STATE;
use crate::{ use crate::{
events::{ events::{
std_maybe_report_progress, std_report_progress, BrokerEventResult, CanSerializeObserver, std_maybe_report_progress, std_report_progress, BrokerEventResult, CanSerializeObserver,
Event, EventFirer, EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, Event, EventFirer, EventManagerId, EventReceiver, EventRestarter, HasEventManagerId,
SendExiting, SendExiting,
}, },
monitors::Monitor, monitors::Monitor,
@ -71,6 +71,8 @@ where
} }
} }
impl<I, MT, S> RecordSerializationTime for SimpleEventManager<I, MT, S> {}
impl<I, MT, S> EventFirer<I, S> for SimpleEventManager<I, MT, S> impl<I, MT, S> EventFirer<I, S> for SimpleEventManager<I, MT, S>
where where
I: Debug, I: Debug,
@ -94,6 +96,10 @@ impl<I, MT, S> SendExiting for SimpleEventManager<I, MT, S> {
fn send_exiting(&mut self) -> Result<(), Error> { fn send_exiting(&mut self) -> Result<(), Error> {
Ok(()) Ok(())
} }
fn on_shutdown(&mut self) -> Result<(), Error> {
self.send_exiting()
}
} }
impl<I, MT, S> AwaitRestartSafe for SimpleEventManager<I, MT, S> { impl<I, MT, S> AwaitRestartSafe for SimpleEventManager<I, MT, S> {
@ -109,27 +115,29 @@ where
} }
} }
impl<E, I, MT, S, Z> EventProcessor<E, S, Z> for SimpleEventManager<I, MT, S> impl<I, MT, S> EventReceiver<I, S> for SimpleEventManager<I, MT, S>
where where
I: Debug, I: Debug,
MT: Monitor, MT: Monitor,
S: Stoppable, S: Stoppable,
{ {
fn process( fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> {
&mut self,
_fuzzer: &mut Z,
state: &mut S,
_executor: &mut E,
) -> Result<usize, Error> {
let count = self.events.len();
while let Some(event) = self.events.pop() { while let Some(event) = self.events.pop() {
self.handle_in_client(state, &event)?; match event {
Event::Stop => {
state.request_stop();
}
_ => {
return Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {event:?}."
)))
}
}
} }
Ok(count) Ok(None)
} }
fn on_interesting(&mut self, _state: &mut S, _event_vec: Event<I>) -> Result<(), Error> {
fn on_shutdown(&mut self) -> Result<(), Error> { Ok(())
self.send_exiting()
} }
} }
@ -263,20 +271,6 @@ where
Event::Stop => Ok(BrokerEventResult::Forward), Event::Stop => Ok(BrokerEventResult::Forward),
} }
} }
// Handle arriving events in the client
#[allow(clippy::unused_self)]
fn handle_in_client(&mut self, state: &mut S, event: &Event<I>) -> Result<(), Error> {
match event {
Event::Stop => {
state.request_stop();
Ok(())
}
_ => Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {event:?}."
))),
}
}
} }
/// Provides a `builder` which can be used to build a [`SimpleRestartingEventManager`]. /// Provides a `builder` which can be used to build a [`SimpleRestartingEventManager`].
@ -293,6 +287,12 @@ pub struct SimpleRestartingEventManager<I, MT, S, SHM, SP> {
staterestorer: StateRestorer<SHM, SP>, staterestorer: StateRestorer<SHM, SP>,
} }
#[cfg(feature = "std")]
impl<I, MT, S, SHM, SP> RecordSerializationTime
for SimpleRestartingEventManager<I, MT, S, SHM, SP>
{
}
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<I, MT, S, SHM, SP> EventFirer<I, S> for SimpleRestartingEventManager<I, MT, S, SHM, SP> impl<I, MT, S, SHM, SP> EventFirer<I, S> for SimpleRestartingEventManager<I, MT, S, SHM, SP>
where where
@ -352,6 +352,10 @@ where
self.staterestorer.send_exiting(); self.staterestorer.send_exiting();
Ok(()) Ok(())
} }
fn on_shutdown(&mut self) -> Result<(), Error> {
self.send_exiting()
}
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
@ -362,8 +366,7 @@ impl<I, MT, S, SHM, SP> AwaitRestartSafe for SimpleRestartingEventManager<I, MT,
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
impl<E, I, MT, S, SHM, SP, Z> EventProcessor<E, S, Z> impl<I, MT, S, SHM, SP> EventReceiver<I, S> for SimpleRestartingEventManager<I, MT, S, SHM, SP>
for SimpleRestartingEventManager<I, MT, S, SHM, SP>
where where
I: Debug, I: Debug,
MT: Monitor, MT: Monitor,
@ -371,12 +374,12 @@ where
SHM: ShMem, SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
{ {
fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> { fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> {
self.inner.process(fuzzer, state, executor) self.inner.try_receive(state)
} }
fn on_shutdown(&mut self) -> Result<(), Error> { fn on_interesting(&mut self, _state: &mut S, _event_vec: Event<I>) -> Result<(), Error> {
self.send_exiting() Ok(())
} }
} }

View File

@ -30,7 +30,7 @@ use libafl_bolts::{
tuples::tuple_list, tuples::tuple_list,
ClientId, ClientId,
}; };
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
sync::{broadcast, broadcast::error::RecvError, mpsc}, sync::{broadcast, broadcast::error::RecvError, mpsc},
@ -44,13 +44,10 @@ use crate::events::EVENTMGR_SIGHANDLER_STATE;
use crate::{ use crate::{
events::{ events::{
std_on_restart, BrokerEventResult, Event, EventConfig, EventFirer, EventManagerHooksTuple, std_on_restart, BrokerEventResult, Event, EventConfig, EventFirer, EventManagerHooksTuple,
EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, ProgressReporter, EventManagerId, EventReceiver, EventRestarter, HasEventManagerId, ProgressReporter,
}, },
executors::{Executor, HasObservers},
fuzzer::{EvaluatorObservers, ExecutionProcessor},
inputs::Input, inputs::Input,
monitors::Monitor, monitors::Monitor,
observers::ObserversTuple,
stages::HasCurrentStageId, stages::HasCurrentStageId,
state::{ state::{
HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions, HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions,
@ -569,75 +566,6 @@ where
pub fn to_env(&self, env_name: &str) { pub fn to_env(&self, env_name: &str) {
env::set_var(env_name, format!("{}", self.client_id.0)); env::set_var(env_name, format!("{}", self.client_id.0));
} }
// Handle arriving events in the client
fn handle_in_client<E, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
client_id: ClientId,
event: Event<I>,
) -> Result<(), Error>
where
E: Executor<Self, I, S, Z> + HasObservers,
E::Observers: Serialize + ObserversTuple<I, S>,
for<'a> E::Observers: Deserialize<'a>,
Z: ExecutionProcessor<Self, I, E::Observers, S> + EvaluatorObservers<E, Self, I, S>,
{
if !self.hooks.pre_exec_all(state, client_id, &event)? {
return Ok(());
}
match event {
Event::NewTestcase {
input,
client_config,
exit_kind,
observers_buf,
forward_id,
..
} => {
log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})");
let _res = if client_config.match_with(&self.configuration)
&& observers_buf.is_some()
{
let observers: E::Observers =
postcard::from_bytes(observers_buf.as_ref().unwrap())?;
fuzzer.evaluate_execution(state, self, input, &observers, &exit_kind, false)?
} else {
fuzzer.evaluate_input_with_observers(state, executor, self, input, false)?
};
if let Some(item) = _res.1 {
*state.imported_mut() += 1;
log::info!("Added received Testcase as item #{item}");
}
}
#[cfg(feature = "share_objectives")]
Event::Objective { input, .. } => {
log::info!("Received new Objective");
let res =
fuzzer.evaluate_input_with_observers(state, executor, self, input, false)?;
if let Some(item) = res.1 {
*state.imported_mut() += 1;
log::info!("Added received Testcase as item #{item}");
}
}
Event::Stop => {
state.request_stop();
}
_ => {
return Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
)))
}
}
self.hooks.post_exec_all(state, client_id)?;
Ok(())
}
} }
impl<EMH, I, S> TcpEventManager<EMH, I, S> { impl<EMH, I, S> TcpEventManager<EMH, I, S> {
@ -693,11 +621,8 @@ where
} }
} }
impl<E, EMH, I, S, Z> EventProcessor<E, S, Z> for TcpEventManager<EMH, I, S> impl<EMH, I, S> EventReceiver<I, S> for TcpEventManager<EMH, I, S>
where where
E: HasObservers + Executor<Self, I, S, Z>,
E::Observers: Serialize + ObserversTuple<I, S>,
for<'a> E::Observers: Deserialize<'a>,
EMH: EventManagerHooksTuple<I, S>, EMH: EventManagerHooksTuple<I, S>,
S: HasExecutions S: HasExecutions
+ HasMetadata + HasMetadata
@ -706,16 +631,12 @@ where
+ HasCurrentTestcase<I> + HasCurrentTestcase<I>
+ Stoppable, + Stoppable,
I: DeserializeOwned, I: DeserializeOwned,
Z: ExecutionProcessor<Self, I, E::Observers, S> + EvaluatorObservers<E, Self, I, S>,
{ {
fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> { fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> {
// TODO: Get around local event copy by moving handle_in_client // TODO: Get around local event copy by moving handle_in_client
let self_id = self.client_id; let self_id = self.client_id;
let mut len_buf = [0_u8; 4]; let mut len_buf = [0_u8; 4];
let mut count = 0;
self.tcp.set_nonblocking(true).expect("set to non-blocking"); self.tcp.set_nonblocking(true).expect("set to non-blocking");
// read all pending messages // read all pending messages
loop { loop {
match self.tcp.read_exact(&mut len_buf) { match self.tcp.read_exact(&mut len_buf) {
@ -743,11 +664,41 @@ where
// make decompressed vec and slice compatible // make decompressed vec and slice compatible
let event = postcard::from_bytes(buf)?; let event = postcard::from_bytes(buf)?;
self.handle_in_client(fuzzer, executor, state, other_client_id, event)?; if !self.hooks.pre_receive_all(state, other_client_id, &event)? {
count += 1; continue;
}
match event {
Event::NewTestcase {
client_config,
ref observers_buf,
forward_id,
..
} => {
log::info!("Received new Testcase from {other_client_id:?} ({client_config:?}, forward {forward_id:?})");
if client_config.match_with(&self.configuration)
&& observers_buf.is_some()
{
return Ok(Some((event, true)));
}
return Ok(Some((event, false)));
}
#[cfg(feature = "share_objectives")]
Event::Objective { .. } => {
log::info!("Received new Objective");
return Ok(Some((event, false)));
}
Event::Stop => {
state.request_stop();
}
_ => {
return Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
)))
}
}
} }
} }
Err(e) if e.kind() == ErrorKind::WouldBlock => { Err(e) if e.kind() == ErrorKind::WouldBlock => {
// no new data on the socket // no new data on the socket
break; break;
@ -758,12 +709,11 @@ where
} }
} }
self.tcp.set_nonblocking(false).expect("set to blocking"); self.tcp.set_nonblocking(false).expect("set to blocking");
Ok(None)
Ok(count)
} }
fn on_shutdown(&mut self) -> Result<(), Error> { fn on_interesting(&mut self, _state: &mut S, _event: Event<I>) -> Result<(), Error> {
self.send_exiting() Ok(())
} }
} }
@ -782,6 +732,10 @@ impl<EMH, I, S> SendExiting for TcpEventManager<EMH, I, S> {
//self.tcp.sender.send_exiting() //self.tcp.sender.send_exiting()
Ok(()) Ok(())
} }
fn on_shutdown(&mut self) -> Result<(), Error> {
self.send_exiting()
}
} }
impl<EMH, I, S> ProgressReporter<S> for TcpEventManager<EMH, I, S> impl<EMH, I, S> ProgressReporter<S> for TcpEventManager<EMH, I, S>
@ -870,6 +824,10 @@ where
// This way, the broker can clean up the pages, and eventually exit. // This way, the broker can clean up the pages, and eventually exit.
self.tcp_mgr.send_exiting() self.tcp_mgr.send_exiting()
} }
fn on_shutdown(&mut self) -> Result<(), Error> {
self.send_exiting()
}
} }
impl<EMH, I, S, SHM, SP> AwaitRestartSafe for TcpRestartingEventManager<EMH, I, S, SHM, SP> impl<EMH, I, S, SHM, SP> AwaitRestartSafe for TcpRestartingEventManager<EMH, I, S, SHM, SP>
@ -908,12 +866,8 @@ where
} }
} }
impl<E, EMH, I, S, SHM, SP, Z> EventProcessor<E, S, Z> impl<EMH, I, S, SHM, SP> EventReceiver<I, S> for TcpRestartingEventManager<EMH, I, S, SHM, SP>
for TcpRestartingEventManager<EMH, I, S, SHM, SP>
where where
E: HasObservers + Executor<TcpEventManager<EMH, I, S>, I, S, Z>,
for<'a> E::Observers: Deserialize<'a>,
E::Observers: ObserversTuple<I, S> + Serialize,
EMH: EventManagerHooksTuple<I, S>, EMH: EventManagerHooksTuple<I, S>,
I: DeserializeOwned, I: DeserializeOwned,
S: HasExecutions S: HasExecutions
@ -924,15 +878,13 @@ where
+ Stoppable, + Stoppable,
SHM: ShMem, SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
Z: ExecutionProcessor<TcpEventManager<EMH, I, S>, I, E::Observers, S>
+ EvaluatorObservers<E, TcpEventManager<EMH, I, S>, I, S>,
{ {
fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> { fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> {
self.tcp_mgr.process(fuzzer, state, executor) self.tcp_mgr.try_receive(state)
} }
fn on_shutdown(&mut self) -> Result<(), Error> { fn on_interesting(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error> {
self.send_exiting() self.tcp_mgr.on_interesting(state, event)
} }
} }

View File

@ -8,14 +8,15 @@ use std::hash::Hash;
#[cfg(feature = "std")] #[cfg(feature = "std")]
use fastbloom::BloomFilter; use fastbloom::BloomFilter;
use libafl_bolts::{current_time, tuples::MatchName}; use libafl_bolts::{current_time, tuples::MatchName};
use serde::Serialize; use serde::{de::DeserializeOwned, Serialize};
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
use crate::monitors::PerfFeature; use crate::monitors::PerfFeature;
use crate::{ use crate::{
corpus::{Corpus, CorpusId, HasCurrentCorpusId, HasTestcase, Testcase}, corpus::{Corpus, CorpusId, HasCurrentCorpusId, HasTestcase, Testcase},
events::{ events::{
CanSerializeObserver, Event, EventConfig, EventFirer, EventProcessor, ProgressReporter, CanSerializeObserver, Event, EventConfig, EventFirer, EventReceiver, ProgressReporter,
RecordSerializationTime, SendExiting,
}, },
executors::{Executor, ExitKind, HasObservers}, executors::{Executor, ExitKind, HasObservers},
feedbacks::Feedback, feedbacks::Feedback,
@ -26,8 +27,8 @@ use crate::{
stages::{HasCurrentStageId, StagesTuple}, stages::{HasCurrentStageId, StagesTuple},
start_timer, start_timer,
state::{ state::{
HasCorpus, HasCurrentTestcase, HasExecutions, HasLastFoundTime, HasLastReportTime, HasCorpus, HasCurrentTestcase, HasExecutions, HasImported, HasLastFoundTime,
HasSolutions, MaybeHasClientPerfMonitor, Stoppable, HasLastReportTime, HasSolutions, MaybeHasClientPerfMonitor, Stoppable,
}, },
Error, HasMetadata, Error, HasMetadata,
}; };
@ -98,7 +99,7 @@ pub trait ExecutionProcessor<EM, I, OT, S> {
&mut self, &mut self,
state: &mut S, state: &mut S,
manager: &mut EM, manager: &mut EM,
input: I, input: &I,
exec_res: &ExecuteInputResult, exec_res: &ExecuteInputResult,
observers: &OT, observers: &OT,
exit_kind: &ExitKind, exit_kind: &ExitKind,
@ -109,7 +110,7 @@ pub trait ExecutionProcessor<EM, I, OT, S> {
&mut self, &mut self,
state: &mut S, state: &mut S,
manager: &mut EM, manager: &mut EM,
input: I, input: &I,
exec_res: &ExecuteInputResult, exec_res: &ExecuteInputResult,
obs_buf: Option<Vec<u8>>, obs_buf: Option<Vec<u8>>,
exit_kind: &ExitKind, exit_kind: &ExitKind,
@ -120,7 +121,7 @@ pub trait ExecutionProcessor<EM, I, OT, S> {
&mut self, &mut self,
state: &mut S, state: &mut S,
manager: &mut EM, manager: &mut EM,
input: I, input: &I,
observers: &OT, observers: &OT,
exit_kind: &ExitKind, exit_kind: &ExitKind,
send_events: bool, send_events: bool,
@ -137,11 +138,24 @@ pub trait EvaluatorObservers<E, EM, I, S> {
state: &mut S, state: &mut S,
executor: &mut E, executor: &mut E,
manager: &mut EM, manager: &mut EM,
input: I, input: &I,
send_events: bool, send_events: bool,
) -> Result<(ExecuteInputResult, Option<CorpusId>), Error>; ) -> Result<(ExecuteInputResult, Option<CorpusId>), Error>;
} }
/// Receives and event from event manager and then evaluates it
pub trait EventProcessor<E, EM, I, S> {
/// Asks event manager to see if there's any event to evaluate
/// If there is any, then evaluates it.
/// After, run the post processing routines, for example, re-sending the events to the other
fn process_events(
&mut self,
state: &mut S,
executor: &mut E,
manager: &mut EM,
) -> Result<(), Error>;
}
/// Evaluate an input modifying the state of the fuzzer /// Evaluate an input modifying the state of the fuzzer
pub trait Evaluator<E, EM, I, S> { pub trait Evaluator<E, EM, I, S> {
/// Runs the input if it was (likely) not previously run and triggers observers and feedback and adds the input to the previously executed list /// Runs the input if it was (likely) not previously run and triggers observers and feedback and adds the input to the previously executed list
@ -151,7 +165,7 @@ pub trait Evaluator<E, EM, I, S> {
state: &mut S, state: &mut S,
executor: &mut E, executor: &mut E,
manager: &mut EM, manager: &mut EM,
input: I, input: &I,
) -> Result<(ExecuteInputResult, Option<CorpusId>), Error>; ) -> Result<(ExecuteInputResult, Option<CorpusId>), Error>;
/// Runs the input and triggers observers and feedback, /// Runs the input and triggers observers and feedback,
@ -161,7 +175,7 @@ pub trait Evaluator<E, EM, I, S> {
state: &mut S, state: &mut S,
executor: &mut E, executor: &mut E,
manager: &mut EM, manager: &mut EM,
input: I, input: &I,
) -> Result<(ExecuteInputResult, Option<CorpusId>), Error>; ) -> Result<(ExecuteInputResult, Option<CorpusId>), Error>;
/// Runs the input and triggers observers and feedback. /// Runs the input and triggers observers and feedback.
@ -401,7 +415,7 @@ where
&mut self, &mut self,
state: &mut S, state: &mut S,
manager: &mut EM, manager: &mut EM,
input: I, input: &I,
exec_res: &ExecuteInputResult, exec_res: &ExecuteInputResult,
observers: &OT, observers: &OT,
exit_kind: &ExitKind, exit_kind: &ExitKind,
@ -431,7 +445,7 @@ where
&mut self, &mut self,
state: &mut S, state: &mut S,
manager: &mut EM, manager: &mut EM,
input: I, input: &I,
exec_res: &ExecuteInputResult, exec_res: &ExecuteInputResult,
observers_buf: Option<Vec<u8>>, observers_buf: Option<Vec<u8>>,
exit_kind: &ExitKind, exit_kind: &ExitKind,
@ -443,7 +457,7 @@ where
manager.fire( manager.fire(
state, state,
Event::NewTestcase { Event::NewTestcase {
input, input: input.clone(),
observers_buf, observers_buf,
exit_kind: *exit_kind, exit_kind: *exit_kind,
corpus_size: state.corpus().count(), corpus_size: state.corpus().count(),
@ -462,7 +476,7 @@ where
state, state,
Event::Objective { Event::Objective {
#[cfg(feature = "share_objectives")] #[cfg(feature = "share_objectives")]
input, input: input.clone(),
objective_size: state.solutions().count(), objective_size: state.solutions().count(),
time: current_time(), time: current_time(),
@ -479,13 +493,13 @@ where
&mut self, &mut self,
state: &mut S, state: &mut S,
manager: &mut EM, manager: &mut EM,
input: I, input: &I,
observers: &OT, observers: &OT,
exit_kind: &ExitKind, exit_kind: &ExitKind,
send_events: bool, send_events: bool,
) -> Result<(ExecuteInputResult, Option<CorpusId>), Error> { ) -> Result<(ExecuteInputResult, Option<CorpusId>), Error> {
let exec_res = self.check_results(state, manager, &input, observers, exit_kind)?; let exec_res = self.check_results(state, manager, input, observers, exit_kind)?;
let corpus_id = self.process_execution(state, manager, &input, &exec_res, observers)?; let corpus_id = self.process_execution(state, manager, input, &exec_res, observers)?;
if send_events { if send_events {
self.serialize_and_dispatch(state, manager, input, &exec_res, observers, exit_kind)?; self.serialize_and_dispatch(state, manager, input, &exec_res, observers, exit_kind)?;
} }
@ -519,13 +533,13 @@ where
state: &mut S, state: &mut S,
executor: &mut E, executor: &mut E,
manager: &mut EM, manager: &mut EM,
input: I, input: &I,
send_events: bool, send_events: bool,
) -> Result<(ExecuteInputResult, Option<CorpusId>), Error> { ) -> Result<(ExecuteInputResult, Option<CorpusId>), Error> {
let exit_kind = self.execute_input(state, executor, manager, &input)?; let exit_kind = self.execute_input(state, executor, manager, input)?;
let observers = executor.observers(); let observers = executor.observers();
self.scheduler.on_evaluation(state, &input, &*observers)?; self.scheduler.on_evaluation(state, input, &*observers)?;
self.evaluate_execution(state, manager, input, &*observers, &exit_kind, send_events) self.evaluate_execution(state, manager, input, &*observers, &exit_kind, send_events)
} }
@ -593,9 +607,9 @@ where
state: &mut S, state: &mut S,
executor: &mut E, executor: &mut E,
manager: &mut EM, manager: &mut EM,
input: I, input: &I,
) -> Result<(ExecuteInputResult, Option<CorpusId>), Error> { ) -> Result<(ExecuteInputResult, Option<CorpusId>), Error> {
if self.input_filter.should_execute(&input) { if self.input_filter.should_execute(input) {
self.evaluate_input(state, executor, manager, input) self.evaluate_input(state, executor, manager, input)
} else { } else {
Ok((ExecuteInputResult::None, None)) Ok((ExecuteInputResult::None, None))
@ -609,7 +623,7 @@ where
state: &mut S, state: &mut S,
executor: &mut E, executor: &mut E,
manager: &mut EM, manager: &mut EM,
input: I, input: &I,
) -> Result<(ExecuteInputResult, Option<CorpusId>), Error> { ) -> Result<(ExecuteInputResult, Option<CorpusId>), Error> {
self.evaluate_input_with_observers(state, executor, manager, input, true) self.evaluate_input_with_observers(state, executor, manager, input, true)
} }
@ -724,14 +738,107 @@ where
} }
} }
impl<CS, E, EM, F, I, IF, OF, S> EventProcessor<E, EM, I, S> for StdFuzzer<CS, F, IF, OF>
where
CS: Scheduler<I, S>,
E: HasObservers + Executor<EM, I, S, Self>,
E::Observers: DeserializeOwned + Serialize + ObserversTuple<I, S>,
EM: EventReceiver<I, S>
+ RecordSerializationTime
+ CanSerializeObserver<E::Observers>
+ EventFirer<I, S>,
F: Feedback<EM, I, E::Observers, S>,
I: Input,
OF: Feedback<EM, I, E::Observers, S>,
S: HasCorpus<I>
+ HasSolutions<I>
+ HasExecutions
+ HasLastFoundTime
+ MaybeHasClientPerfMonitor
+ HasCurrentCorpusId
+ HasImported,
{
fn process_events(
&mut self,
state: &mut S,
executor: &mut E,
manager: &mut EM,
) -> Result<(), Error> {
// todo make this into a trait
// Execute the manager
while let Some((event, with_observers)) = manager.try_receive(state)? {
// at this point event is either newtestcase or objectives
let res = if with_observers {
match event {
Event::NewTestcase {
ref input,
ref observers_buf,
exit_kind,
..
} => {
let start = current_time();
let observers: E::Observers =
postcard::from_bytes(observers_buf.as_ref().unwrap())?;
{
let dur = current_time() - start;
manager.set_deserialization_time(dur);
}
let res = self.evaluate_execution(
state, manager, input, &observers, &exit_kind, false,
)?;
res.1
}
_ => None,
}
} else {
match event {
Event::NewTestcase { ref input, .. } => {
let res = self.evaluate_input_with_observers(
state, executor, manager, input, false,
)?;
res.1
}
#[cfg(feature = "share_objectives")]
Event::Objective { ref input, .. } => {
let res = self.evaluate_input_with_observers(
state, executor, manager, input, false,
)?;
res.1
}
_ => None,
}
};
if let Some(item) = res {
*state.imported_mut() += 1;
log::debug!("Added received input as item #{item}");
// for centralize
manager.on_interesting(state, event)?;
} else {
log::debug!("Received input was discarded");
}
}
Ok(())
}
}
impl<CS, E, EM, F, I, IF, OF, S, ST> Fuzzer<E, EM, I, S, ST> for StdFuzzer<CS, F, IF, OF> impl<CS, E, EM, F, I, IF, OF, S, ST> Fuzzer<E, EM, I, S, ST> for StdFuzzer<CS, F, IF, OF>
where where
CS: Scheduler<I, S>, CS: Scheduler<I, S>,
EM: ProgressReporter<S> + EventProcessor<E, S, Self>, E: HasObservers + Executor<EM, I, S, Self>,
E::Observers: DeserializeOwned + Serialize + ObserversTuple<I, S>,
EM: CanSerializeObserver<E::Observers> + EventFirer<I, S> + RecordSerializationTime,
I: Input,
F: Feedback<EM, I, E::Observers, S>,
OF: Feedback<EM, I, E::Observers, S>,
EM: ProgressReporter<S> + SendExiting + EventReceiver<I, S>,
S: HasExecutions S: HasExecutions
+ HasMetadata + HasMetadata
+ HasCorpus<I> + HasCorpus<I>
+ HasSolutions<I>
+ HasLastReportTime + HasLastReportTime
+ HasLastFoundTime
+ HasImported
+ HasTestcase<I> + HasTestcase<I>
+ HasCurrentCorpusId + HasCurrentCorpusId
+ HasCurrentStageId + HasCurrentStageId
@ -774,8 +881,7 @@ where
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
state.introspection_monitor_mut().start_timer(); state.introspection_monitor_mut().start_timer();
// Execute the manager self.process_events(state, executor, manager)?;
manager.process(self, state, executor)?;
// Mark the elapsed time for the manager // Mark the elapsed time for the manager
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
@ -951,7 +1057,7 @@ impl Default for NopFuzzer {
impl<E, EM, I, S, ST> Fuzzer<E, EM, I, S, ST> for NopFuzzer impl<E, EM, I, S, ST> Fuzzer<E, EM, I, S, ST> for NopFuzzer
where where
EM: ProgressReporter<S> + EventProcessor<E, S, Self>, EM: ProgressReporter<S>,
ST: StagesTuple<E, EM, S, Self>, ST: StagesTuple<E, EM, S, Self>,
S: HasMetadata + HasExecutions + HasLastReportTime + HasCurrentStageId, S: HasMetadata + HasExecutions + HasLastReportTime + HasCurrentStageId,
{ {
@ -1026,22 +1132,22 @@ mod tests {
.unwrap(); .unwrap();
let input = BytesInput::new(vec![1, 2, 3]); let input = BytesInput::new(vec![1, 2, 3]);
assert!(fuzzer assert!(fuzzer
.evaluate_input(&mut state, &mut executor, &mut manager, input.clone()) .evaluate_input(&mut state, &mut executor, &mut manager, &input)
.is_ok()); .is_ok());
assert_eq!(1, *execution_count.borrow()); // evaluate_input does not add it to the filter assert_eq!(1, *execution_count.borrow()); // evaluate_input does not add it to the filter
assert!(fuzzer assert!(fuzzer
.evaluate_filtered(&mut state, &mut executor, &mut manager, input.clone()) .evaluate_filtered(&mut state, &mut executor, &mut manager, &input)
.is_ok()); .is_ok());
assert_eq!(2, *execution_count.borrow()); // at to the filter assert_eq!(2, *execution_count.borrow()); // at to the filter
assert!(fuzzer assert!(fuzzer
.evaluate_filtered(&mut state, &mut executor, &mut manager, input.clone()) .evaluate_filtered(&mut state, &mut executor, &mut manager, &input)
.is_ok()); .is_ok());
assert_eq!(2, *execution_count.borrow()); // the harness is not called assert_eq!(2, *execution_count.borrow()); // the harness is not called
assert!(fuzzer assert!(fuzzer
.evaluate_input(&mut state, &mut executor, &mut manager, input.clone()) .evaluate_input(&mut state, &mut executor, &mut manager, &input)
.is_ok()); .is_ok());
assert_eq!(3, *execution_count.borrow()); // evaluate_input ignores filters assert_eq!(3, *execution_count.borrow()); // evaluate_input ignores filters
} }

View File

@ -413,7 +413,7 @@ where
for (index, new_byte) in mutation { for (index, new_byte) in mutation {
input_copy.mutator_bytes_mut()[index] = new_byte; input_copy.mutator_bytes_mut()[index] = new_byte;
} }
fuzzer.evaluate_filtered(state, executor, manager, input_copy)?; fuzzer.evaluate_filtered(state, executor, manager, &input_copy)?;
} }
} }
Ok(()) Ok(())

View File

@ -37,7 +37,7 @@ where
manager: &mut EM, manager: &mut EM,
) -> Result<(), Error> { ) -> Result<(), Error> {
let input = self.0.generate(state)?; let input = self.0.generate(state)?;
fuzzer.evaluate_filtered(state, executor, manager, input)?; fuzzer.evaluate_filtered(state, executor, manager, &input)?;
Ok(()) Ok(())
} }

View File

@ -48,7 +48,7 @@ pub use verify_timeouts::{TimeoutsToVerify, VerifyTimeoutsStage};
use crate::{ use crate::{
corpus::{CorpusId, HasCurrentCorpusId}, corpus::{CorpusId, HasCurrentCorpusId},
events::EventProcessor, events::SendExiting,
state::{HasExecutions, Stoppable}, state::{HasExecutions, Stoppable},
Error, HasNamedMetadata, Error, HasNamedMetadata,
}; };
@ -161,7 +161,7 @@ where
Head: Stage<E, EM, S, Z>, Head: Stage<E, EM, S, Z>,
Tail: StagesTuple<E, EM, S, Z> + HasConstLen, Tail: StagesTuple<E, EM, S, Z> + HasConstLen,
S: HasCurrentStageId + Stoppable, S: HasCurrentStageId + Stoppable,
EM: EventProcessor<E, S, Z>, EM: SendExiting,
{ {
/// Performs all stages in the tuple, /// Performs all stages in the tuple,
/// Checks after every stage if state wants to stop /// Checks after every stage if state wants to stop
@ -248,7 +248,7 @@ impl<E, EM, S, Z> IntoVec<Box<dyn Stage<E, EM, S, Z>>> for Vec<Box<dyn Stage<E,
impl<E, EM, S, Z> StagesTuple<E, EM, S, Z> for Vec<Box<dyn Stage<E, EM, S, Z>>> impl<E, EM, S, Z> StagesTuple<E, EM, S, Z> for Vec<Box<dyn Stage<E, EM, S, Z>>>
where where
EM: EventProcessor<E, S, Z>, EM: SendExiting,
S: HasCurrentStageId + Stoppable, S: HasCurrentStageId + Stoppable,
{ {
/// Performs all stages in the `Vec` /// Performs all stages in the `Vec`

View File

@ -279,7 +279,7 @@ where
let (untransformed, post) = input.try_transform_into(state)?; let (untransformed, post) = input.try_transform_into(state)?;
let (_, corpus_id) = let (_, corpus_id) =
fuzzer.evaluate_filtered(state, executor, manager, untransformed)?; fuzzer.evaluate_filtered(state, executor, manager, &untransformed)?;
start_timer!(state); start_timer!(state);
self.mutator_mut().post_exec(state, corpus_id)?; self.mutator_mut().post_exec(state, corpus_id)?;
@ -345,7 +345,7 @@ where
for new_input in generated { for new_input in generated {
let (untransformed, post) = new_input.try_transform_into(state)?; let (untransformed, post) = new_input.try_transform_into(state)?;
let (_, corpus_id) = let (_, corpus_id) =
fuzzer.evaluate_filtered(state, executor, manager, untransformed)?; fuzzer.evaluate_filtered(state, executor, manager, &untransformed)?;
self.mutator.multi_post_exec(state, corpus_id)?; self.mutator.multi_post_exec(state, corpus_id)?;
post.post_exec(state, corpus_id)?; post.post_exec(state, corpus_id)?;
} }

View File

@ -176,7 +176,7 @@ where
let (untransformed, post) = input.try_transform_into(state)?; let (untransformed, post) = input.try_transform_into(state)?;
let (_, corpus_id) = let (_, corpus_id) =
fuzzer.evaluate_filtered(state, executor, manager, untransformed)?; fuzzer.evaluate_filtered(state, executor, manager, &untransformed)?;
start_timer!(state); start_timer!(state);
self.mutator_mut().post_exec(state, corpus_id)?; self.mutator_mut().post_exec(state, corpus_id)?;

View File

@ -161,7 +161,7 @@ where
) -> Result<(), Error> { ) -> Result<(), Error> {
// todo: is_interesting, etc. // todo: is_interesting, etc.
fuzzer.evaluate_execution(state, event_mgr, last_input, observers, &exit_kind, true)?; fuzzer.evaluate_execution(state, event_mgr, &last_input, observers, &exit_kind, true)?;
start_timer!(state); start_timer!(state);
self.mutator.post_exec(state, self.current_corpus_id)?; self.mutator.post_exec(state, self.current_corpus_id)?;

View File

@ -148,7 +148,7 @@ where
.left_to_sync .left_to_sync
.retain(|p| p != &path); .retain(|p| p != &path);
log::debug!("Syncing and evaluating {:?}", path); log::debug!("Syncing and evaluating {:?}", path);
fuzzer.evaluate_input(state, executor, manager, input)?; fuzzer.evaluate_input(state, executor, manager, &input)?;
} }
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]

View File

@ -238,7 +238,7 @@ where
let (_, corpus_id) = fuzzer.evaluate_execution( let (_, corpus_id) = fuzzer.evaluate_execution(
state, state,
manager, manager,
input.clone(), &input,
&*observers, &*observers,
&exit_kind, &exit_kind,
false, false,

View File

@ -445,7 +445,7 @@ where
} }
let (untransformed, post) = input.try_transform_into(state)?; let (untransformed, post) = input.try_transform_into(state)?;
let (_, corpus_id) = fuzzer.evaluate_filtered(state, executor, manager, untransformed)?; let (_, corpus_id) = fuzzer.evaluate_filtered(state, executor, manager, &untransformed)?;
start_timer!(state); start_timer!(state);
self.mutator_mut().post_exec(state, corpus_id)?; self.mutator_mut().post_exec(state, corpus_id)?;

View File

@ -103,7 +103,7 @@ where
executor.set_timeout(self.doubled_timeout); executor.set_timeout(self.doubled_timeout);
*self.capture_timeouts.borrow_mut() = false; *self.capture_timeouts.borrow_mut() = false;
while let Some(input) = timeouts.pop() { while let Some(input) = timeouts.pop() {
fuzzer.evaluate_input(state, executor, manager, input)?; fuzzer.evaluate_input(state, executor, manager, &input)?;
} }
executor.set_timeout(self.original_timeout); executor.set_timeout(self.original_timeout);
*self.capture_timeouts.borrow_mut() = true; *self.capture_timeouts.borrow_mut() = true;

View File

@ -694,7 +694,7 @@ where
let _: CorpusId = fuzzer.add_input(self, executor, manager, input)?; let _: CorpusId = fuzzer.add_input(self, executor, manager, input)?;
Ok(ExecuteInputResult::Corpus) Ok(ExecuteInputResult::Corpus)
} else { } else {
let (res, _) = fuzzer.evaluate_input(self, executor, manager, input.clone())?; let (res, _) = fuzzer.evaluate_input(self, executor, manager, &input)?;
if res == ExecuteInputResult::None { if res == ExecuteInputResult::None {
fuzzer.add_disabled_input(self, input)?; fuzzer.add_disabled_input(self, input)?;
log::warn!("input {:?} was not interesting, adding as disabled.", &path); log::warn!("input {:?} was not interesting, adding as disabled.", &path);
@ -1027,7 +1027,7 @@ where
let _: CorpusId = fuzzer.add_input(self, executor, manager, input)?; let _: CorpusId = fuzzer.add_input(self, executor, manager, input)?;
added += 1; added += 1;
} else { } else {
let (res, _) = fuzzer.evaluate_input(self, executor, manager, input)?; let (res, _) = fuzzer.evaluate_input(self, executor, manager, &input)?;
if res != ExecuteInputResult::None { if res != ExecuteInputResult::None {
added += 1; added += 1;
} }

View File

@ -13,7 +13,7 @@ use std::{
use libafl::{ use libafl::{
corpus::Corpus, corpus::Corpus,
events::{ events::{
launcher::Launcher, EventConfig, EventProcessor, ProgressReporter, SimpleEventManager, launcher::Launcher, EventConfig, EventReceiver, ProgressReporter, SimpleEventManager,
SimpleRestartingEventManager, SimpleRestartingEventManager,
}, },
executors::ExitKind, executors::ExitKind,
@ -68,7 +68,7 @@ where
+ HasLastReportTime + HasLastReportTime
+ HasCurrentStageId + HasCurrentStageId
+ Stoppable, + Stoppable,
EM: ProgressReporter<S> + EventProcessor<E, S, F>, EM: ProgressReporter<S> + EventReceiver<I, S>,
ST: StagesTuple<E, EM, S, F>, ST: StagesTuple<E, EM, S, F>,
{ {
if let Some(solution) = state.solutions().last() { if let Some(solution) = state.solutions().last() {

View File

@ -1,7 +1,7 @@
use std::ffi::c_int; use std::ffi::c_int;
use libafl::{ use libafl::{
events::{EventProcessor, ProgressReporter, SimpleEventManager}, events::{EventReceiver, ProgressReporter, SimpleEventManager},
executors::HasObservers, executors::HasObservers,
feedbacks::MapFeedbackMetadata, feedbacks::MapFeedbackMetadata,
monitors::SimpleMonitor, monitors::SimpleMonitor,
@ -30,7 +30,7 @@ where
+ HasCurrentStageId + HasCurrentStageId
+ Stoppable, + Stoppable,
E: HasObservers, E: HasObservers,
EM: ProgressReporter<S> + EventProcessor<E, S, F>, EM: ProgressReporter<S> + EventReceiver<I, S>,
ST: StagesTuple<E, EM, S, F>, ST: StagesTuple<E, EM, S, F>,
{ {
let meta = state let meta = state