Add client stats to Events (#3116)

* add stats alongside Event over the wire
This commit is contained in:
Romain Malmain 2025-04-01 16:51:52 +02:00 committed by GitHub
parent 184b69be8e
commit 9dff7a438d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 464 additions and 341 deletions

View File

@ -1,5 +1,5 @@
use libafl::{ use libafl::{
events::{Event, EventManagerHook}, events::{Event, EventManagerHook, EventWithStats},
state::Stoppable, state::Stoppable,
Error, Error,
}; };
@ -18,9 +18,9 @@ where
&mut self, &mut self,
state: &mut S, state: &mut S,
_client_id: ClientId, _client_id: ClientId,
event: &Event<I>, event: &EventWithStats<I>,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
if self.exit_on_solution && matches!(event, Event::Objective { .. }) { if self.exit_on_solution && matches!(event.event(), Event::Objective { .. }) {
// TODO: dump state // TODO: dump state
state.request_stop(); state.request_stop();
} }

View File

@ -6,7 +6,7 @@ use core::{hash::Hash, marker::PhantomData};
use hashbrown::{HashMap, HashSet}; use hashbrown::{HashMap, HashSet};
use libafl_bolts::{ use libafl_bolts::{
AsIter, Named, current_time, AsIter, Named,
tuples::{Handle, Handled}, tuples::{Handle, Handled},
}; };
use num_traits::ToPrimitive; use num_traits::ToPrimitive;
@ -15,7 +15,7 @@ use z3::{Config, Context, Optimize, ast::Bool};
use crate::{ use crate::{
Error, HasMetadata, HasScheduler, Error, HasMetadata, HasScheduler,
corpus::Corpus, corpus::Corpus,
events::{Event, EventFirer, LogSeverity}, events::{Event, EventFirer, EventWithStats, LogSeverity},
executors::{Executor, HasObservers}, executors::{Executor, HasObservers},
inputs::Input, inputs::Input,
monitors::stats::{AggregatorOps, UserStats, UserStatsValue}, monitors::stats::{AggregatorOps, UserStats, UserStatsValue},
@ -123,20 +123,17 @@ where
manager.fire( manager.fire(
state, state,
Event::UpdateUserStats { EventWithStats::with_current_time(
name: Cow::from("minimisation exec pass"), Event::UpdateUserStats {
value: UserStats::new(UserStatsValue::Ratio(curr, total), AggregatorOps::None), name: Cow::from("minimisation exec pass"),
phantom: PhantomData, value: UserStats::new(
}, UserStatsValue::Ratio(curr, total),
)?; AggregatorOps::None,
),
manager.fire( phantom: PhantomData,
state, },
Event::UpdateExecStats {
time: current_time(),
phantom: PhantomData,
executions, executions,
}, ),
)?; )?;
let seed_expr = Bool::fresh_const(&ctx, "seed"); let seed_expr = Bool::fresh_const(&ctx, "seed");

View File

@ -11,7 +11,7 @@ use serde::de::DeserializeOwned;
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
use crate::events::COMPRESS_THRESHOLD; use crate::events::COMPRESS_THRESHOLD;
use crate::events::{_LLMP_TAG_TO_MAIN, BrokerEventResult, Event}; use crate::events::{_LLMP_TAG_TO_MAIN, BrokerEventResult, Event, EventWithStats};
/// An LLMP-backed event manager for scalable multi-processed fuzzing /// An LLMP-backed event manager for scalable multi-processed fuzzing
pub struct CentralizedLlmpHook<I> { pub struct CentralizedLlmpHook<I> {
@ -47,7 +47,7 @@ where
} else { } else {
&*msg &*msg
}; };
let event: Event<I> = postcard::from_bytes(event_bytes)?; let event: EventWithStats<I> = postcard::from_bytes(event_bytes)?;
match Self::handle_in_broker(client_id, &event)? { match Self::handle_in_broker(client_id, &event)? {
BrokerEventResult::Forward => Ok(LlmpMsgHookResult::ForwardToClients), BrokerEventResult::Forward => Ok(LlmpMsgHookResult::ForwardToClients),
BrokerEventResult::Handled => Ok(LlmpMsgHookResult::Handled), BrokerEventResult::Handled => Ok(LlmpMsgHookResult::Handled),
@ -85,9 +85,9 @@ impl<I> CentralizedLlmpHook<I> {
#[expect(clippy::unnecessary_wraps)] #[expect(clippy::unnecessary_wraps)]
fn handle_in_broker( fn handle_in_broker(
_client_id: ClientId, _client_id: ClientId,
event: &Event<I>, event: &EventWithStats<I>,
) -> Result<BrokerEventResult, Error> { ) -> Result<BrokerEventResult, Error> {
match &event { match event.event() {
Event::NewTestcase { .. } | Event::Stop => Ok(BrokerEventResult::Forward), Event::NewTestcase { .. } | Event::Stop => Ok(BrokerEventResult::Forward),
_ => Ok(BrokerEventResult::Handled), _ => Ok(BrokerEventResult::Handled),
} }

View File

@ -22,7 +22,7 @@ use tokio::{
use crate::{ use crate::{
events::{ events::{
Event, EventWithStats,
centralized::_LLMP_TAG_TO_MAIN, centralized::_LLMP_TAG_TO_MAIN,
multi_machine::{MultiMachineMsg, TcpMultiMachineState}, multi_machine::{MultiMachineMsg, TcpMultiMachineState},
}, },
@ -128,7 +128,7 @@ where
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
fn try_compress( fn try_compress(
state_lock: &mut RwLockWriteGuard<TcpMultiMachineState<A>>, state_lock: &mut RwLockWriteGuard<TcpMultiMachineState<A>>,
event: &Event<I>, event: &EventWithStats<I>,
) -> Result<(Flags, Vec<u8>), Error> { ) -> Result<(Flags, Vec<u8>), Error> {
let serialized = postcard::to_allocvec(&event)?; let serialized = postcard::to_allocvec(&event)?;
@ -141,7 +141,7 @@ where
#[cfg(not(feature = "llmp_compression"))] #[cfg(not(feature = "llmp_compression"))]
fn try_compress( fn try_compress(
_state_lock: &mut RwLockWriteGuard<TcpMultiMachineState<A>>, _state_lock: &mut RwLockWriteGuard<TcpMultiMachineState<A>>,
event: &Event<I>, event: &EventWithStats<I>,
) -> Result<(Flags, Vec<u8>), Error> { ) -> Result<(Flags, Vec<u8>), Error> {
Ok((Flags(0), postcard::to_allocvec(&event)?)) Ok((Flags(0), postcard::to_allocvec(&event)?))
} }

View File

@ -30,6 +30,8 @@ pub mod centralized_multi_machine;
#[cfg(all(unix, feature = "multi_machine"))] #[cfg(all(unix, feature = "multi_machine"))]
pub use centralized_multi_machine::*; pub use centralized_multi_machine::*;
use super::EventWithStats;
/// An LLMP-backed event hook for scalable multi-processed fuzzing /// An LLMP-backed event hook for scalable multi-processed fuzzing
#[derive(Debug)] #[derive(Debug)]
pub struct StdLlmpEventHook<I, MT> { pub struct StdLlmpEventHook<I, MT> {
@ -71,7 +73,7 @@ where
} else { } else {
&*msg &*msg
}; };
let event: Event<I> = postcard::from_bytes(event_bytes)?; let event: EventWithStats<I> = postcard::from_bytes(event_bytes)?;
match Self::handle_in_broker( match Self::handle_in_broker(
monitor, monitor,
&mut self.client_stats_manager, &mut self.client_stats_manager,
@ -117,8 +119,16 @@ where
monitor: &mut MT, monitor: &mut MT,
client_stats_manager: &mut ClientStatsManager, client_stats_manager: &mut ClientStatsManager,
client_id: ClientId, client_id: ClientId,
event: &Event<I>, event: &EventWithStats<I>,
) -> Result<BrokerEventResult, Error> { ) -> Result<BrokerEventResult, Error> {
let stats = event.stats();
client_stats_manager.client_stats_insert(ClientId(0));
client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_executions(stats.executions, stats.time);
});
let event = event.event();
match &event { match &event {
Event::NewTestcase { Event::NewTestcase {
corpus_size, corpus_size,
@ -138,19 +148,7 @@ where
monitor.display(client_stats_manager, event.name(), id); monitor.display(client_stats_manager, event.name(), id);
Ok(BrokerEventResult::Forward) Ok(BrokerEventResult::Forward)
} }
Event::UpdateExecStats { Event::Heartbeat => Ok(BrokerEventResult::Handled),
time,
executions,
phantom: _,
} => {
// TODO: The monitor buffer should be added on client add.
client_stats_manager.client_stats_insert(client_id);
client_stats_manager.update_client_stats_for(client_id, |client_stat| {
client_stat.update_executions(*executions, *time);
});
monitor.display(client_stats_manager, event.name(), client_id);
Ok(BrokerEventResult::Handled)
}
Event::UpdateUserStats { name, value, .. } => { Event::UpdateUserStats { name, value, .. } => {
client_stats_manager.client_stats_insert(client_id); client_stats_manager.client_stats_insert(client_id);
client_stats_manager.update_client_stats_for(client_id, |client_stat| { client_stats_manager.update_client_stats_for(client_id, |client_stat| {
@ -162,8 +160,6 @@ where
} }
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
Event::UpdatePerfMonitor { Event::UpdatePerfMonitor {
time,
executions,
introspection_stats, introspection_stats,
phantom: _, phantom: _,
} => { } => {
@ -172,8 +168,6 @@ where
// Get the client for the staterestorer ID // Get the client for the staterestorer ID
client_stats_manager.client_stats_insert(client_id); client_stats_manager.client_stats_insert(client_id);
client_stats_manager.update_client_stats_for(client_id, |client_stat| { client_stats_manager.update_client_stats_for(client_id, |client_stat| {
// Update the normal monitor for this client
client_stat.update_executions(*executions, *time);
// Update the performance monitor for this client // Update the performance monitor for this client
client_stat.update_introspection_stats((**introspection_stats).clone()); client_stat.update_introspection_stats((**introspection_stats).clone());
}); });

View File

@ -22,7 +22,7 @@ use libafl_bolts::{
llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED}, llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED},
}; };
use super::AwaitRestartSafe; use super::{AwaitRestartSafe, EventWithStats};
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
use crate::events::llmp::COMPRESS_THRESHOLD; use crate::events::llmp::COMPRESS_THRESHOLD;
use crate::{ use crate::{
@ -166,18 +166,18 @@ where
} }
#[expect(clippy::match_same_arms)] #[expect(clippy::match_same_arms)]
fn fire(&mut self, state: &mut S, mut event: Event<I>) -> Result<(), Error> { fn fire(&mut self, state: &mut S, mut event: EventWithStats<I>) -> Result<(), Error> {
if !self.is_main { if !self.is_main {
// secondary node // secondary node
let mut is_tc = false; let mut is_tc = false;
// Forward to main only if new tc, heartbeat, or optionally, a new objective // Forward to main only if new tc, heartbeat, or optionally, a new objective
let should_be_forwarded = match &mut event { let should_be_forwarded = match event.event_mut() {
Event::NewTestcase { forward_id, .. } => { Event::NewTestcase { forward_id, .. } => {
*forward_id = Some(ClientId(self.inner.mgr_id().0 as u32)); *forward_id = Some(ClientId(self.inner.mgr_id().0 as u32));
is_tc = true; is_tc = true;
true true
} }
Event::UpdateExecStats { .. } => true, // send UpdateExecStats but this guy won't be handled. the only purpose is to keep this client alive else the broker thinks it is dead and will dc it Event::Heartbeat => true, // the only purpose is to keep this client alive else the broker thinks it is dead and will dc it
Event::Objective { .. } => true, Event::Objective { .. } => true,
Event::Stop => true, Event::Stop => true,
_ => false, _ => false,
@ -201,7 +201,10 @@ where
state: &mut S, state: &mut S,
severity_level: LogSeverity, severity_level: LogSeverity,
message: String, message: String,
) -> Result<(), Error> { ) -> Result<(), Error>
where
S: HasExecutions,
{
self.inner.log(state, severity_level, message) self.inner.log(state, severity_level, message)
} }
@ -261,7 +264,7 @@ where
SHM: ShMem, SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
{ {
fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> { fn try_receive(&mut self, state: &mut S) -> Result<Option<(EventWithStats<I>, bool)>, Error> {
if self.is_main { if self.is_main {
// main node // main node
self.receive_from_secondary(state) self.receive_from_secondary(state)
@ -272,7 +275,7 @@ where
} }
} }
fn on_interesting(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error> { fn on_interesting(&mut self, state: &mut S, event: EventWithStats<I>) -> Result<(), Error> {
self.inner.fire(state, event) self.inner.fire(state, event)
} }
} }
@ -343,7 +346,7 @@ where
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
{ {
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
fn forward_to_main(&mut self, event: &Event<I>) -> Result<(), Error> { fn forward_to_main(&mut self, event: &EventWithStats<I>) -> Result<(), Error> {
let serialized = postcard::to_allocvec(event)?; let serialized = postcard::to_allocvec(event)?;
let flags = LLMP_FLAG_INITIALIZED; let flags = LLMP_FLAG_INITIALIZED;
@ -363,13 +366,16 @@ where
} }
#[cfg(not(feature = "llmp_compression"))] #[cfg(not(feature = "llmp_compression"))]
fn forward_to_main(&mut self, event: &Event<I>) -> Result<(), Error> { fn forward_to_main(&mut self, event: &EventWithStats<I>) -> Result<(), Error> {
let serialized = postcard::to_allocvec(event)?; let serialized = postcard::to_allocvec(event)?;
self.client.send_buf(_LLMP_TAG_TO_MAIN, &serialized)?; self.client.send_buf(_LLMP_TAG_TO_MAIN, &serialized)?;
Ok(()) Ok(())
} }
fn receive_from_secondary(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> { fn receive_from_secondary(
&mut self,
state: &mut S,
) -> Result<Option<(EventWithStats<I>, bool)>, Error> {
// TODO: Get around local event copy by moving handle_in_client // TODO: Get around local event copy by moving handle_in_client
let self_id = self.client.sender().id(); let self_id = self.client.sender().id();
while let Some((client_id, tag, _flags, msg)) = self.client.recv_buf_with_flags()? { while let Some((client_id, tag, _flags, msg)) = self.client.recv_buf_with_flags()? {
@ -392,15 +398,18 @@ where
} else { } else {
msg msg
}; };
let event: Event<I> = postcard::from_bytes(event_bytes)?; let event: EventWithStats<I> = postcard::from_bytes(event_bytes)?;
log::debug!("Processor received message {}", event.name_detailed()); log::debug!(
"Processor received message {}",
event.event().name_detailed()
);
let event_name = event.name_detailed(); let event_name = event.event().name_detailed();
match event { match event.event() {
Event::NewTestcase { Event::NewTestcase {
client_config, client_config,
ref observers_buf, observers_buf,
forward_id, forward_id,
.. ..
} => { } => {
@ -425,7 +434,7 @@ where
_ => { _ => {
return Err(Error::illegal_state(format!( return Err(Error::illegal_state(format!(
"Received illegal message that message should not have arrived: {:?}.", "Received illegal message that message should not have arrived: {:?}.",
event.name() event.event().name()
))); )));
} }
} }

View File

@ -4,7 +4,7 @@
//! other clients //! other clients
use libafl_bolts::ClientId; use libafl_bolts::ClientId;
use crate::{Error, events::Event}; use crate::{Error, events::EventWithStats};
/// The `broker_hooks` that are run before and after the event manager calls `try_receive` /// The `broker_hooks` that are run before and after the event manager calls `try_receive`
pub trait EventManagerHook<I, S> { pub trait EventManagerHook<I, S> {
@ -14,7 +14,7 @@ pub trait EventManagerHook<I, S> {
&mut self, &mut self,
state: &mut S, state: &mut S,
client_id: ClientId, client_id: ClientId,
event: &Event<I>, event: &EventWithStats<I>,
) -> Result<bool, Error>; ) -> Result<bool, Error>;
} }
@ -25,7 +25,7 @@ pub trait EventManagerHooksTuple<I, S> {
&mut self, &mut self,
state: &mut S, state: &mut S,
client_id: ClientId, client_id: ClientId,
event: &Event<I>, event: &EventWithStats<I>,
) -> Result<bool, Error>; ) -> Result<bool, Error>;
} }
@ -35,7 +35,7 @@ impl<I, S> EventManagerHooksTuple<I, S> for () {
&mut self, &mut self,
_state: &mut S, _state: &mut S,
_client_id: ClientId, _client_id: ClientId,
_event: &Event<I>, _event: &EventWithStats<I>,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
Ok(true) Ok(true)
} }
@ -51,7 +51,7 @@ where
&mut self, &mut self,
state: &mut S, state: &mut S,
client_id: ClientId, client_id: ClientId,
event: &Event<I>, event: &EventWithStats<I>,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
let first = self.0.pre_receive(state, client_id, event)?; let first = self.0.pre_receive(state, client_id, event)?;
let second = self.1.pre_receive_all(state, client_id, event)?; let second = self.1.pre_receive_all(state, client_id, event)?;

View File

@ -16,7 +16,7 @@ use serde::{Serialize, de::DeserializeOwned};
use crate::{ use crate::{
Error, Error,
events::{Event, EventFirer}, events::{Event, EventFirer, EventWithStats},
fuzzer::EvaluatorObservers, fuzzer::EvaluatorObservers,
inputs::{Input, InputConverter, NopInput, NopInputConverter}, inputs::{Input, InputConverter, NopInput, NopInputConverter},
state::{HasCurrentTestcase, HasSolutions, NopState}, state::{HasCurrentTestcase, HasSolutions, NopState},
@ -385,38 +385,40 @@ where
} }
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> { fn fire(&mut self, _state: &mut S, event: EventWithStats<I>) -> Result<(), Error> {
if self.converter.is_none() { if self.converter.is_none() {
return Ok(()); return Ok(());
} }
// Filter out non interestign events and convert `NewTestcase` // Filter out non interestign events and convert `NewTestcase`
let converted_event = match event { let converted_event = EventWithStats::new(
Event::NewTestcase { match event.event {
input, Event::NewTestcase {
client_config, input,
exit_kind, client_config,
corpus_size, exit_kind,
observers_buf, corpus_size,
time, observers_buf,
forward_id, forward_id,
#[cfg(all(unix, feature = "std", feature = "multi_machine"))] #[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id, node_id,
} => Event::NewTestcase { } => Event::NewTestcase {
input: self.converter.as_mut().unwrap().convert(input)?, input: self.converter.as_mut().unwrap().convert(input)?,
client_config, client_config,
exit_kind, exit_kind,
corpus_size, corpus_size,
observers_buf, observers_buf,
time, forward_id,
forward_id, #[cfg(all(unix, feature = "std", feature = "multi_machine"))]
#[cfg(all(unix, feature = "std", feature = "multi_machine"))] node_id,
node_id, },
_ => {
return Ok(());
}
}, },
_ => { event.stats,
return Ok(()); );
}
};
let serialized = postcard::to_allocvec(&converted_event)?; let serialized = postcard::to_allocvec(&converted_event)?;
let flags = LLMP_FLAG_INITIALIZED; let flags = LLMP_FLAG_INITIALIZED;
@ -437,38 +439,40 @@ where
} }
#[cfg(not(feature = "llmp_compression"))] #[cfg(not(feature = "llmp_compression"))]
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> { fn fire(&mut self, _state: &mut S, event: EventWithStats<I>) -> Result<(), Error> {
if self.converter.is_none() { if self.converter.is_none() {
return Ok(()); return Ok(());
} }
// Filter out non interestign events and convert `NewTestcase` // Filter out non interestign events and convert `NewTestcase`
let converted_event = match event { let converted_event = EventWithStats::new(
Event::NewTestcase { match event.event {
input, Event::NewTestcase {
client_config, input,
exit_kind, client_config,
corpus_size, exit_kind,
observers_buf, corpus_size,
time, observers_buf,
forward_id, forward_id,
#[cfg(all(unix, feature = "std", feature = "multi_machine"))] #[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id, node_id,
} => Event::NewTestcase { } => Event::NewTestcase {
input: self.converter.as_mut().unwrap().convert(input)?, input: self.converter.as_mut().unwrap().convert(input)?,
client_config, client_config,
exit_kind, exit_kind,
corpus_size, corpus_size,
observers_buf, observers_buf,
time, forward_id,
forward_id, #[cfg(all(unix, feature = "std", feature = "multi_machine"))]
#[cfg(all(unix, feature = "std", feature = "multi_machine"))] node_id,
node_id, },
_ => {
return Ok(());
}
}, },
_ => { event.stats,
return Ok(()); );
}
};
let serialized = postcard::to_allocvec(&converted_event)?; let serialized = postcard::to_allocvec(&converted_event)?;
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?; self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
Ok(()) Ok(())

View File

@ -55,9 +55,9 @@ use crate::{
common::HasMetadata, common::HasMetadata,
events::{ events::{
_LLMP_TAG_EVENT_TO_BROKER, AwaitRestartSafe, Event, EventConfig, EventFirer, _LLMP_TAG_EVENT_TO_BROKER, AwaitRestartSafe, Event, EventConfig, EventFirer,
EventManagerHooksTuple, EventManagerId, EventReceiver, EventRestarter, HasEventManagerId, EventManagerHooksTuple, EventManagerId, EventReceiver, EventRestarter, EventWithStats,
LLMP_TAG_EVENT_TO_BOTH, LlmpShouldSaveState, ProgressReporter, SendExiting, HasEventManagerId, LLMP_TAG_EVENT_TO_BOTH, LlmpShouldSaveState, ProgressReporter,
StdLlmpEventHook, launcher::ClientDescription, std_maybe_report_progress, SendExiting, StdLlmpEventHook, launcher::ClientDescription, std_maybe_report_progress,
std_report_progress, std_report_progress,
}, },
inputs::Input, inputs::Input,
@ -121,7 +121,7 @@ where
SHM: ShMem, SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
{ {
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> { fn fire(&mut self, _state: &mut S, event: EventWithStats<I>) -> Result<(), Error> {
// Check if we are going to crash in the event, in which case we store our current state for the next runner // Check if we are going to crash in the event, in which case we store our current state for the next runner
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
let flags = LLMP_FLAG_INITIALIZED; let flags = LLMP_FLAG_INITIALIZED;
@ -255,7 +255,7 @@ where
SHM: ShMem, SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
{ {
fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> { fn try_receive(&mut self, state: &mut S) -> Result<Option<(EventWithStats<I>, bool)>, Error> {
// TODO: Get around local event copy by moving handle_in_client // TODO: Get around local event copy by moving handle_in_client
let self_id = self.llmp.sender().id(); let self_id = self.llmp.sender().id();
while let Some((client_id, tag, flags, msg)) = self.llmp.recv_buf_with_flags()? { while let Some((client_id, tag, flags, msg)) = self.llmp.recv_buf_with_flags()? {
@ -280,24 +280,31 @@ where
msg msg
}; };
let event: Event<I> = postcard::from_bytes(event_bytes)?; let event: EventWithStats<I> = postcard::from_bytes(event_bytes)?;
log::debug!("Received event in normal llmp {}", event.name_detailed()); log::debug!(
"Received event in normal llmp {}",
event.event().name_detailed()
);
// If the message comes from another machine, do not // If the message comes from another machine, do not
// consider other events than new testcase. // consider other events than new testcase.
if !event.is_new_testcase() && (flags & LLMP_FLAG_FROM_MM == LLMP_FLAG_FROM_MM) { if !event.event().is_new_testcase() && (flags & LLMP_FLAG_FROM_MM == LLMP_FLAG_FROM_MM)
{
continue; continue;
} }
log::trace!("Got event in client: {} from {client_id:?}", event.name()); log::trace!(
"Got event in client: {} from {client_id:?}",
event.event().name()
);
if !self.hooks.pre_receive_all(state, client_id, &event)? { if !self.hooks.pre_receive_all(state, client_id, &event)? {
continue; continue;
} }
let evt_name = event.name_detailed(); let evt_name = event.event().name_detailed();
match event { match event.event() {
Event::NewTestcase { Event::NewTestcase {
client_config, client_config,
ref observers_buf, observers_buf,
#[cfg(feature = "std")] #[cfg(feature = "std")]
forward_id, forward_id,
.. ..
@ -326,7 +333,7 @@ where
_ => { _ => {
return Err(Error::unknown(format!( return Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.", "Received illegal message that message should not have arrived: {:?}.",
event.name() event.event().name()
))); )));
} }
} }
@ -334,7 +341,11 @@ where
Ok(None) Ok(None)
} }
fn on_interesting(&mut self, _state: &mut S, _event_vec: Event<I>) -> Result<(), Error> { fn on_interesting(
&mut self,
_state: &mut S,
_event_vec: EventWithStats<I>,
) -> Result<(), Error> {
Ok(()) Ok(())
} }
} }

View File

@ -238,6 +238,64 @@ where
} }
*/ */
/// Basic statistics
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ExecStats {
/// The time of generation of the [`Event`]
time: Duration,
/// The executions of this client
executions: u64,
}
impl ExecStats {
/// Create an new [`ExecStats`].
#[must_use]
pub fn new(time: Duration, executions: u64) -> Self {
Self { time, executions }
}
}
/// Event with associated stats
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct EventWithStats<I> {
/// The event
event: Event<I>,
/// Statistics on new event
stats: ExecStats,
}
impl<I> EventWithStats<I> {
/// Create a new [`EventWithStats`].
pub fn new(event: Event<I>, stats: ExecStats) -> Self {
Self { event, stats }
}
/// Create a new [`EventWithStats`], with the current time.
pub fn with_current_time(event: Event<I>, executions: u64) -> Self {
let time = current_time();
Self {
event,
stats: ExecStats { time, executions },
}
}
/// Get the inner ref to the [`Event`] in [`EventWithStats`].
pub fn event(&self) -> &Event<I> {
&self.event
}
/// Get the inner mutable ref to the [`Event`] in [`EventWithStats`].
pub fn event_mut(&mut self) -> &mut Event<I> {
&mut self.event
}
/// Get the inner ref to the [`ExecStats`] in [`EventWithStats`].
pub fn stats(&self) -> &ExecStats {
&self.stats
}
}
// TODO remove forward_id as not anymore needed for centralized // TODO remove forward_id as not anymore needed for centralized
/// Events sent around in the library /// Events sent around in the library
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
@ -256,23 +314,14 @@ pub enum Event<I> {
corpus_size: usize, corpus_size: usize,
/// The client config for this observers/testcase combination /// The client config for this observers/testcase combination
client_config: EventConfig, client_config: EventConfig,
/// The time of generation of the event
time: Duration,
/// The original sender if, if forwarded /// The original sender if, if forwarded
forward_id: Option<libafl_bolts::ClientId>, forward_id: Option<libafl_bolts::ClientId>,
/// The (multi-machine) node from which the tc is from, if any /// The (multi-machine) node from which the tc is from, if any
#[cfg(all(unix, feature = "std", feature = "multi_machine"))] #[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id: Option<NodeId>, node_id: Option<NodeId>,
}, },
/// New stats event to monitor. /// A hearbeat, to notice a fuzzer is still alive.
UpdateExecStats { Heartbeat,
/// The time of generation of the [`Event`]
time: Duration,
/// The executions of this client
executions: u64,
/// [`PhantomData`]
phantom: PhantomData<I>,
},
/// New user stats event to monitor. /// New user stats event to monitor.
UpdateUserStats { UpdateUserStats {
/// Custom user monitor name /// Custom user monitor name
@ -285,10 +334,6 @@ pub enum Event<I> {
/// New monitor with performance monitor. /// New monitor with performance monitor.
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
UpdatePerfMonitor { UpdatePerfMonitor {
/// The time of generation of the event
time: Duration,
/// The executions of this client
executions: u64,
/// Current performance statistics /// Current performance statistics
introspection_stats: Box<ClientPerfStats>, introspection_stats: Box<ClientPerfStats>,
@ -301,8 +346,6 @@ pub enum Event<I> {
input: Option<I>, input: Option<I>,
/// Objective corpus size /// Objective corpus size
objective_size: usize, objective_size: usize,
/// The time when this event was created
time: Duration,
}, },
/// Write a new log /// Write a new log
Log { Log {
@ -327,7 +370,7 @@ impl<I> Event<I> {
pub fn name(&self) -> &str { pub fn name(&self) -> &str {
match self { match self {
Event::NewTestcase { .. } => "Testcase", Event::NewTestcase { .. } => "Testcase",
Event::UpdateExecStats { .. } => "Client Heartbeat", Event::Heartbeat => "Client Heartbeat",
Event::UpdateUserStats { .. } => "UserStats", Event::UpdateUserStats { .. } => "UserStats",
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
Event::UpdatePerfMonitor { .. } => "PerfMonitor", Event::UpdatePerfMonitor { .. } => "PerfMonitor",
@ -349,7 +392,7 @@ impl<I> Event<I> {
Event::NewTestcase { input, .. } => { Event::NewTestcase { input, .. } => {
Cow::Owned(format!("Testcase {}", input.generate_name(None))) Cow::Owned(format!("Testcase {}", input.generate_name(None)))
} }
Event::UpdateExecStats { .. } => Cow::Borrowed("Client Heartbeat"), Event::Heartbeat => Cow::Borrowed("Client Heartbeat"),
Event::UpdateUserStats { .. } => Cow::Borrowed("UserStats"), Event::UpdateUserStats { .. } => Cow::Borrowed("UserStats"),
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
Event::UpdatePerfMonitor { .. } => Cow::Borrowed("PerfMonitor"), Event::UpdatePerfMonitor { .. } => Cow::Borrowed("PerfMonitor"),
@ -378,7 +421,7 @@ pub trait EventFirer<I, S> {
/// (for example for each [`Input`], on multiple cores) /// (for example for each [`Input`], on multiple cores)
/// the [`llmp`] shared map may fill up and the client will eventually OOM or [`panic`]. /// the [`llmp`] shared map may fill up and the client will eventually OOM or [`panic`].
/// This should not happen for a normal use-case. /// This should not happen for a normal use-case.
fn fire(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error>; fn fire(&mut self, state: &mut S, event: EventWithStats<I>) -> Result<(), Error>;
/// Send off an [`Event::Log`] event to the broker. /// Send off an [`Event::Log`] event to the broker.
/// This is a shortcut for [`EventFirer::fire`] with [`Event::Log`] as argument. /// This is a shortcut for [`EventFirer::fire`] with [`Event::Log`] as argument.
@ -387,13 +430,27 @@ pub trait EventFirer<I, S> {
state: &mut S, state: &mut S,
severity_level: LogSeverity, severity_level: LogSeverity,
message: String, message: String,
) -> Result<(), Error> { ) -> Result<(), Error>
where
S: HasExecutions,
{
let executions = *state.executions();
let cur = current_time();
let stats = ExecStats {
executions,
time: cur,
};
self.fire( self.fire(
state, state,
Event::Log { EventWithStats {
severity_level, event: Event::Log {
message, severity_level,
phantom: PhantomData, message,
phantom: PhantomData,
},
stats,
}, },
) )
} }
@ -442,14 +499,18 @@ where
let executions = *state.executions(); let executions = *state.executions();
let cur = current_time(); let cur = current_time();
let stats = ExecStats {
executions,
time: cur,
};
// Default no introspection implmentation // Default no introspection implmentation
#[cfg(not(feature = "introspection"))] #[cfg(not(feature = "introspection"))]
reporter.fire( reporter.fire(
state, state,
Event::UpdateExecStats { EventWithStats {
executions, event: Event::Heartbeat,
time: cur, stats,
phantom: PhantomData,
}, },
)?; )?;
@ -464,12 +525,13 @@ where
// costly as `ClientPerfStats` impls `Copy` since it only contains `u64`s // costly as `ClientPerfStats` impls `Copy` since it only contains `u64`s
reporter.fire( reporter.fire(
state, state,
Event::UpdatePerfMonitor { EventWithStats::new(
executions, Event::UpdatePerfMonitor {
time: cur, introspection_stats: Box::new(state.introspection_stats().clone()),
introspection_stats: Box::new(state.introspection_stats().clone()), phantom: PhantomData,
phantom: PhantomData, },
}, stats,
),
)?; )?;
} }
@ -538,11 +600,11 @@ pub trait AwaitRestartSafe {
pub trait EventReceiver<I, S> { pub trait EventReceiver<I, S> {
/// Lookup for incoming events and process them. /// Lookup for incoming events and process them.
/// Return the event, if any, that needs to be evaluated /// Return the event, if any, that needs to be evaluated
fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error>; fn try_receive(&mut self, state: &mut S) -> Result<Option<(EventWithStats<I>, bool)>, Error>;
/// Run the post processing routine after the fuzzer deemed this event as interesting /// Run the post processing routine after the fuzzer deemed this event as interesting
/// For example, in centralized manager you wanna send this an event. /// For example, in centralized manager you wanna send this an event.
fn on_interesting(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error>; fn on_interesting(&mut self, state: &mut S, event: EventWithStats<I>) -> Result<(), Error>;
} }
/// The id of this `EventManager`. /// The id of this `EventManager`.
/// For multi processed `EventManagers`, /// For multi processed `EventManagers`,
@ -570,7 +632,7 @@ impl<I, S> EventFirer<I, S> for NopEventManager {
true true
} }
fn fire(&mut self, _state: &mut S, _event: Event<I>) -> Result<(), Error> { fn fire(&mut self, _state: &mut S, _event: EventWithStats<I>) -> Result<(), Error> {
Ok(()) Ok(())
} }
} }
@ -602,11 +664,15 @@ impl AwaitRestartSafe for NopEventManager {
} }
impl<I, S> EventReceiver<I, S> for NopEventManager { impl<I, S> EventReceiver<I, S> for NopEventManager {
fn try_receive(&mut self, _state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> { fn try_receive(&mut self, _state: &mut S) -> Result<Option<(EventWithStats<I>, bool)>, Error> {
Ok(None) Ok(None)
} }
fn on_interesting(&mut self, _state: &mut S, _event_vec: Event<I>) -> Result<(), Error> { fn on_interesting(
&mut self,
_state: &mut S,
_event_vec: EventWithStats<I>,
) -> Result<(), Error> {
Ok(()) Ok(())
} }
} }
@ -659,7 +725,7 @@ where
} }
#[inline] #[inline]
fn fire(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error> { fn fire(&mut self, state: &mut S, event: EventWithStats<I>) -> Result<(), Error> {
self.inner.fire(state, event) self.inner.fire(state, event)
} }
@ -669,7 +735,10 @@ where
state: &mut S, state: &mut S,
severity_level: LogSeverity, severity_level: LogSeverity,
message: String, message: String,
) -> Result<(), Error> { ) -> Result<(), Error>
where
S: HasExecutions,
{
self.inner.log(state, severity_level, message) self.inner.log(state, severity_level, message)
} }
@ -718,10 +787,14 @@ where
EM: EventReceiver<I, S>, EM: EventReceiver<I, S>,
{ {
#[inline] #[inline]
fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> { fn try_receive(&mut self, state: &mut S) -> Result<Option<(EventWithStats<I>, bool)>, Error> {
self.inner.try_receive(state) self.inner.try_receive(state)
} }
fn on_interesting(&mut self, _state: &mut S, _event_vec: Event<I>) -> Result<(), Error> { fn on_interesting(
&mut self,
_state: &mut S,
_event_vec: EventWithStats<I>,
) -> Result<(), Error> {
Ok(()) Ok(())
} }
} }
@ -758,7 +831,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use libafl_bolts::{Named, current_time, tuples::tuple_list}; use libafl_bolts::{Named, tuples::tuple_list};
use tuple_list::tuple_list_type; use tuple_list::tuple_list_type;
use crate::{ use crate::{
@ -787,7 +860,6 @@ mod tests {
exit_kind: ExitKind::Ok, exit_kind: ExitKind::Ok,
corpus_size: 123, corpus_size: 123,
client_config: EventConfig::AlwaysUnique, client_config: EventConfig::AlwaysUnique,
time: current_time(),
forward_id: None, forward_id: None,
#[cfg(all(unix, feature = "std", feature = "multi_machine"))] #[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id: None, node_id: None,

View File

@ -22,7 +22,7 @@ use tokio::{
use typed_builder::TypedBuilder; use typed_builder::TypedBuilder;
use crate::{ use crate::{
events::{Event, TcpMultiMachineLlmpReceiverHook, TcpMultiMachineLlmpSenderHook}, events::{EventWithStats, TcpMultiMachineLlmpReceiverHook, TcpMultiMachineLlmpSenderHook},
inputs::{Input, NopInput}, inputs::{Input, NopInput},
}; };
@ -50,7 +50,7 @@ pub enum MultiMachineMsg<'a, I> {
LlmpMsg(OwnedRef<'a, [u8]>), LlmpMsg(OwnedRef<'a, [u8]>),
/// A `LibAFL` Event (already deserialized) /// A `LibAFL` Event (already deserialized)
Event(OwnedRef<'a, Event<I>>), Event(OwnedRef<'a, EventWithStats<I>>),
} }
/// We do not use raw pointers, so no problem with thead-safety /// We do not use raw pointers, so no problem with thead-safety
@ -65,7 +65,7 @@ impl<'a, I> MultiMachineMsg<'a, I> {
/// `OwnedRef` should **never** be a raw pointer for thread-safety reasons. /// `OwnedRef` should **never** be a raw pointer for thread-safety reasons.
/// We check this for debug builds, but not for release. /// We check this for debug builds, but not for release.
#[must_use] #[must_use]
pub unsafe fn event(event: OwnedRef<'a, Event<I>>) -> Self { pub unsafe fn event(event: OwnedRef<'a, EventWithStats<I>>) -> Self {
debug_assert!(!event.is_raw()); debug_assert!(!event.is_raw());
MultiMachineMsg::Event(event) MultiMachineMsg::Event(event)

View File

@ -23,7 +23,7 @@ use serde::Serialize;
#[cfg(feature = "std")] #[cfg(feature = "std")]
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use super::{AwaitRestartSafe, ProgressReporter, std_on_restart}; use super::{AwaitRestartSafe, EventWithStats, ProgressReporter, std_on_restart};
#[cfg(all(unix, feature = "std", not(miri)))] #[cfg(all(unix, feature = "std", not(miri)))]
use crate::events::EVENTMGR_SIGHANDLER_STATE; use crate::events::EVENTMGR_SIGHANDLER_STATE;
use crate::{ use crate::{
@ -54,7 +54,7 @@ pub struct SimpleEventManager<I, MT, S> {
/// The monitor /// The monitor
monitor: MT, monitor: MT,
/// The events that happened since the last `handle_in_broker` /// The events that happened since the last `handle_in_broker`
events: Vec<Event<I>>, events: Vec<EventWithStats<I>>,
phantom: PhantomData<S>, phantom: PhantomData<S>,
client_stats_manager: ClientStatsManager, client_stats_manager: ClientStatsManager,
} }
@ -83,7 +83,7 @@ where
true true
} }
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> { fn fire(&mut self, _state: &mut S, event: EventWithStats<I>) -> Result<(), Error> {
match Self::handle_in_broker(&mut self.monitor, &mut self.client_stats_manager, &event)? { match Self::handle_in_broker(&mut self.monitor, &mut self.client_stats_manager, &event)? {
BrokerEventResult::Forward => self.events.push(event), BrokerEventResult::Forward => self.events.push(event),
BrokerEventResult::Handled => (), BrokerEventResult::Handled => (),
@ -121,9 +121,9 @@ where
MT: Monitor, MT: Monitor,
S: Stoppable, S: Stoppable,
{ {
fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> { fn try_receive(&mut self, state: &mut S) -> Result<Option<(EventWithStats<I>, bool)>, Error> {
while let Some(event) = self.events.pop() { while let Some(event) = self.events.pop() {
match event { match event.event() {
Event::Stop => { Event::Stop => {
state.request_stop(); state.request_stop();
} }
@ -136,7 +136,11 @@ where
} }
Ok(None) Ok(None)
} }
fn on_interesting(&mut self, _state: &mut S, _event_vec: Event<I>) -> Result<(), Error> { fn on_interesting(
&mut self,
_state: &mut S,
_event_vec: EventWithStats<I>,
) -> Result<(), Error> {
Ok(()) Ok(())
} }
} }
@ -200,8 +204,16 @@ where
fn handle_in_broker( fn handle_in_broker(
monitor: &mut MT, monitor: &mut MT,
client_stats_manager: &mut ClientStatsManager, client_stats_manager: &mut ClientStatsManager,
event: &Event<I>, event: &EventWithStats<I>,
) -> Result<BrokerEventResult, Error> { ) -> Result<BrokerEventResult, Error> {
let stats = event.stats();
client_stats_manager.client_stats_insert(ClientId(0));
client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_executions(stats.executions, stats.time);
});
let event = event.event();
match event { match event {
Event::NewTestcase { corpus_size, .. } => { Event::NewTestcase { corpus_size, .. } => {
client_stats_manager.client_stats_insert(ClientId(0)); client_stats_manager.client_stats_insert(ClientId(0));
@ -211,15 +223,7 @@ where
monitor.display(client_stats_manager, event.name(), ClientId(0)); monitor.display(client_stats_manager, event.name(), ClientId(0));
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
Event::UpdateExecStats { Event::Heartbeat => {
time, executions, ..
} => {
// TODO: The monitor buffer should be added on client add.
client_stats_manager.client_stats_insert(ClientId(0));
client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_executions(*executions, *time);
});
monitor.display(client_stats_manager, event.name(), ClientId(0)); monitor.display(client_stats_manager, event.name(), ClientId(0));
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
@ -234,15 +238,11 @@ where
} }
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
Event::UpdatePerfMonitor { Event::UpdatePerfMonitor {
time,
executions,
introspection_stats, introspection_stats,
.. ..
} => { } => {
// TODO: The monitor buffer should be added on client add. // TODO: The monitor buffer should be added on client add.
client_stats_manager.client_stats_insert(ClientId(0));
client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| { client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_executions(*executions, *time);
client_stat.update_introspection_stats((**introspection_stats).clone()); client_stat.update_introspection_stats((**introspection_stats).clone());
}); });
monitor.display(client_stats_manager, event.name(), ClientId(0)); monitor.display(client_stats_manager, event.name(), ClientId(0));
@ -295,7 +295,7 @@ where
true true
} }
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> { fn fire(&mut self, _state: &mut S, event: EventWithStats<I>) -> Result<(), Error> {
self.inner.fire(_state, event) self.inner.fire(_state, event)
} }
} }
@ -354,11 +354,15 @@ where
SHM: ShMem, SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
{ {
fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> { fn try_receive(&mut self, state: &mut S) -> Result<Option<(EventWithStats<I>, bool)>, Error> {
self.inner.try_receive(state) self.inner.try_receive(state)
} }
fn on_interesting(&mut self, _state: &mut S, _event_vec: Event<I>) -> Result<(), Error> { fn on_interesting(
&mut self,
_state: &mut S,
_event_vec: EventWithStats<I>,
) -> Result<(), Error> {
Ok(()) Ok(())
} }
} }

View File

@ -45,7 +45,8 @@ use crate::{
Error, HasMetadata, Error, HasMetadata,
events::{ events::{
BrokerEventResult, Event, EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, BrokerEventResult, Event, EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId,
EventReceiver, EventRestarter, HasEventManagerId, ProgressReporter, std_on_restart, EventReceiver, EventRestarter, EventWithStats, HasEventManagerId, ProgressReporter,
std_on_restart,
}, },
inputs::Input, inputs::Input,
monitors::{Monitor, stats::ClientStatsManager}, monitors::{Monitor, stats::ClientStatsManager},
@ -292,7 +293,7 @@ where
#[cfg(feature = "tcp_compression")] #[cfg(feature = "tcp_compression")]
let event_bytes = &GzipCompressor::new().decompress(event_bytes)?; let event_bytes = &GzipCompressor::new().decompress(event_bytes)?;
let event: Event<I> = postcard::from_bytes(event_bytes)?; let event: EventWithStats<I> = postcard::from_bytes(event_bytes)?;
match Self::handle_in_broker( match Self::handle_in_broker(
&mut self.monitor, &mut self.monitor,
&mut self.client_stats_manager, &mut self.client_stats_manager,
@ -321,9 +322,17 @@ where
monitor: &mut MT, monitor: &mut MT,
client_stats_manager: &mut ClientStatsManager, client_stats_manager: &mut ClientStatsManager,
client_id: ClientId, client_id: ClientId,
event: &Event<I>, event: &EventWithStats<I>,
) -> Result<BrokerEventResult, Error> { ) -> Result<BrokerEventResult, Error> {
match &event { let stats = event.stats();
client_stats_manager.client_stats_insert(ClientId(0));
client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_executions(stats.executions, stats.time);
});
let event = event.event();
match event {
Event::NewTestcase { Event::NewTestcase {
corpus_size, corpus_size,
forward_id, forward_id,
@ -341,16 +350,7 @@ where
monitor.display(client_stats_manager, event.name(), id); monitor.display(client_stats_manager, event.name(), id);
Ok(BrokerEventResult::Forward) Ok(BrokerEventResult::Forward)
} }
Event::UpdateExecStats { Event::Heartbeat => {
time,
executions,
phantom: _,
} => {
// TODO: The monitor buffer should be added on client add.
client_stats_manager.client_stats_insert(client_id);
client_stats_manager.update_client_stats_for(client_id, |client| {
client.update_executions(*executions, *time);
});
monitor.display(client_stats_manager, event.name(), client_id); monitor.display(client_stats_manager, event.name(), client_id);
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
@ -369,8 +369,6 @@ where
} }
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
Event::UpdatePerfMonitor { Event::UpdatePerfMonitor {
time,
executions,
introspection_stats, introspection_stats,
phantom: _, phantom: _,
} => { } => {
@ -379,8 +377,6 @@ where
// Get the client for the staterestorer ID // Get the client for the staterestorer ID
client_stats_manager.client_stats_insert(client_id); client_stats_manager.client_stats_insert(client_id);
client_stats_manager.update_client_stats_for(client_id, |client| { client_stats_manager.update_client_stats_for(client_id, |client| {
// Update the normal monitor for this client
client.update_executions(*executions, *time);
// Update the performance monitor for this client // Update the performance monitor for this client
client.update_introspection_stats((**introspection_stats).clone()); client.update_introspection_stats((**introspection_stats).clone());
}); });
@ -608,7 +604,7 @@ where
} }
} }
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> { fn fire(&mut self, _state: &mut S, event: EventWithStats<I>) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?; let serialized = postcard::to_allocvec(&event)?;
#[cfg(feature = "tcp_compression")] #[cfg(feature = "tcp_compression")]
@ -648,7 +644,7 @@ where
+ Stoppable, + Stoppable,
I: DeserializeOwned, I: DeserializeOwned,
{ {
fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> { fn try_receive(&mut self, state: &mut S) -> Result<Option<(EventWithStats<I>, bool)>, Error> {
// TODO: Get around local event copy by moving handle_in_client // TODO: Get around local event copy by moving handle_in_client
let self_id = self.client_id; let self_id = self.client_id;
let mut len_buf = [0_u8; 4]; let mut len_buf = [0_u8; 4];
@ -678,15 +674,15 @@ where
let buf = &self.compressor.decompress(buf)?; let buf = &self.compressor.decompress(buf)?;
// make decompressed vec and slice compatible // make decompressed vec and slice compatible
let event = postcard::from_bytes(buf)?; let event: EventWithStats<I> = postcard::from_bytes(buf)?;
if !self.hooks.pre_receive_all(state, other_client_id, &event)? { if !self.hooks.pre_receive_all(state, other_client_id, &event)? {
continue; continue;
} }
match event { match event.event() {
Event::NewTestcase { Event::NewTestcase {
client_config, client_config,
ref observers_buf, observers_buf,
forward_id, forward_id,
.. ..
} => { } => {
@ -710,7 +706,7 @@ where
_ => { _ => {
return Err(Error::unknown(format!( return Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.", "Received illegal message that message should not have arrived: {:?}.",
event.name() event.event().name()
))); )));
} }
} }
@ -729,7 +725,7 @@ where
Ok(None) Ok(None)
} }
fn on_interesting(&mut self, _state: &mut S, _event: Event<I>) -> Result<(), Error> { fn on_interesting(&mut self, _state: &mut S, _event: EventWithStats<I>) -> Result<(), Error> {
Ok(()) Ok(())
} }
} }
@ -820,7 +816,7 @@ where
self.tcp_mgr.should_send() self.tcp_mgr.should_send()
} }
fn fire(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error> { fn fire(&mut self, state: &mut S, event: EventWithStats<I>) -> Result<(), Error> {
// Check if we are going to crash in the event, in which case we store our current state for the next runner // Check if we are going to crash in the event, in which case we store our current state for the next runner
self.tcp_mgr.fire(state, event) self.tcp_mgr.fire(state, event)
} }
@ -896,11 +892,11 @@ where
SHM: ShMem, SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>, SP: ShMemProvider<ShMem = SHM>,
{ {
fn try_receive(&mut self, state: &mut S) -> Result<Option<(Event<I>, bool)>, Error> { fn try_receive(&mut self, state: &mut S) -> Result<Option<(EventWithStats<I>, bool)>, Error> {
self.tcp_mgr.try_receive(state) self.tcp_mgr.try_receive(state)
} }
fn on_interesting(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error> { fn on_interesting(&mut self, state: &mut S, event: EventWithStats<I>) -> Result<(), Error> {
self.tcp_mgr.on_interesting(state, event) self.tcp_mgr.on_interesting(state, event)
} }
} }

View File

@ -17,7 +17,7 @@ use libafl_bolts::tuples::{RefIndexable, tuple_list};
use crate::{ use crate::{
Error, HasMetadata, Error, HasMetadata,
corpus::{Corpus, Testcase}, corpus::{Corpus, Testcase},
events::{Event, EventFirer, EventRestarter}, events::{Event, EventFirer, EventRestarter, EventWithStats},
executors::{ executors::{
Executor, ExitKind, HasObservers, Executor, ExitKind, HasObservers,
hooks::{ExecutorHooksTuple, inprocess::InProcessHooks}, hooks::{ExecutorHooksTuple, inprocess::InProcessHooks},
@ -360,14 +360,16 @@ pub fn run_observers_and_save_state<E, EM, I, OF, S, Z>(
.solutions_mut() .solutions_mut()
.add(new_testcase) .add(new_testcase)
.expect("In run_observers_and_save_state solutions failure."); .expect("In run_observers_and_save_state solutions failure.");
let event = Event::Objective {
input: fuzzer.share_objectives().then_some(input.clone()),
objective_size: state.solutions().count(),
};
event_mgr event_mgr
.fire( .fire(
state, state,
Event::Objective { EventWithStats::with_current_time(event, *state.executions()),
input: fuzzer.share_objectives().then_some(input.clone()),
objective_size: state.solutions().count(),
time: libafl_bolts::current_time(),
},
) )
.expect("Could not send off events in run_observers_and_save_state"); .expect("Could not send off events in run_observers_and_save_state");
} }

View File

@ -23,11 +23,12 @@ use crate::feedbacks::premature_last_result_err;
use crate::{ use crate::{
Error, HasMetadata, HasNamedMetadata, Error, HasMetadata, HasNamedMetadata,
corpus::Testcase, corpus::Testcase,
events::{Event, EventFirer}, events::{Event, EventFirer, EventWithStats},
executors::ExitKind, executors::ExitKind,
feedbacks::{Feedback, HasObserverHandle, StateInitializer}, feedbacks::{Feedback, HasObserverHandle, StateInitializer},
monitors::stats::{AggregatorOps, UserStats, UserStatsValue}, monitors::stats::{AggregatorOps, UserStats, UserStatsValue},
observers::{CanTrack, MapObserver}, observers::{CanTrack, MapObserver},
state::HasExecutions,
}; };
/// A [`MapFeedback`] that implements the AFL algorithm using an [`OrReducer`] combining the bits for the history map and the bit from (`HitcountsMapObserver`)[`crate::observers::HitcountsMapObserver`]. /// A [`MapFeedback`] that implements the AFL algorithm using an [`OrReducer`] combining the bits for the history map and the bit from (`HitcountsMapObserver`)[`crate::observers::HitcountsMapObserver`].
@ -390,7 +391,7 @@ where
O::Entry: 'static + Default + Debug + DeserializeOwned + Serialize, O::Entry: 'static + Default + Debug + DeserializeOwned + Serialize,
OT: MatchName, OT: MatchName,
R: Reducer<O::Entry>, R: Reducer<O::Entry>,
S: HasNamedMetadata, S: HasNamedMetadata + HasExecutions,
{ {
#[rustversion::nightly] #[rustversion::nightly]
default fn is_interesting( default fn is_interesting(
@ -512,14 +513,17 @@ where
// unnecessarily // unnecessarily
manager.fire( manager.fire(
state, state,
Event::UpdateUserStats { EventWithStats::with_current_time(
name: self.stats_name.clone(), Event::UpdateUserStats {
value: UserStats::new( name: self.stats_name.clone(),
UserStatsValue::Ratio(covered as u64, len as u64), value: UserStats::new(
AggregatorOps::Avg, UserStatsValue::Ratio(covered as u64, len as u64),
), AggregatorOps::Avg,
phantom: PhantomData, ),
}, phantom: PhantomData,
},
*state.executions(),
),
)?; )?;
Ok(()) Ok(())
@ -534,7 +538,7 @@ where
EM: EventFirer<I, S>, EM: EventFirer<I, S>,
O: MapObserver<Entry = u8> + for<'a> AsSlice<'a, Entry = u8> + for<'a> AsIter<'a, Item = u8>, O: MapObserver<Entry = u8> + for<'a> AsSlice<'a, Entry = u8> + for<'a> AsIter<'a, Item = u8>,
OT: MatchName, OT: MatchName,
S: HasNamedMetadata, S: HasNamedMetadata + HasExecutions,
{ {
fn is_interesting( fn is_interesting(
&mut self, &mut self,

View File

@ -15,7 +15,10 @@ use crate::monitors::stats::PerfFeature;
use crate::{ use crate::{
Error, HasMetadata, Error, HasMetadata,
corpus::{Corpus, CorpusId, HasCurrentCorpusId, HasTestcase, Testcase}, corpus::{Corpus, CorpusId, HasCurrentCorpusId, HasTestcase, Testcase},
events::{Event, EventConfig, EventFirer, EventReceiver, ProgressReporter, SendExiting}, events::{
Event, EventConfig, EventFirer, EventReceiver, EventWithStats, ProgressReporter,
SendExiting,
},
executors::{Executor, ExitKind, HasObservers}, executors::{Executor, ExitKind, HasObservers},
feedbacks::Feedback, feedbacks::Feedback,
inputs::Input, inputs::Input,
@ -359,7 +362,8 @@ where
+ MaybeHasClientPerfMonitor + MaybeHasClientPerfMonitor
+ HasCurrentTestcase<I> + HasCurrentTestcase<I>
+ HasSolutions<I> + HasSolutions<I>
+ HasLastFoundTime, + HasLastFoundTime
+ HasExecutions,
{ {
fn check_results( fn check_results(
&mut self, &mut self,
@ -484,27 +488,32 @@ where
if exec_res.is_corpus() { if exec_res.is_corpus() {
manager.fire( manager.fire(
state, state,
Event::NewTestcase { EventWithStats::with_current_time(
input: input.clone(), Event::NewTestcase {
observers_buf, input: input.clone(),
exit_kind: *exit_kind, observers_buf,
corpus_size: state.corpus().count(), exit_kind: *exit_kind,
client_config: manager.configuration(), corpus_size: state.corpus().count(),
time: current_time(), client_config: manager.configuration(),
forward_id: None, forward_id: None,
#[cfg(all(unix, feature = "std", feature = "multi_machine"))] #[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id: None, node_id: None,
}, },
*state.executions(),
),
)?; )?;
} }
if exec_res.is_solution() { if exec_res.is_solution() {
manager.fire( manager.fire(
state, state,
Event::Objective { EventWithStats::with_current_time(
input: self.share_objectives.then_some(input.clone()), Event::Objective {
objective_size: state.solutions().count(), input: self.share_objectives.then_some(input.clone()),
time: current_time(), objective_size: state.solutions().count(),
}, },
*state.executions(),
),
)?; )?;
} }
} }
@ -693,11 +702,13 @@ where
manager.fire( manager.fire(
state, state,
Event::Objective { EventWithStats::with_current_time(
input: self.share_objectives.then_some(input.clone()), Event::Objective {
objective_size: state.solutions().count(), input: self.share_objectives.then_some(input.clone()),
time: current_time(), objective_size: state.solutions().count(),
}, },
*state.executions(),
),
)?; )?;
} }
@ -733,17 +744,19 @@ where
}; };
manager.fire( manager.fire(
state, state,
Event::NewTestcase { EventWithStats::with_current_time(
input, Event::NewTestcase {
observers_buf, input,
exit_kind, observers_buf,
corpus_size: state.corpus().count(), exit_kind,
client_config: manager.configuration(), corpus_size: state.corpus().count(),
time: current_time(), client_config: manager.configuration(),
forward_id: None, forward_id: None,
#[cfg(all(unix, feature = "std", feature = "multi_machine"))] #[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id: None, node_id: None,
}, },
*state.executions(),
),
)?; )?;
Ok((id, ExecuteInputResult::new(corpus_worthy, is_solution))) Ok((id, ExecuteInputResult::new(corpus_worthy, is_solution)))
} }
@ -785,32 +798,32 @@ where
while let Some((event, with_observers)) = manager.try_receive(state)? { while let Some((event, with_observers)) = manager.try_receive(state)? {
// at this point event is either newtestcase or objectives // at this point event is either newtestcase or objectives
let res = if with_observers { let res = if with_observers {
match event { match event.event() {
Event::NewTestcase { Event::NewTestcase {
ref input, input,
ref observers_buf, observers_buf,
exit_kind, exit_kind,
.. ..
} => { } => {
let observers: E::Observers = let observers: E::Observers =
postcard::from_bytes(observers_buf.as_ref().unwrap())?; postcard::from_bytes(observers_buf.as_ref().unwrap())?;
let res = self.evaluate_execution( let res = self.evaluate_execution(
state, manager, input, &observers, &exit_kind, false, state, manager, input, &observers, exit_kind, false,
)?; )?;
res.1 res.1
} }
_ => None, _ => None,
} }
} else { } else {
match event { match event.event() {
Event::NewTestcase { ref input, .. } => { Event::NewTestcase { input, .. } => {
let res = self.evaluate_input_with_observers( let res = self.evaluate_input_with_observers(
state, executor, manager, input, false, state, executor, manager, input, false,
)?; )?;
res.1 res.1
} }
Event::Objective { Event::Objective {
input: Some(ref unwrapped_input), input: Some(unwrapped_input),
.. ..
} => { } => {
let res = self.evaluate_input_with_observers( let res = self.evaluate_input_with_observers(

View File

@ -23,7 +23,7 @@ use crate::feedbacks::{CRASH_FEEDBACK_NAME, TIMEOUT_FEEDBACK_NAME};
use crate::{ use crate::{
Error, HasMetadata, HasNamedMetadata, HasScheduler, Error, HasMetadata, HasNamedMetadata, HasScheduler,
corpus::{Corpus, HasCurrentCorpusId, SchedulerTestcaseMetadata, Testcase}, corpus::{Corpus, HasCurrentCorpusId, SchedulerTestcaseMetadata, Testcase},
events::{Event, EventFirer}, events::{Event, EventFirer, EventWithStats},
executors::HasObservers, executors::HasObservers,
monitors::stats::{AggregatorOps, UserStats, UserStatsValue}, monitors::stats::{AggregatorOps, UserStats, UserStatsValue},
mutators::Tokens, mutators::Tokens,
@ -418,14 +418,17 @@ where
manager.fire( manager.fire(
state, state,
Event::UpdateUserStats { EventWithStats::with_current_time(
name: Cow::Borrowed("AflStats"), Event::UpdateUserStats {
value: UserStats::new( name: Cow::Borrowed("AflStats"),
UserStatsValue::String(Cow::Owned(json)), value: UserStats::new(
AggregatorOps::None, UserStatsValue::String(Cow::Owned(json)),
), AggregatorOps::None,
phantom: PhantomData, ),
}, phantom: PhantomData,
},
*state.executions(),
),
)?; )?;
Ok(()) Ok(())

View File

@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize};
use crate::{ use crate::{
Error, HasMetadata, HasNamedMetadata, Error, HasMetadata, HasNamedMetadata,
corpus::{Corpus, HasCurrentCorpusId, SchedulerTestcaseMetadata}, corpus::{Corpus, HasCurrentCorpusId, SchedulerTestcaseMetadata},
events::{Event, EventFirer, LogSeverity}, events::{Event, EventFirer, EventWithStats, LogSeverity},
executors::{Executor, ExitKind, HasObservers}, executors::{Executor, ExitKind, HasObservers},
feedbacks::{HasObserverHandle, map::MapFeedbackMetadata}, feedbacks::{HasObserverHandle, map::MapFeedbackMetadata},
fuzzer::Evaluator, fuzzer::Evaluator,
@ -335,30 +335,36 @@ where
map_first_filled_count.saturating_sub(unstable_entries) as u64; map_first_filled_count.saturating_sub(unstable_entries) as u64;
mgr.fire( mgr.fire(
state, state,
Event::UpdateUserStats { EventWithStats::with_current_time(
name: Cow::from("stability"), Event::UpdateUserStats {
value: UserStats::new( name: Cow::from("stability"),
UserStatsValue::Ratio(stable_count, map_first_filled_count as u64), value: UserStats::new(
AggregatorOps::Avg, UserStatsValue::Ratio(stable_count, map_first_filled_count as u64),
), AggregatorOps::Avg,
phantom: PhantomData, ),
}, phantom: PhantomData,
},
*state.executions(),
),
)?; )?;
} }
} else if send_default_stability { } else if send_default_stability {
mgr.fire( mgr.fire(
state, state,
Event::UpdateUserStats { EventWithStats::with_current_time(
name: Cow::from("stability"), Event::UpdateUserStats {
value: UserStats::new( name: Cow::from("stability"),
UserStatsValue::Ratio( value: UserStats::new(
map_first_filled_count as u64, UserStatsValue::Ratio(
map_first_filled_count as u64, map_first_filled_count as u64,
map_first_filled_count as u64,
),
AggregatorOps::Avg,
), ),
AggregatorOps::Avg, phantom: PhantomData,
), },
phantom: PhantomData, *state.executions(),
}, ),
)?; )?;
} }

View File

@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize};
use crate::{ use crate::{
Error, HasMetadata, HasNamedMetadata, Error, HasMetadata, HasNamedMetadata,
corpus::{Corpus, CorpusId, HasCurrentCorpusId}, corpus::{Corpus, CorpusId, HasCurrentCorpusId},
events::{Event, EventConfig, EventFirer, llmp::LlmpEventConverter}, events::{Event, EventConfig, EventFirer, EventWithStats, llmp::LlmpEventConverter},
executors::{Executor, ExitKind, HasObservers}, executors::{Executor, ExitKind, HasObservers},
fuzzer::{Evaluator, EvaluatorObservers, ExecutionProcessor, HasObjective}, fuzzer::{Evaluator, EvaluatorObservers, ExecutionProcessor, HasObjective},
inputs::{Input, InputConverter}, inputs::{Input, InputConverter},
@ -272,17 +272,19 @@ where
self.client.fire( self.client.fire(
state, state,
Event::NewTestcase { EventWithStats::with_current_time(
input, Event::NewTestcase {
observers_buf: None, input,
exit_kind: ExitKind::Ok, observers_buf: None,
corpus_size: 0, // TODO choose if sending 0 or the actual real value exit_kind: ExitKind::Ok,
client_config: EventConfig::AlwaysUnique, corpus_size: 0, // TODO choose if sending 0 or the actual real value
time: current_time(), client_config: EventConfig::AlwaysUnique,
forward_id: None, forward_id: None,
#[cfg(all(unix, feature = "multi_machine"))] #[cfg(all(unix, feature = "multi_machine"))]
node_id: None, node_id: None,
}, },
*state.executions(),
),
)?; )?;
cur_id = state.corpus().next(id); cur_id = state.corpus().next(id);

View File

@ -33,7 +33,7 @@ use crate::monitors::stats::ClientPerfStats;
use crate::{ use crate::{
Error, HasMetadata, HasNamedMetadata, Error, HasMetadata, HasNamedMetadata,
corpus::{Corpus, CorpusId, HasCurrentCorpusId, HasTestcase, InMemoryCorpus, Testcase}, corpus::{Corpus, CorpusId, HasCurrentCorpusId, HasTestcase, InMemoryCorpus, Testcase},
events::{Event, EventFirer, LogSeverity}, events::{Event, EventFirer, EventWithStats, LogSeverity},
feedbacks::StateInitializer, feedbacks::StateInitializer,
fuzzer::Evaluator, fuzzer::Evaluator,
generators::Generator, generators::Generator,
@ -764,11 +764,14 @@ where
manager.fire( manager.fire(
self, self,
Event::Log { EventWithStats::with_current_time(
severity_level: LogSeverity::Debug, Event::Log {
message: format!("Loaded {} initial testcases.", self.corpus().count()), // get corpus count severity_level: LogSeverity::Debug,
phantom: PhantomData::<I>, message: format!("Loaded {} initial testcases.", self.corpus().count()), // get corpus count
}, phantom: PhantomData::<I>,
},
*self.executions(),
),
)?; )?;
Ok(()) Ok(())
} }
@ -1062,11 +1065,14 @@ where
} }
manager.fire( manager.fire(
self, self,
Event::Log { EventWithStats::with_current_time(
severity_level: LogSeverity::Debug, Event::Log {
message: format!("Loaded {added} over {num} initial testcases"), severity_level: LogSeverity::Debug,
phantom: PhantomData, message: format!("Loaded {added} over {num} initial testcases"),
}, phantom: PhantomData,
},
*self.executions(),
),
)?; )?;
Ok(()) Ok(())
} }