Add statistics. Move client stats from Monitor to EventManager (#2940)

* Add statistics entity. Move client stats from Monitor to EventManager

* Fix warning in no_std

* Make rustfmt happy

* Fix more

* Fix with feature tcp_manager on

* Rename more introspection monitor; Remove unnecessary client_stats

* Fix unused import for no_std

* Fix unused import for prometheus_monitor feature on

* Cleanup docs
This commit is contained in:
EvianZhang 2025-02-06 23:58:24 +08:00 committed by GitHub
parent 0573bbb159
commit ab50afe8e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 1477 additions and 1485 deletions

View File

@ -3,7 +3,10 @@
use std::{thread::sleep, time::Duration}; use std::{thread::sleep, time::Duration};
use libafl::monitors::{tui::TuiMonitor, ClientStats, Monitor}; use libafl::{
monitors::{tui::TuiMonitor, Monitor},
statistics::{manager::ClientStatsManager, ClientStats},
};
use libafl_bolts::ClientId; use libafl_bolts::ClientId;
pub fn main() { pub fn main() {
@ -14,7 +17,8 @@ pub fn main() {
executions: 512, executions: 512,
..ClientStats::default() ..ClientStats::default()
}; };
let mut client_stats_manager = ClientStatsManager::default();
monitor.display("Test", ClientId(0)); monitor.display(&mut client_stats_manager, "Test", ClientId(0));
sleep(Duration::from_secs(10)); sleep(Duration::from_secs(10));
} }

View File

@ -18,10 +18,10 @@ use crate::{
events::{Event, EventFirer, LogSeverity}, events::{Event, EventFirer, LogSeverity},
executors::{Executor, HasObservers}, executors::{Executor, HasObservers},
inputs::Input, inputs::Input,
monitors::{AggregatorOps, UserStats, UserStatsValue},
observers::{MapObserver, ObserversTuple}, observers::{MapObserver, ObserversTuple},
schedulers::{LenTimeMulTestcaseScore, RemovableScheduler, Scheduler, TestcaseScore}, schedulers::{LenTimeMulTestcaseScore, RemovableScheduler, Scheduler, TestcaseScore},
state::{HasCorpus, HasExecutions}, state::{HasCorpus, HasExecutions},
statistics::user_stats::{AggregatorOps, UserStats, UserStatsValue},
Error, HasMetadata, HasScheduler, Error, HasMetadata, HasScheduler,
}; };

View File

@ -15,6 +15,7 @@ use crate::events::llmp::COMPRESS_THRESHOLD;
use crate::{ use crate::{
events::{llmp::LLMP_TAG_EVENT_TO_BOTH, BrokerEventResult, Event}, events::{llmp::LLMP_TAG_EVENT_TO_BOTH, BrokerEventResult, Event},
monitors::Monitor, monitors::Monitor,
statistics::manager::ClientStatsManager,
Error, Error,
}; };
@ -37,6 +38,7 @@ pub struct StdLlmpEventHook<I, MT> {
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor, compressor: GzipCompressor,
phantom: PhantomData<I>, phantom: PhantomData<I>,
client_stats_manager: ClientStatsManager,
} }
impl<I, MT, SHM, SP> LlmpHook<SHM, SP> for StdLlmpEventHook<I, MT> impl<I, MT, SHM, SP> LlmpHook<SHM, SP> for StdLlmpEventHook<I, MT>
@ -71,7 +73,12 @@ where
&*msg &*msg
}; };
let event: Event<I> = postcard::from_bytes(event_bytes)?; let event: Event<I> = postcard::from_bytes(event_bytes)?;
match Self::handle_in_broker(monitor, client_id, &event)? { match Self::handle_in_broker(
monitor,
&mut self.client_stats_manager,
client_id,
&event,
)? {
BrokerEventResult::Forward => Ok(LlmpMsgHookResult::ForwardToClients), BrokerEventResult::Forward => Ok(LlmpMsgHookResult::ForwardToClients),
BrokerEventResult::Handled => Ok(LlmpMsgHookResult::Handled), BrokerEventResult::Handled => Ok(LlmpMsgHookResult::Handled),
} }
@ -81,7 +88,11 @@ where
} }
fn on_timeout(&mut self) -> Result<(), Error> { fn on_timeout(&mut self) -> Result<(), Error> {
self.monitor.display("Broker Heartbeat", ClientId(0)); self.monitor.display(
&mut self.client_stats_manager,
"Broker Heartbeat",
ClientId(0),
);
Ok(()) Ok(())
} }
} }
@ -96,6 +107,7 @@ where
monitor, monitor,
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD),
client_stats_manager: ClientStatsManager::default(),
phantom: PhantomData, phantom: PhantomData,
}) })
} }
@ -104,6 +116,7 @@ where
#[expect(clippy::unnecessary_wraps)] #[expect(clippy::unnecessary_wraps)]
fn handle_in_broker( fn handle_in_broker(
monitor: &mut MT, monitor: &mut MT,
client_stats_manager: &mut ClientStatsManager,
client_id: ClientId, client_id: ClientId,
event: &Event<I>, event: &Event<I>,
) -> Result<BrokerEventResult, Error> { ) -> Result<BrokerEventResult, Error> {
@ -119,11 +132,11 @@ where
client_id client_id
}; };
monitor.client_stats_insert(id); client_stats_manager.client_stats_insert(id);
monitor.update_client_stats_for(id, |client_stat| { client_stats_manager.update_client_stats_for(id, |client_stat| {
client_stat.update_corpus_size(*corpus_size as u64); client_stat.update_corpus_size(*corpus_size as u64);
}); });
monitor.display(event.name(), id); monitor.display(client_stats_manager, event.name(), id);
Ok(BrokerEventResult::Forward) Ok(BrokerEventResult::Forward)
} }
Event::UpdateExecStats { Event::UpdateExecStats {
@ -132,56 +145,52 @@ where
phantom: _, phantom: _,
} => { } => {
// TODO: The monitor buffer should be added on client add. // TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(client_id); client_stats_manager.client_stats_insert(client_id);
monitor.update_client_stats_for(client_id, |client_stat| { client_stats_manager.update_client_stats_for(client_id, |client_stat| {
client_stat.update_executions(*executions, *time); client_stat.update_executions(*executions, *time);
}); });
monitor.display(event.name(), client_id); monitor.display(client_stats_manager, event.name(), client_id);
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
Event::UpdateUserStats { Event::UpdateUserStats { name, value, .. } => {
name, client_stats_manager.client_stats_insert(client_id);
value, client_stats_manager.update_client_stats_for(client_id, |client_stat| {
phantom: _,
} => {
monitor.client_stats_insert(client_id);
monitor.update_client_stats_for(client_id, |client_stat| {
client_stat.update_user_stats(name.clone(), value.clone()); client_stat.update_user_stats(name.clone(), value.clone());
}); });
monitor.aggregate(name); client_stats_manager.aggregate(name);
monitor.display(event.name(), client_id); monitor.display(client_stats_manager, event.name(), client_id);
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
Event::UpdatePerfMonitor { Event::UpdatePerfMonitor {
time, time,
executions, executions,
introspection_monitor, introspection_stats,
phantom: _, phantom: _,
} => { } => {
// TODO: The monitor buffer should be added on client add. // TODO: The monitor buffer should be added on client add.
// Get the client for the staterestorer ID // Get the client for the staterestorer ID
monitor.client_stats_insert(client_id); client_stats_manager.client_stats_insert(client_id);
monitor.update_client_stats_for(client_id, |client_stat| { client_stats_manager.update_client_stats_for(client_id, |client_stat| {
// Update the normal monitor for this client // Update the normal monitor for this client
client_stat.update_executions(*executions, *time); client_stat.update_executions(*executions, *time);
// Update the performance monitor for this client // Update the performance monitor for this client
client_stat.update_introspection_monitor((**introspection_monitor).clone()); client_stat.update_introspection_stats((**introspection_stats).clone());
}); });
// Display the monitor via `.display` only on core #1 // Display the monitor via `.display` only on core #1
monitor.display(event.name(), client_id); monitor.display(client_stats_manager, event.name(), client_id);
// Correctly handled the event // Correctly handled the event
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
Event::Objective { objective_size, .. } => { Event::Objective { objective_size, .. } => {
monitor.client_stats_insert(client_id); client_stats_manager.client_stats_insert(client_id);
monitor.update_client_stats_for(client_id, |client_stat| { client_stats_manager.update_client_stats_for(client_id, |client_stat| {
client_stat.update_objective_size(*objective_size as u64); client_stat.update_objective_size(*objective_size as u64);
}); });
monitor.display(event.name(), client_id); monitor.display(client_stats_manager, event.name(), client_id);
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
Event::Log { Event::Log {

View File

@ -47,8 +47,8 @@ use uuid::Uuid;
use crate::{ use crate::{
executors::ExitKind, executors::ExitKind,
inputs::Input, inputs::Input,
monitors::UserStats,
state::{HasExecutions, HasLastReportTime, MaybeHasClientPerfMonitor}, state::{HasExecutions, HasLastReportTime, MaybeHasClientPerfMonitor},
statistics::user_stats::UserStats,
Error, HasMetadata, Error, HasMetadata,
}; };
@ -107,7 +107,7 @@ pub struct EventManagerId(
#[cfg(all(unix, feature = "std", feature = "multi_machine"))] #[cfg(all(unix, feature = "std", feature = "multi_machine"))]
use crate::events::multi_machine::NodeId; use crate::events::multi_machine::NodeId;
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
use crate::monitors::ClientPerfMonitor; use crate::statistics::perf_stats::ClientPerfStats;
use crate::{observers::TimeObserver, stages::HasCurrentStageId}; use crate::{observers::TimeObserver, stages::HasCurrentStageId};
/// The log event severity /// The log event severity
@ -294,7 +294,7 @@ pub enum Event<I> {
/// The executions of this client /// The executions of this client
executions: u64, executions: u64,
/// Current performance statistics /// Current performance statistics
introspection_monitor: Box<ClientPerfMonitor>, introspection_stats: Box<ClientPerfStats>,
/// phantomm data /// phantomm data
phantom: PhantomData<I>, phantom: PhantomData<I>,
@ -514,17 +514,17 @@ where
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
{ {
state state
.introspection_monitor_mut() .introspection_stats_mut()
.set_current_time(libafl_bolts::cpu::read_time_counter()); .set_current_time(libafl_bolts::cpu::read_time_counter());
// Send the current monitor over to the manager. This `.clone` shouldn't be // Send the current monitor over to the manager. This `.clone` shouldn't be
// costly as `ClientPerfMonitor` impls `Copy` since it only contains `u64`s // costly as `ClientPerfStats` impls `Copy` since it only contains `u64`s
reporter.fire( reporter.fire(
state, state,
Event::UpdatePerfMonitor { Event::UpdatePerfMonitor {
executions, executions,
time: cur, time: cur,
introspection_monitor: Box::new(state.introspection_monitor().clone()), introspection_stats: Box::new(state.introspection_stats().clone()),
phantom: PhantomData, phantom: PhantomData,
}, },
)?; )?;

View File

@ -34,13 +34,11 @@ use crate::{
monitors::Monitor, monitors::Monitor,
stages::HasCurrentStageId, stages::HasCurrentStageId,
state::{HasExecutions, HasLastReportTime, MaybeHasClientPerfMonitor, Stoppable}, state::{HasExecutions, HasLastReportTime, MaybeHasClientPerfMonitor, Stoppable},
statistics::manager::ClientStatsManager,
Error, HasMetadata, Error, HasMetadata,
}; };
#[cfg(feature = "std")] #[cfg(feature = "std")]
use crate::{ use crate::{monitors::SimplePrintingMonitor, state::HasSolutions, statistics::ClientStats};
monitors::{ClientStats, SimplePrintingMonitor},
state::HasSolutions,
};
/// The llmp connection from the actual fuzzer to the process supervising it /// The llmp connection from the actual fuzzer to the process supervising it
const _ENV_FUZZER_SENDER: &str = "_AFL_ENV_FUZZER_SENDER"; const _ENV_FUZZER_SENDER: &str = "_AFL_ENV_FUZZER_SENDER";
@ -55,6 +53,7 @@ pub struct SimpleEventManager<I, MT, S> {
/// The events that happened since the last `handle_in_broker` /// The events that happened since the last `handle_in_broker`
events: Vec<Event<I>>, events: Vec<Event<I>>,
phantom: PhantomData<S>, phantom: PhantomData<S>,
client_stats_manager: ClientStatsManager,
} }
impl<I, MT, S> Debug for SimpleEventManager<I, MT, S> impl<I, MT, S> Debug for SimpleEventManager<I, MT, S>
@ -84,7 +83,7 @@ where
} }
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> { fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> {
match Self::handle_in_broker(&mut self.monitor, &event)? { match Self::handle_in_broker(&mut self.monitor, &mut self.client_stats_manager, &event)? {
BrokerEventResult::Forward => self.events.push(event), BrokerEventResult::Forward => self.events.push(event),
BrokerEventResult::Handled => (), BrokerEventResult::Handled => (),
} }
@ -199,65 +198,70 @@ where
Self { Self {
monitor, monitor,
events: vec![], events: vec![],
client_stats_manager: ClientStatsManager::default(),
phantom: PhantomData, phantom: PhantomData,
} }
} }
/// Handle arriving events in the broker /// Handle arriving events in the broker
#[expect(clippy::unnecessary_wraps)] #[expect(clippy::unnecessary_wraps)]
fn handle_in_broker(monitor: &mut MT, event: &Event<I>) -> Result<BrokerEventResult, Error> { fn handle_in_broker(
monitor: &mut MT,
client_stats_manager: &mut ClientStatsManager,
event: &Event<I>,
) -> Result<BrokerEventResult, Error> {
match event { match event {
Event::NewTestcase { corpus_size, .. } => { Event::NewTestcase { corpus_size, .. } => {
monitor.client_stats_insert(ClientId(0)); client_stats_manager.client_stats_insert(ClientId(0));
monitor.update_client_stats_for(ClientId(0), |client_stat| { client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_corpus_size(*corpus_size as u64); client_stat.update_corpus_size(*corpus_size as u64);
}); });
monitor.display(event.name(), ClientId(0)); monitor.display(client_stats_manager, event.name(), ClientId(0));
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
Event::UpdateExecStats { Event::UpdateExecStats {
time, executions, .. time, executions, ..
} => { } => {
// TODO: The monitor buffer should be added on client add. // TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(ClientId(0)); client_stats_manager.client_stats_insert(ClientId(0));
monitor.update_client_stats_for(ClientId(0), |client_stat| { client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_executions(*executions, *time); client_stat.update_executions(*executions, *time);
}); });
monitor.display(event.name(), ClientId(0)); monitor.display(client_stats_manager, event.name(), ClientId(0));
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
Event::UpdateUserStats { name, value, .. } => { Event::UpdateUserStats { name, value, .. } => {
monitor.client_stats_insert(ClientId(0)); client_stats_manager.client_stats_insert(ClientId(0));
monitor.update_client_stats_for(ClientId(0), |client_stat| { client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_user_stats(name.clone(), value.clone()); client_stat.update_user_stats(name.clone(), value.clone());
}); });
monitor.aggregate(name); client_stats_manager.aggregate(name);
monitor.display(event.name(), ClientId(0)); monitor.display(client_stats_manager, event.name(), ClientId(0));
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
Event::UpdatePerfMonitor { Event::UpdatePerfMonitor {
time, time,
executions, executions,
introspection_monitor, introspection_stats,
.. ..
} => { } => {
// TODO: The monitor buffer should be added on client add. // TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(ClientId(0)); client_stats_manager.client_stats_insert(ClientId(0));
monitor.update_client_stats_for(ClientId(0), |client_stat| { client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_executions(*executions, *time); client_stat.update_executions(*executions, *time);
client_stat.update_introspection_monitor((**introspection_monitor).clone()); client_stat.update_introspection_stats((**introspection_stats).clone());
}); });
monitor.display(event.name(), ClientId(0)); monitor.display(client_stats_manager, event.name(), ClientId(0));
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
Event::Objective { objective_size, .. } => { Event::Objective { objective_size, .. } => {
monitor.client_stats_insert(ClientId(0)); client_stats_manager.client_stats_insert(ClientId(0));
monitor.update_client_stats_for(ClientId(0), |client_stat| { client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_objective_size(*objective_size as u64); client_stat.update_objective_size(*objective_size as u64);
}); });
monitor.display(event.name(), ClientId(0)); monitor.display(client_stats_manager, event.name(), ClientId(0));
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
Event::Log { Event::Log {
@ -327,7 +331,7 @@ where
self.staterestorer.save(&( self.staterestorer.save(&(
state, state,
self.inner.monitor.start_time(), self.inner.monitor.start_time(),
self.inner.monitor.client_stats(), self.inner.client_stats_manager.client_stats(),
)) ))
} }
} }
@ -543,12 +547,12 @@ where
// reload the state of the monitor to display the correct stats after restarts // reload the state of the monitor to display the correct stats after restarts
monitor.set_start_time(start_time); monitor.set_start_time(start_time);
monitor.update_all_client_stats(clients_stats); let mut this = SimpleRestartingEventManager::launched(monitor, staterestorer);
this.inner
.client_stats_manager
.update_all_client_stats(clients_stats);
( (Some(state), this)
Some(state),
SimpleRestartingEventManager::launched(monitor, staterestorer),
)
} }
}; };

View File

@ -53,6 +53,7 @@ use crate::{
HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions, HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions,
MaybeHasClientPerfMonitor, Stoppable, MaybeHasClientPerfMonitor, Stoppable,
}, },
statistics::manager::ClientStatsManager,
Error, HasMetadata, Error, HasMetadata,
}; };
@ -77,6 +78,7 @@ where
listener: Option<TcpListener>, listener: Option<TcpListener>,
/// Amount of all clients ever, after which (when all are disconnected) this broker should quit. /// Amount of all clients ever, after which (when all are disconnected) this broker should quit.
exit_cleanly_after: Option<NonZeroUsize>, exit_cleanly_after: Option<NonZeroUsize>,
client_stats_manager: ClientStatsManager,
phantom: PhantomData<I>, phantom: PhantomData<I>,
} }
@ -100,6 +102,7 @@ where
Self { Self {
listener: Some(listener), listener: Some(listener),
monitor, monitor,
client_stats_manager: ClientStatsManager::default(),
phantom: PhantomData, phantom: PhantomData,
exit_cleanly_after: None, exit_cleanly_after: None,
} }
@ -291,7 +294,12 @@ where
let event_bytes = &GzipCompressor::new().decompress(event_bytes)?; let event_bytes = &GzipCompressor::new().decompress(event_bytes)?;
let event: Event<I> = postcard::from_bytes(event_bytes)?; let event: Event<I> = postcard::from_bytes(event_bytes)?;
match Self::handle_in_broker(&mut self.monitor, client_id, &event)? { match Self::handle_in_broker(
&mut self.monitor,
&mut self.client_stats_manager,
client_id,
&event,
)? {
BrokerEventResult::Forward => { BrokerEventResult::Forward => {
tx_bc.send(buf).expect("Could not send"); tx_bc.send(buf).expect("Could not send");
} }
@ -312,6 +320,7 @@ where
#[expect(clippy::unnecessary_wraps)] #[expect(clippy::unnecessary_wraps)]
fn handle_in_broker( fn handle_in_broker(
monitor: &mut MT, monitor: &mut MT,
client_stats_manager: &mut ClientStatsManager,
client_id: ClientId, client_id: ClientId,
event: &Event<I>, event: &Event<I>,
) -> Result<BrokerEventResult, Error> { ) -> Result<BrokerEventResult, Error> {
@ -326,11 +335,11 @@ where
} else { } else {
client_id client_id
}; };
monitor.client_stats_insert(id); client_stats_manager.client_stats_insert(id);
monitor.update_client_stats_for(id, |client| { client_stats_manager.update_client_stats_for(id, |client| {
client.update_corpus_size(*corpus_size as u64); client.update_corpus_size(*corpus_size as u64);
}); });
monitor.display(event.name(), id); monitor.display(client_stats_manager, event.name(), id);
Ok(BrokerEventResult::Forward) Ok(BrokerEventResult::Forward)
} }
Event::UpdateExecStats { Event::UpdateExecStats {
@ -339,11 +348,11 @@ where
phantom: _, phantom: _,
} => { } => {
// TODO: The monitor buffer should be added on client add. // TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(client_id); client_stats_manager.client_stats_insert(client_id);
monitor.update_client_stats_for(client_id, |client| { client_stats_manager.update_client_stats_for(client_id, |client| {
client.update_executions(*executions, *time); client.update_executions(*executions, *time);
}); });
monitor.display(event.name(), client_id); monitor.display(client_stats_manager, event.name(), client_id);
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
Event::UpdateUserStats { Event::UpdateUserStats {
@ -351,44 +360,44 @@ where
value, value,
phantom: _, phantom: _,
} => { } => {
monitor.client_stats_insert(client_id); client_stats_manager.client_stats_insert(client_id);
monitor.update_client_stats_for(client_id, |client| { client_stats_manager.update_client_stats_for(client_id, |client| {
client.update_user_stats(name.clone(), value.clone()); client.update_user_stats(name.clone(), value.clone());
}); });
monitor.aggregate(name); client_stats_manager.aggregate(name);
monitor.display(event.name(), client_id); monitor.display(client_stats_manager, event.name(), client_id);
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
Event::UpdatePerfMonitor { Event::UpdatePerfMonitor {
time, time,
executions, executions,
introspection_monitor, introspection_stats,
phantom: _, phantom: _,
} => { } => {
// TODO: The monitor buffer should be added on client add. // TODO: The monitor buffer should be added on client add.
// Get the client for the staterestorer ID // Get the client for the staterestorer ID
monitor.client_stats_insert(client_id); client_stats_manager.client_stats_insert(client_id);
monitor.update_client_stats_for(client_id, |client| { client_stats_manager.update_client_stats_for(client_id, |client| {
// Update the normal monitor for this client // Update the normal monitor for this client
client.update_executions(*executions, *time); client.update_executions(*executions, *time);
// Update the performance monitor for this client // Update the performance monitor for this client
client.update_introspection_monitor((**introspection_monitor).clone()); client.update_introspection_stats((**introspection_stats).clone());
}); });
// Display the monitor via `.display` only on core #1 // Display the monitor via `.display` only on core #1
monitor.display(event.name(), client_id); monitor.display(client_stats_manager, event.name(), client_id);
// Correctly handled the event // Correctly handled the event
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
Event::Objective { objective_size, .. } => { Event::Objective { objective_size, .. } => {
monitor.client_stats_insert(client_id); client_stats_manager.client_stats_insert(client_id);
monitor.update_client_stats_for(client_id, |client| { client_stats_manager.update_client_stats_for(client_id, |client| {
client.update_objective_size(*objective_size as u64); client.update_objective_size(*objective_size as u64);
}); });
monitor.display(event.name(), client_id); monitor.display(client_stats_manager, event.name(), client_id);
Ok(BrokerEventResult::Handled) Ok(BrokerEventResult::Handled)
} }
Event::Log { Event::Log {

View File

@ -25,8 +25,8 @@ use crate::{
events::{Event, EventFirer}, events::{Event, EventFirer},
executors::ExitKind, executors::ExitKind,
feedbacks::{Feedback, HasObserverHandle, StateInitializer}, feedbacks::{Feedback, HasObserverHandle, StateInitializer},
monitors::{AggregatorOps, UserStats, UserStatsValue},
observers::{CanTrack, MapObserver}, observers::{CanTrack, MapObserver},
statistics::user_stats::{AggregatorOps, UserStats, UserStatsValue},
Error, HasMetadata, HasNamedMetadata, Error, HasMetadata, HasNamedMetadata,
}; };

View File

@ -115,7 +115,7 @@ pub trait Feedback<EM, I, OT, S>: StateInitializer<S> + Named {
// Add this stat to the feedback metrics // Add this stat to the feedback metrics
state state
.introspection_monitor_mut() .introspection_stats_mut()
.update_feedback(self.name(), elapsed); .update_feedback(self.name(), elapsed);
ret ret

View File

@ -11,7 +11,7 @@ use libafl_bolts::{current_time, tuples::MatchName};
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
use crate::monitors::PerfFeature; use crate::statistics::perf_stats::PerfFeature;
use crate::{ use crate::{
corpus::{Corpus, CorpusId, HasCurrentCorpusId, HasTestcase, Testcase}, corpus::{Corpus, CorpusId, HasCurrentCorpusId, HasTestcase, Testcase},
events::{ events::{
@ -855,7 +855,7 @@ where
) -> Result<CorpusId, Error> { ) -> Result<CorpusId, Error> {
// Init timer for scheduler // Init timer for scheduler
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
state.introspection_monitor_mut().start_timer(); state.introspection_stats_mut().start_timer();
// Get the next index from the scheduler // Get the next index from the scheduler
let id = if let Some(id) = state.current_corpus_id()? { let id = if let Some(id) = state.current_corpus_id()? {
@ -868,24 +868,24 @@ where
// Mark the elapsed time for the scheduler // Mark the elapsed time for the scheduler
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
state.introspection_monitor_mut().mark_scheduler_time(); state.introspection_stats_mut().mark_scheduler_time();
// Mark the elapsed time for the scheduler // Mark the elapsed time for the scheduler
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
state.introspection_monitor_mut().reset_stage_index(); state.introspection_stats_mut().reset_stage_index();
// Execute all stages // Execute all stages
stages.perform_all(self, executor, state, manager)?; stages.perform_all(self, executor, state, manager)?;
// Init timer for manager // Init timer for manager
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
state.introspection_monitor_mut().start_timer(); state.introspection_stats_mut().start_timer();
self.process_events(state, executor, manager)?; self.process_events(state, executor, manager)?;
// Mark the elapsed time for the manager // Mark the elapsed time for the manager
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
state.introspection_monitor_mut().mark_manager_time(); state.introspection_stats_mut().mark_manager_time();
{ {
if let Ok(mut testcase) = state.testcase_mut(id) { if let Ok(mut testcase) = state.testcase_mut(id) {

View File

@ -79,6 +79,7 @@ pub mod observers;
pub mod schedulers; pub mod schedulers;
pub mod stages; pub mod stages;
pub mod state; pub mod state;
pub mod statistics;
pub use fuzzer::*; pub use fuzzer::*;
pub use libafl_bolts::{nonzero, Error}; pub use libafl_bolts::{nonzero, Error};
@ -90,7 +91,8 @@ pub mod prelude {
pub use super::{ pub use super::{
corpus::*, events::*, executors::*, feedbacks::*, fuzzer::*, generators::*, inputs::*, corpus::*, events::*, executors::*, feedbacks::*, fuzzer::*, generators::*, inputs::*,
monitors::*, mutators::*, observers::*, schedulers::*, stages::*, state::*, *, monitors::*, mutators::*, observers::*, schedulers::*, stages::*, state::*, statistics::*,
*,
}; };
} }

View File

@ -1,6 +1,6 @@
//! Monitors that wrap a base monitor and also log to disk using different formats like `JSON` and `TOML`. //! Monitors that wrap a base monitor and also log to disk using different formats like `JSON` and `TOML`.
use alloc::{string::String, vec::Vec}; use alloc::string::String;
use core::time::Duration; use core::time::Duration;
use std::{ use std::{
fs::{File, OpenOptions}, fs::{File, OpenOptions},
@ -11,7 +11,10 @@ use std::{
use libafl_bolts::{current_time, format_duration_hms, ClientId}; use libafl_bolts::{current_time, format_duration_hms, ClientId};
use serde_json::json; use serde_json::json;
use crate::monitors::{ClientStats, Monitor, NopMonitor}; use crate::{
monitors::{Monitor, NopMonitor},
statistics::manager::ClientStatsManager,
};
/// Wrap a monitor and log the current state of the monitor into a Toml file. /// Wrap a monitor and log the current state of the monitor into a Toml file.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -29,16 +32,6 @@ impl<M> Monitor for OnDiskTomlMonitor<M>
where where
M: Monitor, M: Monitor,
{ {
/// The client monitor, mutable
fn client_stats_mut(&mut self) -> &mut Vec<ClientStats> {
self.base.client_stats_mut()
}
/// The client monitor
fn client_stats(&self) -> &[ClientStats] {
self.base.client_stats()
}
/// Time this fuzzing run stated /// Time this fuzzing run stated
fn start_time(&self) -> Duration { fn start_time(&self) -> Duration {
self.base.start_time() self.base.start_time()
@ -49,11 +42,12 @@ where
self.base.set_start_time(time); self.base.set_start_time(time);
} }
fn aggregate(&mut self, name: &str) { fn display(
self.base.aggregate(name); &mut self,
} client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
fn display(&mut self, event_msg: &str, sender_id: ClientId) { sender_id: ClientId,
) {
let cur_time = current_time(); let cur_time = current_time();
if cur_time - self.last_update >= self.update_interval { if cur_time - self.last_update >= self.update_interval {
@ -73,21 +67,22 @@ executions = {}
exec_sec = {} exec_sec = {}
", ",
format_duration_hms(&(cur_time - self.start_time())), format_duration_hms(&(cur_time - self.start_time())),
self.client_stats_count(), client_stats_manager.client_stats_count(),
self.corpus_size(), client_stats_manager.corpus_size(),
self.objective_size(), client_stats_manager.objective_size(),
self.total_execs(), client_stats_manager.total_execs(),
self.execs_per_sec() client_stats_manager.execs_per_sec()
) )
.expect("Failed to write to the Toml file"); .expect("Failed to write to the Toml file");
for i in 0..(self.client_stats().len()) { for i in 0..(client_stats_manager.client_stats().len()) {
let client_id = ClientId(i as u32); let client_id = ClientId(i as u32);
let exec_sec = self.update_client_stats_for(client_id, |client_stat| { let exec_sec = client_stats_manager
client_stat.execs_per_sec(cur_time) .update_client_stats_for(client_id, |client_stat| {
}); client_stat.execs_per_sec(cur_time)
});
let client = self.client_stats_for(client_id); let client = client_stats_manager.client_stats_for(client_id);
write!( write!(
&mut file, &mut file,
@ -102,7 +97,7 @@ exec_sec = {}
) )
.expect("Failed to write to the Toml file"); .expect("Failed to write to the Toml file");
for (key, val) in &client.user_monitor { for (key, val) in &client.user_stats {
let k: String = key let k: String = key
.chars() .chars()
.map(|c| if c.is_whitespace() { '_' } else { c }) .map(|c| if c.is_whitespace() { '_' } else { c })
@ -116,7 +111,8 @@ exec_sec = {}
drop(file); drop(file);
} }
self.base.display(event_msg, sender_id); self.base
.display(client_stats_manager, event_msg, sender_id);
} }
} }
@ -197,14 +193,6 @@ where
F: FnMut(&mut M) -> bool, F: FnMut(&mut M) -> bool,
M: Monitor, M: Monitor,
{ {
fn client_stats_mut(&mut self) -> &mut Vec<ClientStats> {
self.base.client_stats_mut()
}
fn client_stats(&self) -> &[ClientStats] {
self.base.client_stats()
}
fn start_time(&self) -> Duration { fn start_time(&self) -> Duration {
self.base.start_time() self.base.start_time()
} }
@ -213,7 +201,12 @@ where
self.base.set_start_time(time); self.base.set_start_time(time);
} }
fn display(&mut self, event_msg: &str, sender_id: ClientId) { fn display(
&mut self,
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
) {
if (self.log_record)(&mut self.base) { if (self.log_record)(&mut self.base) {
let file = OpenOptions::new() let file = OpenOptions::new()
.append(true) .append(true)
@ -223,15 +216,16 @@ where
let line = json!({ let line = json!({
"run_time": current_time() - self.base.start_time(), "run_time": current_time() - self.base.start_time(),
"clients": self.client_stats_count(), "clients": client_stats_manager.client_stats_count(),
"corpus": self.base.corpus_size(), "corpus": client_stats_manager.corpus_size(),
"objectives": self.base.objective_size(), "objectives": client_stats_manager.objective_size(),
"executions": self.base.total_execs(), "executions": client_stats_manager.total_execs(),
"exec_sec": self.base.execs_per_sec(), "exec_sec": client_stats_manager.execs_per_sec(),
"client_stats": self.client_stats(), "client_stats": client_stats_manager.client_stats(),
}); });
writeln!(&file, "{line}").expect("Unable to write Json to file"); writeln!(&file, "{line}").expect("Unable to write Json to file");
} }
self.base.display(event_msg, sender_id); self.base
.display(client_stats_manager, event_msg, sender_id);
} }
} }

View File

@ -1,6 +1,5 @@
//! Monitors that log aggregated stats to disk. //! Monitors that log aggregated stats to disk.
use alloc::vec::Vec;
use core::{ use core::{
fmt::{Debug, Formatter}, fmt::{Debug, Formatter},
time::Duration, time::Duration,
@ -10,13 +9,12 @@ use std::{fs::OpenOptions, io::Write, path::PathBuf};
use libafl_bolts::{current_time, ClientId}; use libafl_bolts::{current_time, ClientId};
use serde_json::json; use serde_json::json;
use crate::monitors::{Aggregator, ClientStats, Monitor}; use crate::{monitors::Monitor, statistics::manager::ClientStatsManager};
/// A monitor that wraps another monitor and logs aggregated stats to a JSON file. /// A monitor that wraps another monitor and logs aggregated stats to a JSON file.
#[derive(Clone)] #[derive(Clone)]
pub struct OnDiskJsonAggregateMonitor<M> { pub struct OnDiskJsonAggregateMonitor<M> {
base: M, base: M,
aggregator: Aggregator,
json_path: PathBuf, json_path: PathBuf,
last_update: Duration, last_update: Duration,
update_interval: Duration, update_interval: Duration,
@ -40,14 +38,6 @@ impl<M> Monitor for OnDiskJsonAggregateMonitor<M>
where where
M: Monitor, M: Monitor,
{ {
fn client_stats_mut(&mut self) -> &mut Vec<ClientStats> {
self.base.client_stats_mut()
}
fn client_stats(&self) -> &[ClientStats] {
self.base.client_stats()
}
fn set_start_time(&mut self, time: Duration) { fn set_start_time(&mut self, time: Duration) {
self.base.set_start_time(time); self.base.set_start_time(time);
} }
@ -56,14 +46,15 @@ where
self.base.start_time() self.base.start_time()
} }
fn aggregate(&mut self, name: &str) { fn display(
self.aggregator.aggregate(name, self.base.client_stats()); &mut self,
self.base.aggregate(name); client_stats_manager: &mut ClientStatsManager,
} event_msg: &str,
sender_id: ClientId,
fn display(&mut self, event_msg: &str, sender_id: ClientId) { ) {
// First let the base monitor handle its display // First let the base monitor handle its display
self.base.display(event_msg, sender_id); self.base
.display(client_stats_manager, event_msg, sender_id);
// Write JSON stats if update interval has elapsed // Write JSON stats if update interval has elapsed
let cur_time = current_time(); let cur_time = current_time();
@ -78,18 +69,18 @@ where
let mut json_value = json!({ let mut json_value = json!({
"run_time": (cur_time - self.start_time()).as_secs(), "run_time": (cur_time - self.start_time()).as_secs(),
"clients": self.client_stats_count(), "clients": client_stats_manager.client_stats_count(),
"corpus": self.corpus_size(), "corpus": client_stats_manager.corpus_size(),
"objectives": self.objective_size(), "objectives": client_stats_manager.objective_size(),
"executions": self.total_execs(), "executions": client_stats_manager.total_execs(),
"exec_sec": self.execs_per_sec(), "exec_sec": client_stats_manager.execs_per_sec(),
}); });
// Add all aggregated values directly to the root // Add all aggregated values directly to the root
if let Some(obj) = json_value.as_object_mut() { if let Some(obj) = json_value.as_object_mut() {
obj.extend( obj.extend(
self.aggregator client_stats_manager
.aggregated .aggregated()
.iter() .iter()
.map(|(k, v)| (k.clone(), json!(v))), .map(|(k, v)| (k.clone(), json!(v))),
); );
@ -116,7 +107,6 @@ impl<M> OnDiskJsonAggregateMonitor<M> {
{ {
Self { Self {
base, base,
aggregator: Aggregator::new(),
json_path: json_path.into(), json_path: json_path.into(),
last_update: current_time() - update_interval, last_update: current_time() - update_interval,
update_interval, update_interval,

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
//! The [`MultiMonitor`] displays both cumulative and per-client stats. //! The [`MultiMonitor`] displays both cumulative and per-client stats.
use alloc::{string::String, vec::Vec}; use alloc::string::String;
use core::{ use core::{
fmt::{Debug, Formatter, Write}, fmt::{Debug, Formatter, Write},
time::Duration, time::Duration,
@ -8,8 +8,7 @@ use core::{
use libafl_bolts::{current_time, format_duration_hms, ClientId}; use libafl_bolts::{current_time, format_duration_hms, ClientId};
use super::Aggregator; use crate::{monitors::Monitor, statistics::manager::ClientStatsManager};
use crate::monitors::{ClientStats, Monitor};
/// Tracking monitor during fuzzing and display both per-client and cumulative info. /// Tracking monitor during fuzzing and display both per-client and cumulative info.
#[derive(Clone)] #[derive(Clone)]
@ -19,8 +18,6 @@ where
{ {
print_fn: F, print_fn: F,
start_time: Duration, start_time: Duration,
client_stats: Vec<ClientStats>,
aggregator: Aggregator,
} }
impl<F> Debug for MultiMonitor<F> impl<F> Debug for MultiMonitor<F>
@ -30,7 +27,6 @@ where
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
f.debug_struct("MultiMonitor") f.debug_struct("MultiMonitor")
.field("start_time", &self.start_time) .field("start_time", &self.start_time)
.field("client_stats", &self.client_stats)
.finish_non_exhaustive() .finish_non_exhaustive()
} }
} }
@ -39,16 +35,6 @@ impl<F> Monitor for MultiMonitor<F>
where where
F: FnMut(&str), F: FnMut(&str),
{ {
/// the client monitor, mutable
fn client_stats_mut(&mut self) -> &mut Vec<ClientStats> {
&mut self.client_stats
}
/// the client monitor
fn client_stats(&self) -> &[ClientStats] {
&self.client_stats
}
/// Set creation time /// Set creation time
fn set_start_time(&mut self, time: Duration) { fn set_start_time(&mut self, time: Duration) {
self.start_time = time; self.start_time = time;
@ -59,11 +45,12 @@ where
self.start_time self.start_time
} }
fn aggregate(&mut self, name: &str) { fn display(
self.aggregator.aggregate(name, &self.client_stats); &mut self,
} client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
fn display(&mut self, event_msg: &str, sender_id: ClientId) { sender_id: ClientId,
) {
let sender = format!("#{}", sender_id.0); let sender = format!("#{}", sender_id.0);
let pad = if event_msg.len() + sender.len() < 13 { let pad = if event_msg.len() + sender.len() < 13 {
" ".repeat(13 - event_msg.len() - sender.len()) " ".repeat(13 - event_msg.len() - sender.len())
@ -75,30 +62,30 @@ where
"[{}] (GLOBAL) run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}", "[{}] (GLOBAL) run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
head, head,
format_duration_hms(&(current_time() - self.start_time)), format_duration_hms(&(current_time() - self.start_time)),
self.client_stats_count(), client_stats_manager.client_stats_count(),
self.corpus_size(), client_stats_manager.corpus_size(),
self.objective_size(), client_stats_manager.objective_size(),
self.total_execs(), client_stats_manager.total_execs(),
self.execs_per_sec_pretty() client_stats_manager.execs_per_sec_pretty()
); );
for (key, val) in &self.aggregator.aggregated { for (key, val) in client_stats_manager.aggregated() {
write!(global_fmt, ", {key}: {val}").unwrap(); write!(global_fmt, ", {key}: {val}").unwrap();
} }
(self.print_fn)(&global_fmt); (self.print_fn)(&global_fmt);
self.client_stats_insert(sender_id); client_stats_manager.client_stats_insert(sender_id);
let cur_time = current_time(); let cur_time = current_time();
let exec_sec = let exec_sec = client_stats_manager
self.update_client_stats_for(sender_id, |client| client.execs_per_sec_pretty(cur_time)); .update_client_stats_for(sender_id, |client| client.execs_per_sec_pretty(cur_time));
let client = self.client_stats_for(sender_id); let client = client_stats_manager.client_stats_for(sender_id);
let pad = " ".repeat(head.len()); let pad = " ".repeat(head.len());
let mut fmt = format!( let mut fmt = format!(
" {} (CLIENT) corpus: {}, objectives: {}, executions: {}, exec/sec: {}", " {} (CLIENT) corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
pad, client.corpus_size, client.objective_size, client.executions, exec_sec pad, client.corpus_size, client.objective_size, client.executions, exec_sec
); );
for (key, val) in &client.user_monitor { for (key, val) in &client.user_stats {
write!(fmt, ", {key}: {val}").unwrap(); write!(fmt, ", {key}: {val}").unwrap();
} }
(self.print_fn)(&fmt); (self.print_fn)(&fmt);
@ -107,8 +94,13 @@ where
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
{ {
// Print the client performance monitor. Skip the Client 0 which is the broker // Print the client performance monitor. Skip the Client 0 which is the broker
for (i, client) in self.client_stats.iter().filter(|x| x.enabled).enumerate() { for (i, client) in client_stats_manager
let fmt = format!("Client {:03}:\n{}", i + 1, client.introspection_monitor); .client_stats()
.iter()
.filter(|x| x.enabled)
.enumerate()
{
let fmt = format!("Client {:03}:\n{}", i + 1, client.introspection_stats);
(self.print_fn)(&fmt); (self.print_fn)(&fmt);
} }
@ -127,8 +119,6 @@ where
Self { Self {
print_fn, print_fn,
start_time: current_time(), start_time: current_time(),
client_stats: vec![],
aggregator: Aggregator::new(),
} }
} }
@ -137,8 +127,6 @@ where
Self { Self {
print_fn, print_fn,
start_time, start_time,
client_stats: vec![],
aggregator: Aggregator::new(),
} }
} }
} }

View File

@ -27,7 +27,7 @@
//! //!
//! When using docker, you may need to point `prometheus.yml` to the `docker0` interface or `host.docker.internal` //! When using docker, you may need to point `prometheus.yml` to the `docker0` interface or `host.docker.internal`
use alloc::{borrow::Cow, fmt::Debug, string::String, vec::Vec}; use alloc::{borrow::Cow, fmt::Debug, string::String};
use core::{fmt, fmt::Write, time::Duration}; use core::{fmt, fmt::Write, time::Duration};
use std::{ use std::{
string::ToString, string::ToString,
@ -47,8 +47,10 @@ use prometheus_client::{
// using tide for the HTTP server library (fast, async, simple) // using tide for the HTTP server library (fast, async, simple)
use tide::Request; use tide::Request;
use super::Aggregator; use crate::{
use crate::monitors::{ClientStats, Monitor, UserStatsValue}; monitors::Monitor,
statistics::{manager::ClientStatsManager, user_stats::UserStatsValue},
};
/// Prometheus metrics for global and each client. /// Prometheus metrics for global and each client.
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
@ -72,8 +74,6 @@ where
start_time: Duration, start_time: Duration,
prometheus_global_stats: PrometheusStats, // global prometheus metrics prometheus_global_stats: PrometheusStats, // global prometheus metrics
prometheus_client_stats: PrometheusStats, // per-client prometheus metrics prometheus_client_stats: PrometheusStats, // per-client prometheus metrics
client_stats: Vec<ClientStats>, // per-client statistics
aggregator: Aggregator, // aggregator for global custom statistics
} }
impl<F> Debug for PrometheusMonitor<F> impl<F> Debug for PrometheusMonitor<F>
@ -83,7 +83,6 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PrometheusMonitor") f.debug_struct("PrometheusMonitor")
.field("start_time", &self.start_time) .field("start_time", &self.start_time)
.field("client_stats", &self.client_stats)
.finish_non_exhaustive() .finish_non_exhaustive()
} }
} }
@ -92,16 +91,6 @@ impl<F> Monitor for PrometheusMonitor<F>
where where
F: FnMut(&str), F: FnMut(&str),
{ {
/// the client monitor, mutable
fn client_stats_mut(&mut self) -> &mut Vec<ClientStats> {
&mut self.client_stats
}
/// the client monitor
fn client_stats(&self) -> &[ClientStats] {
&self.client_stats
}
/// Time this fuzzing run stated /// Time this fuzzing run stated
fn start_time(&self) -> Duration { fn start_time(&self) -> Duration {
self.start_time self.start_time
@ -112,12 +101,12 @@ where
self.start_time = time; self.start_time = time;
} }
/// aggregate client stats fn display(
fn aggregate(&mut self, name: &str) { &mut self,
self.aggregator.aggregate(name, &self.client_stats); client_stats_manager: &mut ClientStatsManager,
} event_msg: &str,
sender_id: ClientId,
fn display(&mut self, event_msg: &str, sender_id: ClientId) { ) {
// Update the prometheus metrics // Update the prometheus metrics
// The gauges must take signed i64's, with max value of 2^63-1 so it is // The gauges must take signed i64's, with max value of 2^63-1 so it is
// probably fair to error out at a count of nine quintillion across any // probably fair to error out at a count of nine quintillion across any
@ -127,7 +116,7 @@ where
// time since last observation" // time since last observation"
// Global (aggregated) metrics // Global (aggregated) metrics
let corpus_size = self.corpus_size(); let corpus_size = client_stats_manager.corpus_size();
self.prometheus_global_stats self.prometheus_global_stats
.corpus_count .corpus_count
.get_or_create(&Labels { .get_or_create(&Labels {
@ -136,7 +125,7 @@ where
}) })
.set(corpus_size.try_into().unwrap()); .set(corpus_size.try_into().unwrap());
let objective_size = self.objective_size(); let objective_size = client_stats_manager.objective_size();
self.prometheus_global_stats self.prometheus_global_stats
.objective_count .objective_count
.get_or_create(&Labels { .get_or_create(&Labels {
@ -145,7 +134,7 @@ where
}) })
.set(objective_size.try_into().unwrap()); .set(objective_size.try_into().unwrap());
let total_execs = self.total_execs(); let total_execs = client_stats_manager.total_execs();
self.prometheus_global_stats self.prometheus_global_stats
.executions .executions
.get_or_create(&Labels { .get_or_create(&Labels {
@ -154,7 +143,7 @@ where
}) })
.set(total_execs.try_into().unwrap()); .set(total_execs.try_into().unwrap());
let execs_per_sec = self.execs_per_sec(); let execs_per_sec = client_stats_manager.execs_per_sec();
self.prometheus_global_stats self.prometheus_global_stats
.exec_rate .exec_rate
.get_or_create(&Labels { .get_or_create(&Labels {
@ -172,7 +161,10 @@ where
}) })
.set(run_time.try_into().unwrap()); // run time in seconds, which can be converted to a time format by Grafana or similar .set(run_time.try_into().unwrap()); // run time in seconds, which can be converted to a time format by Grafana or similar
let total_clients = self.client_stats_count().try_into().unwrap(); // convert usize to u64 (unlikely that # of clients will be > 2^64 -1...) let total_clients = client_stats_manager
.client_stats_count()
.try_into()
.unwrap(); // convert usize to u64 (unlikely that # of clients will be > 2^64 -1...)
self.prometheus_global_stats self.prometheus_global_stats
.clients_count .clients_count
.get_or_create(&Labels { .get_or_create(&Labels {
@ -186,13 +178,13 @@ where
"[Prometheus] [{} #GLOBAL] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}", "[Prometheus] [{} #GLOBAL] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
event_msg, event_msg,
format_duration_hms(&(current_time() - self.start_time)), format_duration_hms(&(current_time() - self.start_time)),
self.client_stats_count(), client_stats_manager.client_stats_count(),
self.corpus_size(), client_stats_manager.corpus_size(),
self.objective_size(), client_stats_manager.objective_size(),
self.total_execs(), client_stats_manager.total_execs(),
self.execs_per_sec_pretty() client_stats_manager.execs_per_sec_pretty()
); );
for (key, val) in &self.aggregator.aggregated { for (key, val) in client_stats_manager.aggregated() {
// print global aggregated custom stats // print global aggregated custom stats
write!(global_fmt, ", {key}: {val}").unwrap(); write!(global_fmt, ", {key}: {val}").unwrap();
#[expect(clippy::cast_precision_loss)] #[expect(clippy::cast_precision_loss)]
@ -234,8 +226,8 @@ where
// Client-specific metrics // Client-specific metrics
self.client_stats_insert(sender_id); client_stats_manager.client_stats_insert(sender_id);
let client = self.client_stats_for(sender_id); let client = client_stats_manager.client_stats_for(sender_id);
let mut cur_client_clone = client.clone(); let mut cur_client_clone = client.clone();
self.prometheus_client_stats self.prometheus_client_stats
@ -297,7 +289,7 @@ where
cur_client_clone.execs_per_sec_pretty(current_time()) cur_client_clone.execs_per_sec_pretty(current_time())
); );
for (key, val) in cur_client_clone.user_monitor { for (key, val) in cur_client_clone.user_stats {
// print the custom stats for each client // print the custom stats for each client
write!(fmt, ", {key}: {val}").unwrap(); write!(fmt, ", {key}: {val}").unwrap();
// Update metrics added to the user_stats hashmap by feedback event-fires // Update metrics added to the user_stats hashmap by feedback event-fires
@ -352,7 +344,6 @@ where
let prometheus_global_stats_clone = prometheus_global_stats.clone(); let prometheus_global_stats_clone = prometheus_global_stats.clone();
let prometheus_client_stats = PrometheusStats::default(); let prometheus_client_stats = PrometheusStats::default();
let prometheus_client_stats_clone = prometheus_client_stats.clone(); let prometheus_client_stats_clone = prometheus_client_stats.clone();
let client_stats = Vec::<ClientStats>::default();
// Need to run the metrics server in a different thread to avoid blocking // Need to run the metrics server in a different thread to avoid blocking
thread::spawn(move || { thread::spawn(move || {
@ -369,8 +360,6 @@ where
start_time: current_time(), start_time: current_time(),
prometheus_global_stats, prometheus_global_stats,
prometheus_client_stats, prometheus_client_stats,
client_stats,
aggregator: Aggregator::new(),
} }
} }
/// Creates the monitor with a given `start_time`. /// Creates the monitor with a given `start_time`.
@ -379,7 +368,6 @@ where
let prometheus_global_stats_clone = prometheus_global_stats.clone(); let prometheus_global_stats_clone = prometheus_global_stats.clone();
let prometheus_client_stats = PrometheusStats::default(); let prometheus_client_stats = PrometheusStats::default();
let prometheus_client_stats_clone = prometheus_client_stats.clone(); let prometheus_client_stats_clone = prometheus_client_stats.clone();
let client_stats = Vec::<ClientStats>::default();
thread::spawn(move || { thread::spawn(move || {
block_on(serve_metrics( block_on(serve_metrics(
@ -395,8 +383,6 @@ where
start_time, start_time,
prometheus_global_stats, prometheus_global_stats,
prometheus_client_stats, prometheus_client_stats,
client_stats,
aggregator: Aggregator::new(),
} }
} }
} }

View File

@ -29,8 +29,15 @@ use serde_json::Value;
use typed_builder::TypedBuilder; use typed_builder::TypedBuilder;
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
use super::{ClientPerfMonitor, PerfFeature}; use crate::statistics::perf_stats::{ClientPerfStats, PerfFeature};
use crate::monitors::{Aggregator, AggregatorOps, ClientStats, Monitor, UserStats, UserStatsValue}; use crate::{
monitors::Monitor,
statistics::{
manager::ClientStatsManager,
user_stats::{AggregatorOps, UserStats, UserStatsValue},
ClientStats,
},
};
#[expect(missing_docs)] #[expect(missing_docs)]
pub mod ui; pub mod ui;
@ -144,7 +151,7 @@ pub struct PerfTuiContext {
impl PerfTuiContext { impl PerfTuiContext {
/// Get the data for performance metrics /// Get the data for performance metrics
#[expect(clippy::cast_precision_loss)] #[expect(clippy::cast_precision_loss)]
pub fn grab_data(&mut self, m: &ClientPerfMonitor) { pub fn grab_data(&mut self, m: &ClientPerfStats) {
// Calculate the elapsed time from the monitor // Calculate the elapsed time from the monitor
let elapsed: f64 = m.elapsed_cycles() as f64; let elapsed: f64 = m.elapsed_cycles() as f64;
@ -315,7 +322,7 @@ impl ClientTuiContext {
.map_or("0%".to_string(), ToString::to_string); .map_or("0%".to_string(), ToString::to_string);
self.item_geometry.stability = stability; self.item_geometry.stability = stability;
for (key, val) in &client.user_monitor { for (key, val) in &client.user_stats {
self.user_stats.insert(key.clone(), val.clone()); self.user_stats.insert(key.clone(), val.clone());
} }
} }
@ -388,8 +395,6 @@ pub struct TuiMonitor {
pub(crate) context: Arc<RwLock<TuiContext>>, pub(crate) context: Arc<RwLock<TuiContext>>,
start_time: Duration, start_time: Duration,
client_stats: Vec<ClientStats>,
aggregator: Aggregator,
} }
impl From<TuiMonitorConfig> for TuiMonitor { impl From<TuiMonitorConfig> for TuiMonitor {
@ -403,20 +408,6 @@ impl From<TuiMonitorConfig> for TuiMonitor {
} }
impl Monitor for TuiMonitor { impl Monitor for TuiMonitor {
/// The client monitor, mutable
/// This also includes disabled "padding" clients.
/// Results should be filtered by `.enabled`.
fn client_stats_mut(&mut self) -> &mut Vec<ClientStats> {
&mut self.client_stats
}
/// The client monitor
/// This also includes disabled "padding" clients.
/// Results should be filtered by `.enabled`.
fn client_stats(&self) -> &[ClientStats] {
&self.client_stats
}
/// Time this fuzzing run stated /// Time this fuzzing run stated
fn start_time(&self) -> Duration { fn start_time(&self) -> Duration {
self.start_time self.start_time
@ -428,35 +419,41 @@ impl Monitor for TuiMonitor {
} }
#[expect(clippy::cast_sign_loss)] #[expect(clippy::cast_sign_loss)]
fn display(&mut self, event_msg: &str, sender_id: ClientId) { fn display(
&mut self,
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
) {
let cur_time = current_time(); let cur_time = current_time();
{ {
// TODO implement floating-point support for TimedStat // TODO implement floating-point support for TimedStat
let execsec = self.execs_per_sec() as u64; let execsec = client_stats_manager.execs_per_sec() as u64;
let totalexec = self.total_execs(); let totalexec = client_stats_manager.total_execs();
let run_time = cur_time - self.start_time; let run_time = cur_time - self.start_time;
let total_process_timing = self.process_timing(); let total_process_timing = self.process_timing(client_stats_manager);
let mut ctx = self.context.write().unwrap(); let mut ctx = self.context.write().unwrap();
ctx.total_process_timing = total_process_timing; ctx.total_process_timing = total_process_timing;
ctx.corpus_size_timed.add(run_time, self.corpus_size()); ctx.corpus_size_timed
.add(run_time, client_stats_manager.corpus_size());
ctx.objective_size_timed ctx.objective_size_timed
.add(run_time, self.objective_size()); .add(run_time, client_stats_manager.objective_size());
ctx.execs_per_sec_timed.add(run_time, execsec); ctx.execs_per_sec_timed.add(run_time, execsec);
ctx.total_execs = totalexec; ctx.total_execs = totalexec;
ctx.clients_num = self.client_stats.len(); ctx.clients_num = client_stats_manager.client_stats().len();
ctx.total_map_density = self.map_density(); ctx.total_map_density = get_map_density(client_stats_manager);
ctx.total_solutions = self.objective_size(); ctx.total_solutions = client_stats_manager.objective_size();
ctx.total_cycles_done = 0; ctx.total_cycles_done = 0;
ctx.total_corpus_count = self.corpus_size(); ctx.total_corpus_count = client_stats_manager.corpus_size();
ctx.total_item_geometry = self.item_geometry(); ctx.total_item_geometry = get_item_geometry(client_stats_manager);
} }
self.client_stats_insert(sender_id); client_stats_manager.client_stats_insert(sender_id);
let exec_sec = let exec_sec = client_stats_manager
self.update_client_stats_for(sender_id, |client| client.execs_per_sec_pretty(cur_time)); .update_client_stats_for(sender_id, |client| client.execs_per_sec_pretty(cur_time));
let client = self.client_stats_for(sender_id); let client = client_stats_manager.client_stats_for(sender_id);
let sender = format!("#{}", sender_id.0); let sender = format!("#{}", sender_id.0);
let pad = if event_msg.len() + sender.len() < 13 { let pad = if event_msg.len() + sender.len() < 13 {
@ -469,15 +466,15 @@ impl Monitor for TuiMonitor {
"[{}] corpus: {}, objectives: {}, executions: {}, exec/sec: {}", "[{}] corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
head, client.corpus_size, client.objective_size, client.executions, exec_sec head, client.corpus_size, client.objective_size, client.executions, exec_sec
); );
for (key, val) in &client.user_monitor { for (key, val) in &client.user_stats {
write!(fmt, ", {key}: {val}").unwrap(); write!(fmt, ", {key}: {val}").unwrap();
} }
for (key, val) in &self.aggregator.aggregated { for (key, val) in client_stats_manager.aggregated() {
write!(fmt, ", {key}: {val}").unwrap(); write!(fmt, ", {key}: {val}").unwrap();
} }
{ {
let client = &self.client_stats()[sender_id.0 as usize]; let client = &client_stats_manager.client_stats()[sender_id.0 as usize];
let mut ctx = self.context.write().unwrap(); let mut ctx = self.context.write().unwrap();
ctx.clients ctx.clients
.entry(sender_id.0 as usize) .entry(sender_id.0 as usize)
@ -492,21 +489,22 @@ impl Monitor for TuiMonitor {
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
{ {
// Print the client performance monitor. Skip the Client IDs that have never sent anything. // Print the client performance monitor. Skip the Client IDs that have never sent anything.
for (i, client) in self.client_stats.iter().filter(|x| x.enabled).enumerate() { for (i, client) in client_stats_manager
.client_stats()
.iter()
.filter(|x| x.enabled)
.enumerate()
{
self.context self.context
.write() .write()
.unwrap() .unwrap()
.introspection .introspection
.entry(i + 1) .entry(i + 1)
.or_default() .or_default()
.grab_data(&client.introspection_monitor); .grab_data(&client.introspection_stats);
} }
} }
} }
fn aggregate(&mut self, name: &str) {
self.aggregator.aggregate(name, &self.client_stats);
}
} }
impl TuiMonitor { impl TuiMonitor {
@ -570,71 +568,20 @@ impl TuiMonitor {
Self { Self {
context, context,
start_time, start_time,
client_stats: vec![],
aggregator: Aggregator::new(),
} }
} }
fn map_density(&self) -> String { fn process_timing(&self, client_stats_manager: &mut ClientStatsManager) -> ProcessTiming {
self.client_stats()
.iter()
.filter(|client| client.enabled)
.filter_map(|client| client.get_user_stats("edges"))
.map(ToString::to_string)
.fold("0%".to_string(), cmp::max)
}
fn item_geometry(&self) -> ItemGeometry {
let mut total_item_geometry = ItemGeometry::new();
if self.client_stats.len() < 2 {
return total_item_geometry;
}
let mut ratio_a: u64 = 0;
let mut ratio_b: u64 = 0;
for client in self.client_stats().iter().filter(|client| client.enabled) {
let afl_stats = client
.get_user_stats("AflStats")
.map_or("None".to_string(), ToString::to_string);
let stability = client.get_user_stats("stability").map_or(
UserStats::new(UserStatsValue::Ratio(0, 100), AggregatorOps::Avg),
Clone::clone,
);
if afl_stats != "None" {
let default_json = serde_json::json!({
"pending": 0,
"pend_fav": 0,
"imported": 0,
"own_finds": 0,
});
let afl_stats_json: Value =
serde_json::from_str(afl_stats.as_str()).unwrap_or(default_json);
total_item_geometry.pending +=
afl_stats_json["pending"].as_u64().unwrap_or_default();
total_item_geometry.pend_fav +=
afl_stats_json["pend_fav"].as_u64().unwrap_or_default();
total_item_geometry.own_finds +=
afl_stats_json["own_finds"].as_u64().unwrap_or_default();
total_item_geometry.imported +=
afl_stats_json["imported"].as_u64().unwrap_or_default();
}
if let UserStatsValue::Ratio(a, b) = stability.value() {
ratio_a += a;
ratio_b += b;
}
}
total_item_geometry.stability = format!("{}%", ratio_a * 100 / ratio_b);
total_item_geometry
}
fn process_timing(&mut self) -> ProcessTiming {
let mut total_process_timing = ProcessTiming::new(); let mut total_process_timing = ProcessTiming::new();
total_process_timing.exec_speed = self.execs_per_sec_pretty(); total_process_timing.exec_speed = client_stats_manager.execs_per_sec_pretty();
if self.client_stats.len() > 1 { if client_stats_manager.client_stats().len() > 1 {
let mut new_path_time = Duration::default(); let mut new_path_time = Duration::default();
let mut new_objectives_time = Duration::default(); let mut new_objectives_time = Duration::default();
for client in self.client_stats().iter().filter(|client| client.enabled) { for client in client_stats_manager
.client_stats()
.iter()
.filter(|client| client.enabled)
{
new_path_time = client.last_corpus_time.max(new_path_time); new_path_time = client.last_corpus_time.max(new_path_time);
new_objectives_time = client.last_objective_time.max(new_objectives_time); new_objectives_time = client.last_objective_time.max(new_objectives_time);
} }
@ -649,6 +596,61 @@ impl TuiMonitor {
} }
} }
fn get_map_density(client_stats_manager: &ClientStatsManager) -> String {
client_stats_manager
.client_stats()
.iter()
.filter(|client| client.enabled)
.filter_map(|client| client.get_user_stats("edges"))
.map(ToString::to_string)
.fold("0%".to_string(), cmp::max)
}
fn get_item_geometry(client_stats_manager: &ClientStatsManager) -> ItemGeometry {
let mut total_item_geometry = ItemGeometry::new();
if client_stats_manager.client_stats().len() < 2 {
return total_item_geometry;
}
let mut ratio_a: u64 = 0;
let mut ratio_b: u64 = 0;
for client in client_stats_manager
.client_stats()
.iter()
.filter(|client| client.enabled)
{
let afl_stats = client
.get_user_stats("AflStats")
.map_or("None".to_string(), ToString::to_string);
let stability = client.get_user_stats("stability").map_or(
UserStats::new(UserStatsValue::Ratio(0, 100), AggregatorOps::Avg),
Clone::clone,
);
if afl_stats != "None" {
let default_json = serde_json::json!({
"pending": 0,
"pend_fav": 0,
"imported": 0,
"own_finds": 0,
});
let afl_stats_json: Value =
serde_json::from_str(afl_stats.as_str()).unwrap_or(default_json);
total_item_geometry.pending += afl_stats_json["pending"].as_u64().unwrap_or_default();
total_item_geometry.pend_fav += afl_stats_json["pend_fav"].as_u64().unwrap_or_default();
total_item_geometry.own_finds +=
afl_stats_json["own_finds"].as_u64().unwrap_or_default();
total_item_geometry.imported += afl_stats_json["imported"].as_u64().unwrap_or_default();
}
if let UserStatsValue::Ratio(a, b) = stability.value() {
ratio_a += a;
ratio_b += b;
}
}
total_item_geometry.stability = format!("{}%", ratio_a * 100 / ratio_b);
total_item_geometry
}
fn run_tui_thread<W: Write + Send + Sync + 'static>( fn run_tui_thread<W: Write + Send + Sync + 'static>(
context: Arc<RwLock<TuiContext>>, context: Arc<RwLock<TuiContext>>,
tick_rate: Duration, tick_rate: Duration,

View File

@ -26,12 +26,12 @@ use crate::{
corpus::{Corpus, HasCurrentCorpusId, SchedulerTestcaseMetadata, Testcase}, corpus::{Corpus, HasCurrentCorpusId, SchedulerTestcaseMetadata, Testcase},
events::{Event, EventFirer}, events::{Event, EventFirer},
executors::HasObservers, executors::HasObservers,
monitors::{AggregatorOps, UserStats, UserStatsValue},
mutators::Tokens, mutators::Tokens,
observers::MapObserver, observers::MapObserver,
schedulers::{minimizer::IsFavoredMetadata, HasQueueCycles}, schedulers::{minimizer::IsFavoredMetadata, HasQueueCycles},
stages::{calibrate::UnstableEntriesMetadata, Stage}, stages::{calibrate::UnstableEntriesMetadata, Stage},
state::{HasCorpus, HasExecutions, HasImported, HasStartTime, Stoppable}, state::{HasCorpus, HasExecutions, HasImported, HasStartTime, Stoppable},
statistics::user_stats::{AggregatorOps, UserStats, UserStatsValue},
std::string::ToString, std::string::ToString,
Error, HasMetadata, HasNamedMetadata, HasScheduler, Error, HasMetadata, HasNamedMetadata, HasScheduler,
}; };

View File

@ -19,11 +19,11 @@ use crate::{
feedbacks::{map::MapFeedbackMetadata, HasObserverHandle}, feedbacks::{map::MapFeedbackMetadata, HasObserverHandle},
fuzzer::Evaluator, fuzzer::Evaluator,
inputs::Input, inputs::Input,
monitors::{AggregatorOps, UserStats, UserStatsValue},
observers::{MapObserver, ObserversTuple}, observers::{MapObserver, ObserversTuple},
schedulers::powersched::SchedulerMetadata, schedulers::powersched::SchedulerMetadata,
stages::{RetryCountRestartHelper, Stage}, stages::{RetryCountRestartHelper, Stage},
state::{HasCorpus, HasCurrentTestcase, HasExecutions}, state::{HasCorpus, HasCurrentTestcase, HasExecutions},
statistics::user_stats::{AggregatorOps, UserStats, UserStatsValue},
Error, HasMetadata, HasNamedMetadata, Error, HasMetadata, HasNamedMetadata,
}; };

View File

@ -13,7 +13,7 @@ use libafl_bolts::{
}; };
#[cfg(all(feature = "concolic_mutation", feature = "introspection"))] #[cfg(all(feature = "concolic_mutation", feature = "introspection"))]
use crate::monitors::PerfFeature; use crate::statistics::perf_stats::PerfFeature;
use crate::{ use crate::{
corpus::HasCurrentCorpusId, corpus::HasCurrentCorpusId,
executors::{Executor, HasObservers}, executors::{Executor, HasObservers},

View File

@ -12,7 +12,7 @@ use libafl_bolts::{
}; };
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
use crate::monitors::PerfFeature; use crate::statistics::perf_stats::PerfFeature;
use crate::{ use crate::{
corpus::{Corpus, HasCurrentCorpusId}, corpus::{Corpus, HasCurrentCorpusId},
executors::{Executor, HasObservers}, executors::{Executor, HasObservers},

View File

@ -10,7 +10,7 @@ use core::{marker::PhantomData, num::NonZeroUsize};
use libafl_bolts::{rands::Rand, Named}; use libafl_bolts::{rands::Rand, Named};
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
use crate::monitors::PerfFeature; use crate::statistics::perf_stats::PerfFeature;
use crate::{ use crate::{
corpus::{Corpus, CorpusId, HasCurrentCorpusId, Testcase}, corpus::{Corpus, CorpusId, HasCurrentCorpusId, Testcase},
fuzzer::Evaluator, fuzzer::Evaluator,
@ -166,7 +166,7 @@ where
let ret = self.perform_mutational(fuzzer, executor, state, manager); let ret = self.perform_mutational(fuzzer, executor, state, manager);
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
state.introspection_monitor_mut().finish_stage(); state.introspection_stats_mut().finish_stage();
ret ret
} }

View File

@ -9,7 +9,7 @@ use core::{fmt::Debug, marker::PhantomData};
use libafl_bolts::Named; use libafl_bolts::Named;
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
use crate::monitors::PerfFeature; use crate::statistics::perf_stats::PerfFeature;
use crate::{ use crate::{
corpus::HasCurrentCorpusId, corpus::HasCurrentCorpusId,
executors::{Executor, HasObservers}, executors::{Executor, HasObservers},

View File

@ -12,7 +12,7 @@ use serde::Serialize;
use super::{PushStage, PushStageHelper, PushStageSharedState}; use super::{PushStage, PushStageHelper, PushStageSharedState};
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
use crate::monitors::PerfFeature; use crate::statistics::perf_stats::PerfFeature;
use crate::{ use crate::{
corpus::{Corpus, CorpusId}, corpus::{Corpus, CorpusId},
events::{EventFirer, ProgressReporter}, events::{EventFirer, ProgressReporter},

View File

@ -152,7 +152,7 @@ where
} }
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
state.introspection_monitor_mut().finish_stage(); state.introspection_stats_mut().finish_stage();
Ok(()) Ok(())
} }
@ -315,7 +315,7 @@ where
self.client.process(fuzzer, state, executor, manager)?; self.client.process(fuzzer, state, executor, manager)?;
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
state.introspection_monitor_mut().finish_stage(); state.introspection_stats_mut().finish_stage();
Ok(()) Ok(())
} }
} }

View File

@ -17,7 +17,7 @@ use serde::Serialize;
#[cfg(feature = "track_hit_feedbacks")] #[cfg(feature = "track_hit_feedbacks")]
use crate::feedbacks::premature_last_result_err; use crate::feedbacks::premature_last_result_err;
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
use crate::monitors::PerfFeature; use crate::statistics::perf_stats::PerfFeature;
use crate::{ use crate::{
corpus::{Corpus, HasCurrentCorpusId, Testcase}, corpus::{Corpus, HasCurrentCorpusId, Testcase},
events::EventFirer, events::EventFirer,
@ -100,7 +100,7 @@ where
self.perform_minification(fuzzer, executor, state, manager)?; self.perform_minification(fuzzer, executor, state, manager)?;
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
state.introspection_monitor_mut().finish_stage(); state.introspection_stats_mut().finish_stage();
Ok(()) Ok(())
} }

View File

@ -9,7 +9,7 @@ use core::{fmt::Debug, marker::PhantomData};
use libafl_bolts::Named; use libafl_bolts::Named;
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
use crate::monitors::PerfFeature; use crate::statistics::perf_stats::PerfFeature;
use crate::{ use crate::{
corpus::HasCurrentCorpusId, corpus::HasCurrentCorpusId,
executors::{Executor, HasObservers, ShadowExecutor}, executors::{Executor, HasObservers, ShadowExecutor},

View File

@ -7,7 +7,7 @@ use libafl_bolts::{current_time, impl_serdeany, rands::Rand};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
use crate::monitors::PerfFeature; use crate::statistics::perf_stats::PerfFeature;
use crate::{ use crate::{
mark_feature_time, mark_feature_time,
mutators::{MutationResult, Mutator}, mutators::{MutationResult, Mutator},
@ -213,7 +213,7 @@ where
let ret = self.perform_mutational(fuzzer, executor, state, manager); let ret = self.perform_mutational(fuzzer, executor, state, manager);
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
state.introspection_monitor_mut().finish_stage(); state.introspection_stats_mut().finish_stage();
ret ret
} }

View File

@ -27,7 +27,7 @@ mod stack;
pub use stack::StageStack; pub use stack::StageStack;
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
use crate::monitors::ClientPerfMonitor; use crate::statistics::perf_stats::ClientPerfStats;
use crate::{ use crate::{
corpus::{Corpus, CorpusId, HasCurrentCorpusId, HasTestcase, InMemoryCorpus, Testcase}, corpus::{Corpus, CorpusId, HasCurrentCorpusId, HasTestcase, InMemoryCorpus, Testcase},
events::{Event, EventFirer, LogSeverity}, events::{Event, EventFirer, LogSeverity},
@ -109,13 +109,13 @@ pub trait HasRand {
} }
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
/// Trait for offering a [`ClientPerfMonitor`] /// Trait for offering a [`ClientPerfStats`]
pub trait HasClientPerfMonitor { pub trait HasClientPerfMonitor {
/// [`ClientPerfMonitor`] itself /// [`ClientPerfStats`] itself
fn introspection_monitor(&self) -> &ClientPerfMonitor; fn introspection_stats(&self) -> &ClientPerfStats;
/// Mutatable ref to [`ClientPerfMonitor`] /// Mutatable ref to [`ClientPerfStats`]
fn introspection_monitor_mut(&mut self) -> &mut ClientPerfMonitor; fn introspection_stats_mut(&mut self) -> &mut ClientPerfStats;
} }
/// Intermediate trait for `HasClientPerfMonitor` /// Intermediate trait for `HasClientPerfMonitor`
@ -225,7 +225,7 @@ pub struct StdState<C, I, R, SC> {
max_size: usize, max_size: usize,
/// Performance statistics for this fuzzer /// Performance statistics for this fuzzer
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
introspection_monitor: ClientPerfMonitor, introspection_stats: ClientPerfStats,
#[cfg(feature = "std")] #[cfg(feature = "std")]
/// Remaining initial inputs to load, if any /// Remaining initial inputs to load, if any
remaining_initial_files: Option<Vec<PathBuf>>, remaining_initial_files: Option<Vec<PathBuf>>,
@ -1104,7 +1104,7 @@ where
max_size: DEFAULT_MAX_SIZE, max_size: DEFAULT_MAX_SIZE,
stop_requested: false, stop_requested: false,
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
introspection_monitor: ClientPerfMonitor::new(), introspection_stats: ClientPerfStats::new(),
#[cfg(feature = "std")] #[cfg(feature = "std")]
remaining_initial_files: None, remaining_initial_files: None,
#[cfg(feature = "std")] #[cfg(feature = "std")]
@ -1142,12 +1142,12 @@ impl StdState<InMemoryCorpus<NopInput>, NopInput, StdRand, InMemoryCorpus<NopInp
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
impl<C, I, R, SC> HasClientPerfMonitor for StdState<C, I, R, SC> { impl<C, I, R, SC> HasClientPerfMonitor for StdState<C, I, R, SC> {
fn introspection_monitor(&self) -> &ClientPerfMonitor { fn introspection_stats(&self) -> &ClientPerfStats {
&self.introspection_monitor &self.introspection_stats
} }
fn introspection_monitor_mut(&mut self) -> &mut ClientPerfMonitor { fn introspection_stats_mut(&mut self) -> &mut ClientPerfStats {
&mut self.introspection_monitor &mut self.introspection_stats
} }
} }
@ -1295,11 +1295,11 @@ impl<I> HasCurrentStageId for NopState<I> {
#[cfg(feature = "introspection")] #[cfg(feature = "introspection")]
impl<I> HasClientPerfMonitor for NopState<I> { impl<I> HasClientPerfMonitor for NopState<I> {
fn introspection_monitor(&self) -> &ClientPerfMonitor { fn introspection_stats(&self) -> &ClientPerfStats {
unimplemented!(); unimplemented!();
} }
fn introspection_monitor_mut(&mut self) -> &mut ClientPerfMonitor { fn introspection_stats_mut(&mut self) -> &mut ClientPerfStats {
unimplemented!(); unimplemented!();
} }
} }

View File

@ -0,0 +1,150 @@
//! Client statistics manager
use alloc::{string::String, vec::Vec};
use core::time::Duration;
use hashbrown::HashMap;
use libafl_bolts::{current_time, ClientId};
use serde::{Deserialize, Serialize};
use super::{user_stats::UserStatsValue, ClientStats};
/// Manager of all client's statistics
#[derive(Serialize, Deserialize, Debug)]
pub struct ClientStatsManager {
client_stats: Vec<ClientStats>,
/// Aggregated user stats value.
///
/// This map is updated by event manager, and is read by monitors to display user-defined stats.
pub(super) cached_aggregated_user_stats: HashMap<String, UserStatsValue>,
}
impl ClientStatsManager {
/// Create a new client stats manager
#[must_use]
pub fn new() -> Self {
Self {
client_stats: vec![],
cached_aggregated_user_stats: HashMap::new(),
}
}
/// Get all client stats
#[must_use]
pub fn client_stats(&self) -> &[ClientStats] {
&self.client_stats
}
/// Get all client stats
pub fn client_stats_mut(&mut self) -> &mut Vec<ClientStats> {
&mut self.client_stats
}
/// Amount of elements in the corpus (combined for all children)
#[must_use]
pub fn corpus_size(&self) -> u64 {
self.client_stats()
.iter()
.fold(0_u64, |acc, x| acc + x.corpus_size)
}
/// Count the number of enabled client stats
#[must_use]
pub fn client_stats_count(&self) -> usize {
self.client_stats()
.iter()
.filter(|client| client.enabled)
.count()
}
/// Amount of elements in the objectives (combined for all children)
#[must_use]
pub fn objective_size(&self) -> u64 {
self.client_stats()
.iter()
.fold(0_u64, |acc, x| acc + x.objective_size)
}
/// Total executions
#[inline]
#[must_use]
pub fn total_execs(&self) -> u64 {
self.client_stats()
.iter()
.fold(0_u64, |acc, x| acc + x.executions)
}
/// Executions per second
#[inline]
pub fn execs_per_sec(&mut self) -> f64 {
let cur_time = current_time();
self.client_stats_mut()
.iter_mut()
.fold(0.0, |acc, x| acc + x.execs_per_sec(cur_time))
}
/// Executions per second
pub fn execs_per_sec_pretty(&mut self) -> String {
super::prettify_float(self.execs_per_sec())
}
/// The client monitor for a specific id, creating new if it doesn't exist
pub fn client_stats_insert(&mut self, client_id: ClientId) {
let total_client_stat_count = self.client_stats().len();
for _ in total_client_stat_count..=(client_id.0) as usize {
self.client_stats_mut().push(ClientStats {
enabled: false,
last_window_time: Duration::from_secs(0),
start_time: Duration::from_secs(0),
..ClientStats::default()
});
}
self.update_client_stats_for(client_id, |new_stat| {
if !new_stat.enabled {
let timestamp = current_time();
// I have never seen this man in my life
new_stat.start_time = timestamp;
new_stat.last_window_time = timestamp;
new_stat.enabled = true;
}
});
}
/// Update sepecific client stats.
pub fn update_client_stats_for<T, F: FnOnce(&mut ClientStats) -> T>(
&mut self,
client_id: ClientId,
update: F,
) -> T {
let client_stat = &mut self.client_stats_mut()[client_id.0 as usize];
update(client_stat)
}
/// Update all client stats. This will clear all previous client stats, and fill in the new client stats.
pub fn update_all_client_stats(&mut self, new_client_stats: Vec<ClientStats>) {
*self.client_stats_mut() = new_client_stats;
}
/// Get immutable reference to client stats
#[must_use]
pub fn client_stats_for(&self, client_id: ClientId) -> &ClientStats {
&self.client_stats()[client_id.0 as usize]
}
/// Aggregate user-defined stats
pub fn aggregate(&mut self, name: &str) {
super::user_stats::aggregate_user_stats(self, name);
}
/// Get aggregated user-defined stats
#[must_use]
pub fn aggregated(&self) -> &HashMap<String, UserStatsValue> {
&self.cached_aggregated_user_stats
}
}
impl Default for ClientStatsManager {
fn default() -> Self {
Self::new()
}
}

View File

@ -0,0 +1,196 @@
//! Statistics used for Monitors to display.
pub mod manager;
#[cfg(feature = "introspection")]
pub mod perf_stats;
pub mod user_stats;
use alloc::{borrow::Cow, string::String};
use core::time::Duration;
use hashbrown::HashMap;
use libafl_bolts::current_time;
#[cfg(feature = "introspection")]
use perf_stats::ClientPerfStats;
use serde::{Deserialize, Serialize};
use user_stats::UserStats;
#[cfg(feature = "afl_exec_sec")]
const CLIENT_STATS_TIME_WINDOW_SECS: u64 = 5; // 5 seconds
/// A simple struct to keep track of client statistics
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ClientStats {
/// If this client is enabled. This is set to `true` the first time we see this client.
pub enabled: bool,
// monitor (maybe we need a separated struct?)
/// The corpus size for this client
pub corpus_size: u64,
/// The time for the last update of the corpus size
pub last_corpus_time: Duration,
/// The total executions for this client
pub executions: u64,
/// The number of executions of the previous state in case a client decrease the number of execution (e.g when restarting without saving the state)
pub prev_state_executions: u64,
/// The size of the objectives corpus for this client
pub objective_size: u64,
/// The time for the last update of the objective size
pub last_objective_time: Duration,
/// The last reported executions for this client
#[cfg(feature = "afl_exec_sec")]
pub last_window_executions: u64,
/// The last executions per sec
#[cfg(feature = "afl_exec_sec")]
pub last_execs_per_sec: f64,
/// The last time we got this information
pub last_window_time: Duration,
/// the start time of the client
pub start_time: Duration,
/// User-defined stats
pub user_stats: HashMap<Cow<'static, str>, UserStats>,
/// Client performance statistics
#[cfg(feature = "introspection")]
pub introspection_stats: ClientPerfStats,
}
impl ClientStats {
/// We got new information about executions for this client, insert them.
#[cfg(feature = "afl_exec_sec")]
pub fn update_executions(&mut self, executions: u64, cur_time: Duration) {
let diff = cur_time
.checked_sub(self.last_window_time)
.map_or(0, |d| d.as_secs());
if diff > CLIENT_STATS_TIME_WINDOW_SECS {
let _: f64 = self.execs_per_sec(cur_time);
self.last_window_time = cur_time;
self.last_window_executions = self.executions;
}
if self.executions > self.prev_state_executions + executions {
// Something is strange here, sum the executions
self.prev_state_executions = self.executions;
}
self.executions = self.prev_state_executions + executions;
}
/// We got a new information about executions for this client, insert them.
#[cfg(not(feature = "afl_exec_sec"))]
pub fn update_executions(&mut self, executions: u64, _cur_time: Duration) {
if self.executions > self.prev_state_executions + executions {
// Something is strange here, sum the executions
self.prev_state_executions = self.executions;
}
self.executions = self.prev_state_executions + executions;
}
/// We got new information about corpus size for this client, insert them.
pub fn update_corpus_size(&mut self, corpus_size: u64) {
self.corpus_size = corpus_size;
self.last_corpus_time = current_time();
}
/// We got a new information about objective corpus size for this client, insert them.
pub fn update_objective_size(&mut self, objective_size: u64) {
self.objective_size = objective_size;
}
/// Get the calculated executions per second for this client
#[expect(clippy::cast_precision_loss, clippy::cast_sign_loss)]
#[cfg(feature = "afl_exec_sec")]
pub fn execs_per_sec(&mut self, cur_time: Duration) -> f64 {
if self.executions == 0 {
return 0.0;
}
let elapsed = cur_time
.checked_sub(self.last_window_time)
.map_or(0.0, |d| d.as_secs_f64());
if elapsed as u64 == 0 {
return self.last_execs_per_sec;
}
let cur_avg = ((self.executions - self.last_window_executions) as f64) / elapsed;
if self.last_window_executions == 0 {
self.last_execs_per_sec = cur_avg;
return self.last_execs_per_sec;
}
// If there is a dramatic (5x+) jump in speed, reset the indicator more quickly
if cur_avg * 5.0 < self.last_execs_per_sec || cur_avg / 5.0 > self.last_execs_per_sec {
self.last_execs_per_sec = cur_avg;
}
self.last_execs_per_sec =
self.last_execs_per_sec * (1.0 - 1.0 / 16.0) + cur_avg * (1.0 / 16.0);
self.last_execs_per_sec
}
/// Get the calculated executions per second for this client
#[expect(clippy::cast_precision_loss, clippy::cast_sign_loss)]
#[cfg(not(feature = "afl_exec_sec"))]
pub fn execs_per_sec(&mut self, cur_time: Duration) -> f64 {
if self.executions == 0 {
return 0.0;
}
let elapsed = cur_time
.checked_sub(self.last_window_time)
.map_or(0.0, |d| d.as_secs_f64());
if elapsed as u64 == 0 {
return 0.0;
}
(self.executions as f64) / elapsed
}
/// Executions per second
pub fn execs_per_sec_pretty(&mut self, cur_time: Duration) -> String {
prettify_float(self.execs_per_sec(cur_time))
}
/// Update the user-defined stat with name and value
pub fn update_user_stats(
&mut self,
name: Cow<'static, str>,
value: UserStats,
) -> Option<UserStats> {
self.user_stats.insert(name, value)
}
#[must_use]
/// Get a user-defined stat using the name
pub fn get_user_stats(&self, name: &str) -> Option<&UserStats> {
self.user_stats.get(name)
}
/// Update the current [`ClientPerfStats`] with the given [`ClientPerfStats`]
#[cfg(feature = "introspection")]
pub fn update_introspection_stats(&mut self, introspection_stats: ClientPerfStats) {
self.introspection_stats = introspection_stats;
}
}
/// Prettifies float values for human-readable output
fn prettify_float(value: f64) -> String {
let (value, suffix) = match value {
value if value >= 1_000_000.0 => (value / 1_000_000.0, "M"),
value if value >= 1_000.0 => (value / 1_000.0, "k"),
value => (value, ""),
};
match value {
value if value >= 1_000_000.0 => {
format!("{value:.2}{suffix}")
}
value if value >= 1_000.0 => {
format!("{value:.1}{suffix}")
}
value if value >= 100.0 => {
format!("{value:.1}{suffix}")
}
value if value >= 10.0 => {
format!("{value:.2}{suffix}")
}
value => {
format!("{value:.3}{suffix}")
}
}
}

View File

@ -0,0 +1,424 @@
//! Statistics related to introspection
use alloc::{string::String, vec::Vec};
use core::fmt;
use hashbrown::HashMap;
use serde::{Deserialize, Serialize};
/// Client performance statistics
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ClientPerfStats {
/// Starting counter (in clock cycles from `read_time_counter`)
start_time: u64,
/// Current counter in the fuzzer (in clock cycles from `read_time_counter`
current_time: u64,
/// Clock cycles spent in the scheduler
scheduler: u64,
/// Clock cycles spent in the manager
manager: u64,
/// Current stage index to write the next stage benchmark time
curr_stage: u8,
/// Flag to dictate this stage is in use. Used during printing to not print the empty
/// stages if they are not in use.
stages_used: Vec<bool>,
/// Clock cycles spent in the the various features of each stage
stages: Vec<[u64; PerfFeature::Count as usize]>,
/// Clock cycles spent in each feedback mechanism of the fuzzer.
feedbacks: HashMap<String, u64>,
/// Current time set by `start_timer`
timer_start: Option<u64>,
}
/// Various features that are measured for performance
#[derive(Serialize, Deserialize, Debug, Clone)]
#[repr(u8)]
pub enum PerfFeature {
/// Getting an input from the corpus
GetInputFromCorpus = 0,
/// Mutating the input
Mutate = 1,
/// Post-Exec Mutator callback
MutatePostExec = 2,
/// Actual time spent executing the target
TargetExecution = 3,
/// Time spent in `pre_exec`
PreExec = 4,
/// Time spent in `post_exec`
PostExec = 5,
/// Time spent in `observer` `pre_exec_all`
PreExecObservers = 6,
/// Time spent in `executor.observers_mut().post_exec_all`
PostExecObservers = 7,
/// Time spent getting the feedback from `is_interesting` from all feedbacks
GetFeedbackInterestingAll = 8,
/// Time spent getting the feedback from `is_interesting` from all objectives
GetObjectivesInterestingAll = 9,
/// Used as a counter to know how many elements are in [`PerfFeature`]. Must be the
/// last value in the enum.
Count, // !! No more values here since Count is last! !!
// !! No more values here since Count is last! !!
}
// TryFromPrimitive requires `std` so these are implemented manually
impl From<PerfFeature> for usize {
fn from(val: PerfFeature) -> usize {
match val {
PerfFeature::GetInputFromCorpus => PerfFeature::GetInputFromCorpus as usize,
PerfFeature::Mutate => PerfFeature::Mutate as usize,
PerfFeature::MutatePostExec => PerfFeature::MutatePostExec as usize,
PerfFeature::TargetExecution => PerfFeature::TargetExecution as usize,
PerfFeature::PreExec => PerfFeature::PreExec as usize,
PerfFeature::PostExec => PerfFeature::PostExec as usize,
PerfFeature::PreExecObservers => PerfFeature::PreExecObservers as usize,
PerfFeature::PostExecObservers => PerfFeature::PostExecObservers as usize,
PerfFeature::GetFeedbackInterestingAll => {
PerfFeature::GetFeedbackInterestingAll as usize
}
PerfFeature::GetObjectivesInterestingAll => {
PerfFeature::GetObjectivesInterestingAll as usize
}
PerfFeature::Count => PerfFeature::Count as usize,
}
}
}
// TryFromPrimitive requires `std` so these are implemented manually
impl From<usize> for PerfFeature {
fn from(val: usize) -> PerfFeature {
match val {
0 => PerfFeature::GetInputFromCorpus,
1 => PerfFeature::Mutate,
2 => PerfFeature::MutatePostExec,
3 => PerfFeature::TargetExecution,
4 => PerfFeature::PreExec,
5 => PerfFeature::PostExec,
6 => PerfFeature::PreExecObservers,
7 => PerfFeature::PostExecObservers,
8 => PerfFeature::GetFeedbackInterestingAll,
9 => PerfFeature::GetObjectivesInterestingAll,
_ => panic!("Unknown PerfFeature: {val}"),
}
}
}
/// Number of features we can measure for performance
pub const NUM_PERF_FEATURES: usize = PerfFeature::Count as usize;
impl ClientPerfStats {
/// Create a blank [`ClientPerfStats`] with the `start_time` and `current_time` with
/// the current clock counter
#[must_use]
pub fn new() -> Self {
let start_time = libafl_bolts::cpu::read_time_counter();
Self {
start_time,
current_time: start_time,
scheduler: 0,
manager: 0,
curr_stage: 0,
stages: vec![],
stages_used: vec![],
feedbacks: HashMap::new(),
timer_start: None,
}
}
/// Set the current time with the given time
#[inline]
pub fn set_current_time(&mut self, time: u64) {
self.current_time = time;
}
/// Start a timer with the current time counter
#[inline]
pub fn start_timer(&mut self) {
self.timer_start = Some(libafl_bolts::cpu::read_time_counter());
}
/// Update the current [`ClientPerfStats`] with the given [`ClientPerfStats`]
pub fn update(&mut self, monitor: &ClientPerfStats) {
self.set_current_time(monitor.current_time);
self.update_scheduler(monitor.scheduler);
self.update_manager(monitor.manager);
self.update_stages(&monitor.stages);
self.update_feedbacks(&monitor.feedbacks);
}
/// Gets the elapsed time since the internal timer started. Resets the timer when
/// finished execution.
#[inline]
fn mark_time(&mut self) -> u64 {
match self.timer_start {
None => {
// Warning message if marking time without starting the timer first
log::warn!("Attempted to `mark_time` without starting timer first.");
// Return 0 for no time marked
0
}
Some(timer_start) => {
// Calculate the elapsed time
let elapsed = libafl_bolts::cpu::read_time_counter() - timer_start;
// Reset the timer
self.timer_start = None;
// Return the elapsed time
elapsed
}
}
}
/// Update the time spent in the scheduler with the elapsed time that we have seen
#[inline]
pub fn mark_scheduler_time(&mut self) {
// Get the current elapsed time
let elapsed = self.mark_time();
// Add the time to the scheduler stat
self.update_scheduler(elapsed);
}
/// Update the time spent in the scheduler with the elapsed time that we have seen
#[inline]
pub fn mark_manager_time(&mut self) {
// Get the current elapsed time
let elapsed = self.mark_time();
// Add the time the manager stat
self.update_manager(elapsed);
}
/// Update the time spent in the given [`PerfFeature`] with the elapsed time that we have seen
#[inline]
pub fn mark_feature_time(&mut self, feature: PerfFeature) {
// Get the current elapsed time
let elapsed = self.mark_time();
// Add the time the the given feature
self.update_feature(feature, elapsed);
}
/// Add the given `time` to the `scheduler` monitor
#[inline]
pub fn update_scheduler(&mut self, time: u64) {
self.scheduler = self
.scheduler
.checked_add(time)
.expect("update_scheduler overflow");
}
/// Add the given `time` to the `manager` monitor
#[inline]
pub fn update_manager(&mut self, time: u64) {
self.manager = self
.manager
.checked_add(time)
.expect("update_manager overflow");
}
/// Update the total stage counter and increment the stage counter for the next stage
#[inline]
pub fn finish_stage(&mut self) {
// Increment the stage to the next index. The check is only done if this were to
// be used past the length of the `self.stages` buffer
self.curr_stage += 1;
}
/// Reset the stage index counter to zero
#[inline]
pub fn reset_stage_index(&mut self) {
self.curr_stage = 0;
}
/// Update the time spent in the feedback
pub fn update_feedback(&mut self, name: &str, time: u64) {
self.feedbacks.insert(
name.into(),
self.feedbacks
.get(name)
.unwrap_or(&0)
.checked_add(time)
.expect("update_feedback overflow"),
);
}
/// Update the time spent in all the feedbacks
pub fn update_feedbacks(&mut self, feedbacks: &HashMap<String, u64>) {
for (key, value) in feedbacks {
self.update_feedback(key, *value);
}
}
/// Update the time spent in the stages
pub fn update_stages(&mut self, stages: &[[u64; PerfFeature::Count as usize]]) {
if self.stages.len() < stages.len() {
self.stages
.resize(stages.len(), [0; PerfFeature::Count as usize]);
self.stages_used.resize(stages.len(), false);
}
for (stage_index, features) in stages.iter().enumerate() {
for (feature_index, feature) in features.iter().enumerate() {
self.stages[stage_index][feature_index] = self.stages[stage_index][feature_index]
.checked_add(*feature)
.expect("Stage overflow");
}
}
}
/// Update the given [`PerfFeature`] with the given `time`
pub fn update_feature(&mut self, feature: PerfFeature, time: u64) {
// Get the current stage index as `usize`
let stage_index: usize = self.curr_stage.into();
// Get the index of the given feature
let feature_index: usize = feature.into();
if stage_index >= self.stages.len() {
self.stages
.resize(stage_index + 1, [0; PerfFeature::Count as usize]);
self.stages_used.resize(stage_index + 1, false);
}
// Update the given feature
self.stages[stage_index][feature_index] = self.stages[stage_index][feature_index]
.checked_add(time)
.expect("Stage overflow");
// Set that the current stage is being used
self.stages_used[stage_index] = true;
}
/// The elapsed cycles (or time)
#[must_use]
pub fn elapsed_cycles(&self) -> u64 {
self.current_time - self.start_time
}
/// The amount of cycles the `manager` did
#[must_use]
pub fn manager_cycles(&self) -> u64 {
self.manager
}
/// The amount of cycles the `scheduler` did
#[must_use]
pub fn scheduler_cycles(&self) -> u64 {
self.scheduler
}
/// Iterator over all used stages
pub fn used_stages(
&self,
) -> impl Iterator<Item = (usize, &[u64; PerfFeature::Count as usize])> {
let used = self.stages_used.clone();
self.stages
.iter()
.enumerate()
.filter(move |(stage_index, _)| used[*stage_index])
}
/// A map of all `feedbacks`
#[must_use]
pub fn feedbacks(&self) -> &HashMap<String, u64> {
&self.feedbacks
}
}
impl fmt::Display for ClientPerfStats {
#[expect(clippy::cast_precision_loss)]
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
// Calculate the elapsed time from the monitor
let elapsed: f64 = self.elapsed_cycles() as f64;
// Calculate the percentages for each benchmark
let scheduler_percent = self.scheduler as f64 / elapsed;
let manager_percent = self.manager as f64 / elapsed;
// Calculate the remaining percentage that has not been benchmarked
let mut other_percent = 1.0;
other_percent -= scheduler_percent;
other_percent -= manager_percent;
// Create the formatted string
writeln!(
f,
" {scheduler_percent:6.4}: Scheduler\n {manager_percent:6.4}: Manager"
)?;
// Calculate each stage
// Make sure we only iterate over used stages
for (stage_index, features) in self.used_stages() {
// Write the stage header
writeln!(f, " Stage {stage_index}:")?;
for (feature_index, feature) in features.iter().enumerate() {
// Calculate this current stage's percentage
let feature_percent = *feature as f64 / elapsed;
// Ignore this feature if it isn't used
if feature_percent == 0.0 {
continue;
}
// Update the other percent by removing this current percent
other_percent -= feature_percent;
// Get the actual feature from the feature index for printing its name
let feature: PerfFeature = feature_index.into();
// Write the percentage for this feature
writeln!(f, " {feature_percent:6.4}: {feature:?}")?;
}
}
writeln!(f, " Feedbacks:")?;
for (feedback_name, feedback_time) in self.feedbacks() {
// Calculate this current stage's percentage
let feedback_percent = *feedback_time as f64 / elapsed;
// Ignore this feedback if it isn't used
if feedback_percent == 0.0 {
continue;
}
// Update the other percent by removing this current percent
other_percent -= feedback_percent;
// Write the percentage for this feedback
writeln!(f, " {feedback_percent:6.4}: {feedback_name}")?;
}
write!(f, " {other_percent:6.4}: Not Measured")?;
Ok(())
}
}
impl Default for ClientPerfStats {
#[must_use]
fn default() -> Self {
Self::new()
}
}

View File

@ -0,0 +1,123 @@
//! User-defined statistics
mod user_stats_value;
use alloc::string::ToString;
use core::fmt;
use serde::{Deserialize, Serialize};
pub use user_stats_value::*;
use super::manager::ClientStatsManager;
/// user defined stats enum
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct UserStats {
value: UserStatsValue,
aggregator_op: AggregatorOps,
}
impl UserStats {
/// Get the `AggregatorOps`
#[must_use]
pub fn aggregator_op(&self) -> &AggregatorOps {
&self.aggregator_op
}
/// Get the actual value for the stats
#[must_use]
pub fn value(&self) -> &UserStatsValue {
&self.value
}
/// Constructor
#[must_use]
pub fn new(value: UserStatsValue, aggregator_op: AggregatorOps) -> Self {
Self {
value,
aggregator_op,
}
}
}
impl fmt::Display for UserStats {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.value())
}
}
/// Definition of how we aggregate this across multiple clients
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
pub enum AggregatorOps {
/// Do nothing
None,
/// Add stats up
Sum,
/// Average stats out
Avg,
/// Get the min
Min,
/// Get the max
Max,
}
/// Aggregate user statistics according to their ops
pub(super) fn aggregate_user_stats(client_stats_manager: &mut ClientStatsManager, name: &str) {
let mut gather = client_stats_manager
.client_stats()
.iter()
.filter_map(|client| client.user_stats.get(name));
let gather_count = gather.clone().count();
let (mut init, op) = match gather.next() {
Some(x) => (x.value().clone(), *x.aggregator_op()),
_ => {
return;
}
};
for item in gather {
match op {
AggregatorOps::None => {
// Nothing
return;
}
AggregatorOps::Avg | AggregatorOps::Sum => {
init = match init.stats_add(item.value()) {
Some(x) => x,
_ => {
return;
}
};
}
AggregatorOps::Min => {
init = match init.stats_min(item.value()) {
Some(x) => x,
_ => {
return;
}
};
}
AggregatorOps::Max => {
init = match init.stats_max(item.value()) {
Some(x) => x,
_ => {
return;
}
};
}
}
}
if let AggregatorOps::Avg = op {
// if avg then divide last.
init = match init.stats_div(gather_count) {
Some(x) => x,
_ => {
return;
}
}
}
client_stats_manager
.cached_aggregated_user_stats
.insert(name.to_string(), init);
}

View File

@ -0,0 +1,162 @@
//! Value type of user stats
use alloc::borrow::Cow;
use core::fmt;
use serde::{Deserialize, Serialize};
/// The actual value for the userstats
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum UserStatsValue {
/// A numerical value
Number(u64),
/// A Float value
Float(f64),
/// A `String`
String(Cow<'static, str>),
/// A ratio of two values
Ratio(u64, u64),
/// Percent
Percent(f64),
}
impl UserStatsValue {
/// Check if this guy is numeric
#[must_use]
pub fn is_numeric(&self) -> bool {
match &self {
Self::Number(_) | Self::Float(_) | Self::Ratio(_, _) | Self::Percent(_) => true,
Self::String(_) => false,
}
}
/// Divide by the number of elements
#[expect(clippy::cast_precision_loss)]
pub fn stats_div(&mut self, divisor: usize) -> Option<Self> {
match self {
Self::Number(x) => Some(Self::Float(*x as f64 / divisor as f64)),
Self::Float(x) => Some(Self::Float(*x / divisor as f64)),
Self::Percent(x) => Some(Self::Percent(*x / divisor as f64)),
Self::Ratio(x, y) => Some(Self::Percent((*x as f64 / divisor as f64) / *y as f64)),
Self::String(_) => None,
}
}
/// min user stats with the other
#[expect(clippy::cast_precision_loss)]
pub fn stats_max(&mut self, other: &Self) -> Option<Self> {
match (self, other) {
(Self::Number(x), Self::Number(y)) => {
if y > x {
Some(Self::Number(*y))
} else {
Some(Self::Number(*x))
}
}
(Self::Float(x), Self::Float(y)) => {
if y > x {
Some(Self::Float(*y))
} else {
Some(Self::Float(*x))
}
}
(Self::Ratio(x, a), Self::Ratio(y, b)) => {
let first = *x as f64 / *a as f64;
let second = *y as f64 / *b as f64;
if first > second {
Some(Self::Percent(first))
} else {
Some(Self::Percent(second))
}
}
(Self::Percent(x), Self::Percent(y)) => {
if y > x {
Some(Self::Percent(*y))
} else {
Some(Self::Percent(*x))
}
}
_ => None,
}
}
/// min user stats with the other
#[expect(clippy::cast_precision_loss)]
pub fn stats_min(&mut self, other: &Self) -> Option<Self> {
match (self, other) {
(Self::Number(x), Self::Number(y)) => {
if y > x {
Some(Self::Number(*x))
} else {
Some(Self::Number(*y))
}
}
(Self::Float(x), Self::Float(y)) => {
if y > x {
Some(Self::Float(*x))
} else {
Some(Self::Float(*y))
}
}
(Self::Ratio(x, a), Self::Ratio(y, b)) => {
let first = *x as f64 / *a as f64;
let second = *y as f64 / *b as f64;
if first > second {
Some(Self::Percent(second))
} else {
Some(Self::Percent(first))
}
}
(Self::Percent(x), Self::Percent(y)) => {
if y > x {
Some(Self::Percent(*x))
} else {
Some(Self::Percent(*y))
}
}
_ => None,
}
}
/// add user stats with the other
#[expect(clippy::cast_precision_loss)]
pub fn stats_add(&mut self, other: &Self) -> Option<Self> {
match (self, other) {
(Self::Number(x), Self::Number(y)) => Some(Self::Number(*x + *y)),
(Self::Float(x), Self::Float(y)) => Some(Self::Float(*x + *y)),
(Self::Percent(x), Self::Percent(y)) => Some(Self::Percent(*x + *y)),
(Self::Ratio(x, a), Self::Ratio(y, b)) => {
let first = *x as f64 / *a as f64;
let second = *y as f64 / *b as f64;
Some(Self::Percent(first + second))
}
(Self::Percent(x), Self::Ratio(y, b)) => {
let ratio = *y as f64 / *b as f64;
Some(Self::Percent(*x + ratio))
}
(Self::Ratio(x, a), Self::Percent(y)) => {
let ratio = *x as f64 / *a as f64;
Some(Self::Percent(ratio + *y))
}
_ => None,
}
}
}
impl fmt::Display for UserStatsValue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self {
UserStatsValue::Number(n) => write!(f, "{n}"),
UserStatsValue::Float(n) => write!(f, "{}", crate::statistics::prettify_float(*n)),
UserStatsValue::Percent(n) => write!(f, "{:.3}%", n * 100.0),
UserStatsValue::String(s) => write!(f, "{s}"),
UserStatsValue::Ratio(a, b) => {
if *b == 0 {
write!(f, "{a}/{b}")
} else {
write!(f, "{a}/{b} ({}%)", a * 100 / b)
}
}
}
}
}