Use HashMap to manage ClientStat, Fix #3133 (#3142)

* fix

* lol

* lol

* clp fixer

* clp fixer

* revert cargo.toml
Dongjia "toka" Zhang 2025-04-08 19:06:20 +02:00 committed by GitHub
parent fa8a576ef0
commit 373fe03633
20 changed files with 234 additions and 195 deletions
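Summary of the change (editor's note, inferred from the hunks below): `ClientStatsManager` stops storing per-client stats in a `Vec<ClientStats>` indexed by `client_id.0 as usize` and keys them by `ClientId` in a `HashMap` instead, and the stats/monitor APIs become fallible — `client_stats_insert`, `update_client_stats_for`, `client_stats_for`, and `Monitor::display` now return `Result<_, Error>`, so call sites propagate failures with `?` rather than panicking on an out-of-range index. The standalone sketch below (plain std types, not LibAFL code) illustrates why a map keyed by the ID is the safer shape once client IDs can be sparse.

```rust
// Standalone illustration (not LibAFL code): positional indexing breaks as soon as
// client IDs are sparse, while a map keyed by the ID itself stays correct.
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct ClientId(u32);

#[derive(Default, Debug)]
struct ClientStats {
    executions: u64,
}

fn main() {
    // Clients 0 and 7 are active; 1..=6 never registered.
    let mut stats: HashMap<ClientId, ClientStats> = HashMap::new();
    stats.entry(ClientId(0)).or_default().executions += 1;
    stats.entry(ClientId(7)).or_default().executions += 1;

    // With a Vec indexed by `id.0 as usize`, reaching client 7 would require
    // padding entries 1..=6 (or it panics); with a HashMap the lookup is either
    // a hit or a recoverable `None`.
    match stats.get(&ClientId(7)) {
        Some(s) => println!("client 7: {} execs", s.executions),
        None => println!("client 7 unknown"),
    }
}
```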

View File

@ -39,7 +39,6 @@ default = [
"regex",
"serdeany_autoreg",
"libafl_bolts/xxh3",
"tui_monitor",
]
document-features = ["dep:document-features"]

View File

@ -17,6 +17,6 @@ pub fn main() {
let _client_stats = ClientStats::default();
let mut client_stats_manager = ClientStatsManager::default();
monitor.display(&mut client_stats_manager, "Test", ClientId(0));
let _ = monitor.display(&mut client_stats_manager, "Test", ClientId(0));
sleep(Duration::from_secs(10));
}
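Since `Monitor::display` is now fallible, call sites either discard the result explicitly, as the example above does, or propagate it. A hedged sketch of the `?` style, assuming the paths visible elsewhere in this diff (`libafl::monitors`, `libafl_bolts`) and not compiled against the crate:

```rust
use libafl::monitors::{Monitor, NopMonitor, stats::ClientStatsManager};
use libafl_bolts::{ClientId, Error};

fn main() -> Result<(), Error> {
    let mut monitor = NopMonitor::default();
    let mut client_stats_manager = ClientStatsManager::default();
    // `display` returns Result<(), Error> after this commit, so `?` works here.
    monitor.display(&mut client_stats_manager, "Test", ClientId(0))?;
    Ok(())
}
```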

View File

@ -190,7 +190,7 @@ where
// Perform the optimization!
opt.check(&[]);
let res = if let Some(model) = opt.get_model() {
if let Some(model) = opt.get_model() {
let mut removed = Vec::with_capacity(state.corpus().count());
for (seed, (id, _)) in seed_exprs {
// if the model says the seed isn't there, mark it for deletion
@ -214,11 +214,8 @@ where
}
*state.corpus_mut().current_mut() = None; //we may have removed the current ID from the corpus
Ok(())
} else {
Err(Error::unknown("Corpus minimization failed; unsat."))
};
res
return Ok(());
}
Err(Error::unknown("Corpus minimization failed; unsat."))
}
}
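The hunk above only reshapes control flow: instead of binding the whole `if let`/`else` to a temporary `res`, the happy path returns early and the `Err` becomes the fall-through value. A minimal standalone illustration of that shape (not LibAFL code):

```rust
// Standalone sketch of the early-return shape the hunk moves to.
fn minimize(model: Option<&str>) -> Result<(), String> {
    if let Some(model) = model {
        // ... apply the model, prune the corpus ...
        let _ = model;
        return Ok(());
    }
    Err("Corpus minimization failed; unsat.".into())
}

fn main() {
    assert!(minimize(Some("model")).is_ok());
    assert!(minimize(None).is_err());
}
```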

View File

@ -93,7 +93,7 @@ where
&mut self.client_stats_manager,
"Broker Heartbeat",
ClientId(0),
);
)?;
Ok(())
}
}
@ -114,7 +114,6 @@ where
}
/// Handle arriving events in the broker
#[expect(clippy::unnecessary_wraps)]
fn handle_in_broker(
monitor: &mut MT,
client_stats_manager: &mut ClientStatsManager,
@ -123,10 +122,10 @@ where
) -> Result<BrokerEventResult, Error> {
let stats = event.stats();
client_stats_manager.client_stats_insert(ClientId(0));
client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stats_manager.client_stats_insert(client_id)?;
client_stats_manager.update_client_stats_for(client_id, |client_stat| {
client_stat.update_executions(stats.executions, stats.time);
});
})?;
let event = event.event();
match &event {
@ -141,21 +140,21 @@ where
client_id
};
client_stats_manager.client_stats_insert(id);
client_stats_manager.client_stats_insert(id)?;
client_stats_manager.update_client_stats_for(id, |client_stat| {
client_stat.update_corpus_size(*corpus_size as u64);
});
monitor.display(client_stats_manager, event.name(), id);
})?;
monitor.display(client_stats_manager, event.name(), id)?;
Ok(BrokerEventResult::Forward)
}
Event::Heartbeat => Ok(BrokerEventResult::Handled),
Event::UpdateUserStats { name, value, .. } => {
client_stats_manager.client_stats_insert(client_id);
client_stats_manager.client_stats_insert(client_id)?;
client_stats_manager.update_client_stats_for(client_id, |client_stat| {
client_stat.update_user_stats(name.clone(), value.clone());
});
})?;
client_stats_manager.aggregate(name);
monitor.display(client_stats_manager, event.name(), client_id);
monitor.display(client_stats_manager, event.name(), client_id)?;
Ok(BrokerEventResult::Handled)
}
#[cfg(feature = "introspection")]
@ -166,24 +165,24 @@ where
// TODO: The monitor buffer should be added on client add.
// Get the client for the staterestorer ID
client_stats_manager.client_stats_insert(client_id);
client_stats_manager.client_stats_insert(client_id)?;
client_stats_manager.update_client_stats_for(client_id, |client_stat| {
// Update the performance monitor for this client
client_stat.update_introspection_stats((**introspection_stats).clone());
});
})?;
// Display the monitor via `.display` only on core #1
monitor.display(client_stats_manager, event.name(), client_id);
monitor.display(client_stats_manager, event.name(), client_id)?;
// Correctly handled the event
Ok(BrokerEventResult::Handled)
}
Event::Objective { objective_size, .. } => {
client_stats_manager.client_stats_insert(client_id);
client_stats_manager.client_stats_insert(client_id)?;
client_stats_manager.update_client_stats_for(client_id, |client_stat| {
client_stat.update_objective_size(*objective_size as u64);
});
monitor.display(client_stats_manager, event.name(), client_id);
})?;
monitor.display(client_stats_manager, event.name(), client_id)?;
Ok(BrokerEventResult::Handled)
}
Event::Log {
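Two things change in `handle_in_broker` here: execution stats are attributed to the sending `client_id` instead of a hard-coded `ClientId(0)`, and every stats/display call is propagated with `?` (which is also why the `#[expect(clippy::unnecessary_wraps)]` attribute goes away — the function can now genuinely fail). A sketch of the resulting pattern; paths and signatures are taken from the hunks and may not match the crate exactly:

```rust
use libafl::monitors::{Monitor, stats::ClientStatsManager};
use libafl_bolts::{ClientId, Error};

fn record_new_testcase<M: Monitor>(
    monitor: &mut M,
    client_stats_manager: &mut ClientStatsManager,
    client_id: ClientId,
    corpus_size: u64,
) -> Result<(), Error> {
    // Attribute the stats to the sender, not a hard-coded ClientId(0).
    client_stats_manager.client_stats_insert(client_id)?;
    client_stats_manager.update_client_stats_for(client_id, |client_stat| {
        client_stat.update_corpus_size(corpus_size);
    })?;
    // `display` is fallible as well, so the error is surfaced to the caller.
    monitor.display(client_stats_manager, "NewTestcase", client_id)
}
```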

View File

@ -268,10 +268,10 @@ where
let timeout = current_time() + parent_lock.node_descriptor.timeout;
parent_lock.parent = loop {
log::debug!("Trying to connect to parent @ {}..", parent_addr);
log::debug!("Trying to connect to parent @ {parent_addr}..");
match TcpStream::connect(parent_addr).await {
Ok(stream) => {
log::debug!("Connected to parent @ {}", parent_addr);
log::debug!("Connected to parent @ {parent_addr}");
break Some(stream);
}
@ -302,10 +302,10 @@ where
// The main listening loop. Should never fail.
'listening: loop {
log::debug!("listening for children on {:?}...", listener);
log::debug!("listening for children on {listener:?}...");
match listener.accept().await {
Ok((mut stream, addr)) => {
log::debug!("{} joined the children.", addr);
log::debug!("{addr} joined the children.");
let mut state_guard = state.write().await;
if let Err(e) = state_guard
@ -487,7 +487,7 @@ where
// Garbage collect disconnected children
for id_to_remove in &ids_to_remove {
log::debug!("Child {:?} has been garbage collected.", id_to_remove);
log::debug!("Child {id_to_remove:?} has been garbage collected.");
self.children.remove(id_to_remove);
}
}
@ -596,7 +596,7 @@ where
// Garbage collect disconnected children
for id_to_remove in &ids_to_remove {
log::debug!("Child {:?} has been garbage collected.", id_to_remove);
log::debug!("Child {id_to_remove:?} has been garbage collected.");
self.children.remove(id_to_remove);
}

View File

@ -5,6 +5,8 @@ use alloc::vec::Vec;
use core::sync::atomic::{Ordering, compiler_fence};
use core::{fmt::Debug, marker::PhantomData, time::Duration};
#[cfg(feature = "std")]
use hashbrown::HashMap;
use libafl_bolts::ClientId;
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
use libafl_bolts::os::startable_self;
@ -200,7 +202,6 @@ where
}
/// Handle arriving events in the broker
#[expect(clippy::unnecessary_wraps)]
fn handle_in_broker(
monitor: &mut MT,
client_stats_manager: &mut ClientStatsManager,
@ -208,32 +209,32 @@ where
) -> Result<BrokerEventResult, Error> {
let stats = event.stats();
client_stats_manager.client_stats_insert(ClientId(0));
client_stats_manager.client_stats_insert(ClientId(0))?;
client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_executions(stats.executions, stats.time);
});
})?;
let event = event.event();
match event {
Event::NewTestcase { corpus_size, .. } => {
client_stats_manager.client_stats_insert(ClientId(0));
client_stats_manager.client_stats_insert(ClientId(0))?;
client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_corpus_size(*corpus_size as u64);
});
monitor.display(client_stats_manager, event.name(), ClientId(0));
})?;
monitor.display(client_stats_manager, event.name(), ClientId(0))?;
Ok(BrokerEventResult::Handled)
}
Event::Heartbeat => {
monitor.display(client_stats_manager, event.name(), ClientId(0));
monitor.display(client_stats_manager, event.name(), ClientId(0))?;
Ok(BrokerEventResult::Handled)
}
Event::UpdateUserStats { name, value, .. } => {
client_stats_manager.client_stats_insert(ClientId(0));
client_stats_manager.client_stats_insert(ClientId(0))?;
client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_user_stats(name.clone(), value.clone());
});
})?;
client_stats_manager.aggregate(name);
monitor.display(client_stats_manager, event.name(), ClientId(0));
monitor.display(client_stats_manager, event.name(), ClientId(0))?;
Ok(BrokerEventResult::Handled)
}
#[cfg(feature = "introspection")]
@ -244,16 +245,16 @@ where
// TODO: The monitor buffer should be added on client add.
client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_introspection_stats((**introspection_stats).clone());
});
monitor.display(client_stats_manager, event.name(), ClientId(0));
})?;
monitor.display(client_stats_manager, event.name(), ClientId(0))?;
Ok(BrokerEventResult::Handled)
}
Event::Objective { objective_size, .. } => {
client_stats_manager.client_stats_insert(ClientId(0));
client_stats_manager.client_stats_insert(ClientId(0))?;
client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_objective_size(*objective_size as u64);
});
monitor.display(client_stats_manager, event.name(), ClientId(0));
})?;
monitor.display(client_stats_manager, event.name(), ClientId(0))?;
Ok(BrokerEventResult::Handled)
}
Event::Log {
@ -525,31 +526,32 @@ where
}
// If we're restarting, deserialize the old state.
let (state, mgr) = match staterestorer.restore::<(S, Duration, Vec<ClientStats>)>()? {
None => {
log::info!("First run. Let's set it all up");
// Mgr to send and receive msgs from/to all other fuzzer instances
(
None,
SimpleRestartingEventManager::launched(monitor, staterestorer),
)
}
// Restoring from a previous run, deserialize state and corpus.
Some((state, start_time, clients_stats)) => {
log::info!("Subsequent run. Loaded previous state.");
// We reset the staterestorer, the next staterestorer and receiver (after crash) will reuse the page from the initial message.
staterestorer.reset();
let (state, mgr) =
match staterestorer.restore::<(S, Duration, HashMap<ClientId, ClientStats>)>()? {
None => {
log::info!("First run. Let's set it all up");
// Mgr to send and receive msgs from/to all other fuzzer instances
(
None,
SimpleRestartingEventManager::launched(monitor, staterestorer),
)
}
// Restoring from a previous run, deserialize state and corpus.
Some((state, start_time, clients_stats)) => {
log::info!("Subsequent run. Loaded previous state.");
// We reset the staterestorer, the next staterestorer and receiver (after crash) will reuse the page from the initial message.
staterestorer.reset();
// reload the state of the monitor to display the correct stats after restarts
let mut this = SimpleRestartingEventManager::launched(monitor, staterestorer);
this.inner.client_stats_manager.set_start_time(start_time);
this.inner
.client_stats_manager
.update_all_client_stats(clients_stats);
// reload the state of the monitor to display the correct stats after restarts
let mut this = SimpleRestartingEventManager::launched(monitor, staterestorer);
this.inner.client_stats_manager.set_start_time(start_time);
this.inner
.client_stats_manager
.update_all_client_stats(clients_stats);
(Some(state), this)
}
};
(Some(state), this)
}
};
/* TODO: Not sure if this is needed
// We commit an empty NO_RESTART message to this buf, against infinite loops,

View File

@ -317,7 +317,6 @@ where
}
/// Handle arriving events in the broker
#[expect(clippy::unnecessary_wraps)]
fn handle_in_broker(
monitor: &mut MT,
client_stats_manager: &mut ClientStatsManager,
@ -326,10 +325,10 @@ where
) -> Result<BrokerEventResult, Error> {
let stats = event.stats();
client_stats_manager.client_stats_insert(ClientId(0));
client_stats_manager.update_client_stats_for(ClientId(0), |client_stat| {
client_stats_manager.client_stats_insert(client_id)?;
client_stats_manager.update_client_stats_for(client_id, |client_stat| {
client_stat.update_executions(stats.executions, stats.time);
});
})?;
let event = event.event();
match event {
@ -343,28 +342,25 @@ where
} else {
client_id
};
client_stats_manager.client_stats_insert(id);
client_stats_manager.client_stats_insert(id)?;
client_stats_manager.update_client_stats_for(id, |client| {
client.update_corpus_size(*corpus_size as u64);
});
monitor.display(client_stats_manager, event.name(), id);
})?;
monitor.display(client_stats_manager, event.name(), id)?;
Ok(BrokerEventResult::Forward)
}
Event::Heartbeat => {
monitor.display(client_stats_manager, event.name(), client_id);
Ok(BrokerEventResult::Handled)
}
Event::Heartbeat => Ok(BrokerEventResult::Handled),
Event::UpdateUserStats {
name,
value,
phantom: _,
} => {
client_stats_manager.client_stats_insert(client_id);
client_stats_manager.client_stats_insert(client_id)?;
client_stats_manager.update_client_stats_for(client_id, |client| {
client.update_user_stats(name.clone(), value.clone());
});
})?;
client_stats_manager.aggregate(name);
monitor.display(client_stats_manager, event.name(), client_id);
monitor.display(client_stats_manager, event.name(), client_id)?;
Ok(BrokerEventResult::Handled)
}
#[cfg(feature = "introspection")]
@ -375,24 +371,24 @@ where
// TODO: The monitor buffer should be added on client add.
// Get the client for the staterestorer ID
client_stats_manager.client_stats_insert(client_id);
client_stats_manager.client_stats_insert(client_id)?;
client_stats_manager.update_client_stats_for(client_id, |client| {
// Update the performance monitor for this client
client.update_introspection_stats((**introspection_stats).clone());
});
})?;
// Display the monitor via `.display` only on core #1
monitor.display(client_stats_manager, event.name(), client_id);
monitor.display(client_stats_manager, event.name(), client_id)?;
// Correctly handled the event
Ok(BrokerEventResult::Handled)
}
Event::Objective { objective_size, .. } => {
client_stats_manager.client_stats_insert(client_id);
client_stats_manager.client_stats_insert(client_id)?;
client_stats_manager.update_client_stats_for(client_id, |client| {
client.update_objective_size(*objective_size as u64);
});
monitor.display(client_stats_manager, event.name(), client_id);
})?;
monitor.display(client_stats_manager, event.name(), client_id)?;
Ok(BrokerEventResult::Handled)
}
Event::Log {

View File

@ -1,6 +1,6 @@
//! Monitors that log to disk using different formats like `JSON` and `TOML`.
use alloc::string::String;
use alloc::{string::String, vec::Vec};
use core::time::Duration;
use std::{
fs::{File, OpenOptions},
@ -8,7 +8,7 @@ use std::{
path::PathBuf,
};
use libafl_bolts::{ClientId, current_time};
use libafl_bolts::{ClientId, Error, current_time};
use serde_json::json;
use crate::monitors::{Monitor, stats::ClientStatsManager};
@ -27,7 +27,7 @@ impl Monitor for OnDiskTomlMonitor {
client_stats_manager: &mut ClientStatsManager,
_event_msg: &str,
_sender_id: ClientId,
) {
) -> Result<(), Error> {
let cur_time = current_time();
if cur_time - self.last_update >= self.update_interval {
@ -57,14 +57,19 @@ exec_sec = {}
)
.expect("Failed to write to the Toml file");
for i in 0..(client_stats_manager.client_stats().len()) {
let client_id = ClientId(i as u32);
let exec_sec = client_stats_manager
.update_client_stats_for(client_id, |client_stat| {
client_stat.execs_per_sec(cur_time)
});
let all_clients: Vec<ClientId> = client_stats_manager
.client_stats()
.keys()
.copied()
.collect();
let client = client_stats_manager.client_stats_for(client_id);
for client_id in &all_clients {
let exec_sec = client_stats_manager
.update_client_stats_for(*client_id, |client_stat| {
client_stat.execs_per_sec(cur_time)
})?;
let client = client_stats_manager.client_stats_for(*client_id)?;
write!(
&mut file,
@ -75,7 +80,7 @@ objectives = {}
executions = {}
exec_sec = {}
",
i,
client_id.0,
client.corpus_size(),
client.objective_size(),
client.executions(),
@ -96,6 +101,7 @@ exec_sec = {}
drop(file);
}
Ok(())
}
}
@ -170,7 +176,7 @@ where
client_stats_manager: &mut ClientStatsManager,
_event_msg: &str,
_sender_id: ClientId,
) {
) -> Result<(), Error> {
if (self.log_record)(client_stats_manager) {
let file = OpenOptions::new()
.append(true)
@ -190,5 +196,6 @@ where
});
writeln!(&file, "{line}").expect("Unable to write Json to file");
}
Ok(())
}
}
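Because `update_client_stats_for` borrows the manager mutably, the TOML monitor above can no longer iterate the stats map while updating it; it snapshots the client IDs first and then loops over that list. A standalone illustration of the collect-then-mutate pattern with plain std types:

```rust
// Collect-then-mutate: you cannot hold an iterator over the map while also calling
// a &mut self update method, so the keys are copied out first.
use std::collections::HashMap;

fn main() {
    let mut execs: HashMap<u32, u64> = HashMap::from([(0, 10), (7, 42)]);

    // Snapshot the keys (cheap: ClientId is Copy in LibAFL, u32 here) ...
    let all_clients: Vec<u32> = execs.keys().copied().collect();

    // ... then mutate through the map without an outstanding borrow.
    for id in &all_clients {
        if let Some(e) = execs.get_mut(id) {
            *e += 1;
        }
        println!("client {id}: {} execs", execs[id]);
    }
}
```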

View File

@ -6,7 +6,7 @@ use core::{
};
use std::{fs::OpenOptions, io::Write, path::PathBuf};
use libafl_bolts::{ClientId, current_time};
use libafl_bolts::{ClientId, Error, current_time};
use serde_json::json;
use crate::monitors::{Monitor, stats::ClientStatsManager};
@ -35,7 +35,7 @@ impl Monitor for OnDiskJsonAggregateMonitor {
client_stats_manager: &mut ClientStatsManager,
_event_msg: &str,
_sender_id: ClientId,
) {
) -> Result<(), Error> {
// Write JSON stats if update interval has elapsed
let cur_time = current_time();
if cur_time - self.last_update >= self.update_interval {
@ -69,6 +69,7 @@ impl Monitor for OnDiskJsonAggregateMonitor {
writeln!(&file, "{json_value}").expect("Unable to write JSON to file");
}
Ok(())
}
}

View File

@ -1,6 +1,6 @@
//! Monitor wrappers that add logics to monitor
use libafl_bolts::ClientId;
use libafl_bolts::{ClientId, Error};
use crate::monitors::{Monitor, stats::ClientStatsManager};
@ -21,11 +21,12 @@ where
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
) {
) -> Result<(), Error> {
while (self.closure)(client_stats_manager, event_msg, sender_id) {
self.monitor
.display(client_stats_manager, event_msg, sender_id);
.display(client_stats_manager, event_msg, sender_id)?;
}
Ok(())
}
}
@ -60,11 +61,12 @@ where
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
) {
) -> Result<(), Error> {
if (self.closure)(client_stats_manager, event_msg, sender_id) {
self.monitor
.display(client_stats_manager, event_msg, sender_id);
.display(client_stats_manager, event_msg, sender_id)?;
}
Ok(())
}
}
@ -101,14 +103,15 @@ where
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
) {
) -> Result<(), Error> {
if (self.closure)(client_stats_manager, event_msg, sender_id) {
self.if_monitor
.display(client_stats_manager, event_msg, sender_id);
.display(client_stats_manager, event_msg, sender_id)?;
} else {
self.else_monitor
.display(client_stats_manager, event_msg, sender_id);
.display(client_stats_manager, event_msg, sender_id)?;
}
Ok(())
}
}
@ -146,10 +149,11 @@ where
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
) {
) -> Result<(), Error> {
if let Some(monitor) = self.monitor.as_mut() {
monitor.display(client_stats_manager, event_msg, sender_id);
monitor.display(client_stats_manager, event_msg, sender_id)?;
}
Ok(())
}
}

View File

@ -1,8 +1,8 @@
//! Keep stats, and display them to the user. Usually used in a broker, or main node, of some sort.
pub mod multi;
use libafl_bolts::Error;
pub use multi::MultiMonitor;
pub mod stats;
pub mod logics;
@ -53,7 +53,7 @@ pub trait Monitor {
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
);
) -> Result<(), Error>;
}
/// Monitor that print exactly nothing.
@ -68,7 +68,8 @@ impl Monitor for NopMonitor {
_client_stats_manager: &mut ClientStatsManager,
_event_msg: &str,
_sender_id: ClientId,
) {
) -> Result<(), Error> {
Ok(())
}
}
@ -107,8 +108,9 @@ impl Monitor for SimplePrintingMonitor {
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
) {
let mut userstats = client_stats_manager.client_stats()[sender_id.0 as usize]
) -> Result<(), Error> {
let mut userstats = client_stats_manager
.get(sender_id)?
.user_stats()
.iter()
.map(|(key, value)| format!("{key}: {value}"))
@ -135,11 +137,12 @@ impl Monitor for SimplePrintingMonitor {
println!(
"Client {:03}:\n{}",
sender_id.0,
client_stats_manager.client_stats()[sender_id.0 as usize].introspection_stats
client_stats_manager.get(sender_id)?.introspection_stats
);
// Separate the spacing just a bit
println!();
}
Ok(())
}
}
@ -171,7 +174,7 @@ where
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
) {
) -> Result<(), Error> {
let global_stats = client_stats_manager.global_stats();
let mut fmt = format!(
"[{} #{}] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
@ -186,8 +189,8 @@ where
);
if self.print_user_monitor {
client_stats_manager.client_stats_insert(sender_id);
let client = client_stats_manager.client_stats_for(sender_id);
client_stats_manager.client_stats_insert(sender_id)?;
let client = client_stats_manager.client_stats_for(sender_id)?;
for (key, val) in client.user_stats() {
write!(fmt, ", {key}: {val}").unwrap();
}
@ -202,13 +205,14 @@ where
let fmt = format!(
"Client {:03}:\n{}",
sender_id.0,
client_stats_manager.client_stats()[sender_id.0 as usize].introspection_stats
client_stats_manager.get(sender_id)?.introspection_stats
);
(self.print_fn)(&fmt);
// Separate the spacing just a bit
(self.print_fn)("");
}
Ok(())
}
}
@ -278,9 +282,9 @@ impl<A: Monitor, B: Monitor> Monitor for (A, B) {
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
) {
self.0.display(client_stats_manager, event_msg, sender_id);
self.1.display(client_stats_manager, event_msg, sender_id);
) -> Result<(), Error> {
self.0.display(client_stats_manager, event_msg, sender_id)?;
self.1.display(client_stats_manager, event_msg, sender_id)
}
}
@ -290,8 +294,8 @@ impl<A: Monitor> Monitor for (A, ()) {
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
) {
self.0.display(client_stats_manager, event_msg, sender_id);
) -> Result<(), Error> {
self.0.display(client_stats_manager, event_msg, sender_id)
}
}
@ -317,6 +321,6 @@ mod test {
NopMonitor::default(),
NopMonitor::default(),
);
mgr_list.display(&mut client_stats, "test", ClientId(0));
let _ = mgr_list.display(&mut client_stats, "test", ClientId(0));
}
}
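With the trait change above, every `Monitor` implementation returns `Result<(), Error>` from `display`, and per-client lookups go through the fallible accessors instead of `client_stats()[sender_id.0 as usize]`. A sketch of a custom monitor under the new signature — it assumes, as the `NopMonitor` hunk suggests, that `display` is the only required method, and the module paths are taken from this diff; the monitor type itself is hypothetical:

```rust
use libafl::monitors::{Monitor, stats::ClientStatsManager};
use libafl_bolts::{ClientId, Error};

/// Hypothetical example monitor that prints one line per event.
struct LogLineMonitor;

impl Monitor for LogLineMonitor {
    fn display(
        &mut self,
        client_stats_manager: &mut ClientStatsManager,
        event_msg: &str,
        sender_id: ClientId,
    ) -> Result<(), Error> {
        // Per-client lookups are fallible now instead of indexing by sender_id.0.
        let client = client_stats_manager.client_stats_for(sender_id)?;
        println!(
            "[{event_msg}] client {}: {} execs",
            sender_id.0,
            client.executions()
        );
        Ok(())
    }
}
```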

View File

@ -6,7 +6,7 @@ use core::{
time::Duration,
};
use libafl_bolts::{ClientId, current_time};
use libafl_bolts::{ClientId, Error, current_time};
use crate::monitors::{Monitor, stats::ClientStatsManager};
@ -37,7 +37,7 @@ where
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
) {
) -> Result<(), Error> {
let sender = format!("#{}", sender_id.0);
let pad = if event_msg.len() + sender.len() < 13 {
" ".repeat(13 - event_msg.len() - sender.len())
@ -62,11 +62,11 @@ where
(self.print_fn)(&global_fmt);
client_stats_manager.client_stats_insert(sender_id);
client_stats_manager.client_stats_insert(sender_id)?;
let cur_time = current_time();
let exec_sec = client_stats_manager
.update_client_stats_for(sender_id, |client| client.execs_per_sec_pretty(cur_time));
let client = client_stats_manager.client_stats_for(sender_id);
.update_client_stats_for(sender_id, |client| client.execs_per_sec_pretty(cur_time))?;
let client = client_stats_manager.client_stats_for(sender_id)?;
let pad = " ".repeat(head.len());
let mut fmt = format!(
@ -86,10 +86,10 @@ where
#[cfg(feature = "introspection")]
{
// Print the client performance monitor. Skip the Client 0 which is the broker
for (i, client) in client_stats_manager
for (i, (_, client)) in client_stats_manager
.client_stats()
.iter()
.filter(|x| x.enabled())
.filter(|(_, x)| x.enabled())
.enumerate()
{
let fmt = format!("Client {:03}:\n{}", i + 1, client.introspection_stats);
@ -99,6 +99,7 @@ where
// Separate the spacing just a bit
(self.print_fn)("\n");
}
Ok(())
}
}

View File

@ -42,7 +42,7 @@ use std::thread;
// using thread in order to start the HTTP server in a separate thread
use futures::executor::block_on;
use libafl_bolts::{ClientId, current_time};
use libafl_bolts::{ClientId, Error, current_time};
// using the official rust client library for Prometheus: https://github.com/prometheus/client_rust
use prometheus_client::{
encoding::{EncodeLabelSet, text::encode},
@ -98,7 +98,7 @@ where
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
) {
) -> Result<(), Error> {
// Update the prometheus metrics
// The gauges must take signed i64's, with max value of 2^63-1 so it is
// probably fair to error out at a count of nine quintillion across any
@ -216,8 +216,8 @@ where
// Client-specific metrics
client_stats_manager.client_stats_insert(sender_id);
let client = client_stats_manager.client_stats_for(sender_id);
client_stats_manager.client_stats_insert(sender_id)?;
let client = client_stats_manager.client_stats_for(sender_id)?;
let mut cur_client_clone = client.clone();
self.prometheus_client_stats
@ -319,6 +319,7 @@ where
.set(value);
}
(self.print_fn)(&fmt);
Ok(())
}
}

View File

@ -2,11 +2,11 @@
#[cfg(feature = "std")]
use alloc::string::ToString;
use alloc::{borrow::Cow, string::String, vec::Vec};
use alloc::{borrow::Cow, string::String};
use core::time::Duration;
use hashbrown::HashMap;
use libafl_bolts::{ClientId, current_time, format_duration_hms};
use libafl_bolts::{ClientId, Error, current_time, format_duration_hms};
#[cfg(feature = "std")]
use serde_json::Value;
@ -20,7 +20,7 @@ use super::{
/// Manager of all client's statistics
#[derive(Debug)]
pub struct ClientStatsManager {
client_stats: Vec<ClientStats>,
client_stats: HashMap<ClientId, ClientStats>,
/// Aggregated user stats value.
///
/// This map is updated by event manager, and is read by monitors to display user-defined stats.
@ -37,7 +37,7 @@ impl ClientStatsManager {
#[must_use]
pub fn new() -> Self {
Self {
client_stats: vec![],
client_stats: HashMap::new(),
cached_aggregated_user_stats: HashMap::new(),
cached_global_stats: None,
start_time: current_time(),
@ -46,25 +46,31 @@ impl ClientStatsManager {
/// Get all client stats
#[must_use]
pub fn client_stats(&self) -> &[ClientStats] {
pub fn client_stats(&self) -> &HashMap<ClientId, ClientStats> {
&self.client_stats
}
/// Get client with `client_id`
pub fn get(&self, client_id: ClientId) -> Result<&ClientStats, Error> {
self.client_stats
.get(&client_id)
.ok_or_else(|| Error::key_not_found(format!("Client id {client_id:#?} not found")))
}
/// The client monitor for a specific id, creating new if it doesn't exist
pub fn client_stats_insert(&mut self, client_id: ClientId) {
let total_client_stat_count = self.client_stats().len();
for _ in total_client_stat_count..=(client_id.0) as usize {
self.client_stats.push(ClientStats {
pub fn client_stats_insert(&mut self, client_id: ClientId) -> Result<(), Error> {
// if it doesn't contain this new client then insert it
if !self.client_stats.contains_key(&client_id) {
let stats = ClientStats {
enabled: false,
last_window_time: Duration::from_secs(0),
start_time: Duration::from_secs(0),
..ClientStats::default()
});
}
if total_client_stat_count <= client_id.0 as usize {
// The client count changed!
};
self.client_stats.insert(client_id, stats);
self.cached_global_stats = None;
}
self.update_client_stats_for(client_id, |new_stat| {
if !new_stat.enabled {
let timestamp = current_time();
@ -74,7 +80,8 @@ impl ClientStatsManager {
new_stat.enabled = true;
new_stat.stats_status.basic_stats_updated = true;
}
});
})?;
Ok(())
}
/// Update specific client stats.
@ -84,28 +91,34 @@ impl ClientStatsManager {
&mut self,
client_id: ClientId,
update: F,
) -> T {
let client_stat = &mut self.client_stats[client_id.0 as usize];
client_stat.clear_stats_status();
let res = update(client_stat);
if client_stat.stats_status.basic_stats_updated {
self.cached_global_stats = None;
) -> Result<T, Error> {
if let Some(stat) = self.client_stats.get_mut(&client_id) {
stat.clear_stats_status();
let res = update(stat);
if stat.stats_status.basic_stats_updated {
self.cached_global_stats = None;
}
Ok(res)
} else {
Err(Error::key_not_found(format!(
"Client id {client_id:#?} not found!"
)))
}
res
}
/// Update all client stats. This will clear all previous client stats, and fill in the new client stats.
///
/// This will clear global stats cache.
pub fn update_all_client_stats(&mut self, new_client_stats: Vec<ClientStats>) {
pub fn update_all_client_stats(&mut self, new_client_stats: HashMap<ClientId, ClientStats>) {
self.client_stats = new_client_stats;
self.cached_global_stats = None;
}
/// Get immutable reference to client stats
#[must_use]
pub fn client_stats_for(&self, client_id: ClientId) -> &ClientStats {
&self.client_stats()[client_id.0 as usize]
pub fn client_stats_for(&self, client_id: ClientId) -> Result<&ClientStats, Error> {
self.client_stats
.get(&client_id)
.ok_or_else(|| Error::key_not_found(format!("Client id {client_id:#?} not found")))
}
/// Aggregate user-defined stats
@ -138,20 +151,20 @@ impl ClientStatsManager {
client_stats_count: self
.client_stats
.iter()
.filter(|client| client.enabled)
.filter(|(_, client)| client.enabled)
.count(),
corpus_size: self
.client_stats
.iter()
.fold(0_u64, |acc, x| acc + x.corpus_size),
.fold(0_u64, |acc, (_, client)| acc + client.corpus_size),
objective_size: self
.client_stats
.iter()
.fold(0_u64, |acc, x| acc + x.objective_size),
.fold(0_u64, |acc, (_, client)| acc + client.objective_size),
total_execs: self
.client_stats
.iter()
.fold(0_u64, |acc, x| acc + x.executions),
.fold(0_u64, |acc, (_, client)| acc + client.executions),
..GlobalStats::default()
});
@ -162,7 +175,7 @@ impl ClientStatsManager {
global_stats.execs_per_sec = self
.client_stats
.iter_mut()
.fold(0.0, |acc, x| acc + x.execs_per_sec(cur_time));
.fold(0.0, |acc, (_, client)| acc + client.execs_per_sec(cur_time));
global_stats.execs_per_sec_pretty = super::prettify_float(global_stats.execs_per_sec);
global_stats
@ -177,9 +190,13 @@ impl ClientStatsManager {
if self.client_stats().len() > 1 {
let mut new_path_time = Duration::default();
let mut new_objectives_time = Duration::default();
for client in self.client_stats().iter().filter(|client| client.enabled()) {
new_path_time = client.last_corpus_time().max(new_path_time);
new_objectives_time = client.last_objective_time().max(new_objectives_time);
for (_, stat) in self
.client_stats()
.iter()
.filter(|(_, client)| client.enabled())
{
new_path_time = stat.last_corpus_time().max(new_path_time);
new_objectives_time = stat.last_objective_time().max(new_objectives_time);
}
if new_path_time > self.start_time() {
total_process_timing.last_new_entry = new_path_time - self.start_time();
@ -196,7 +213,8 @@ impl ClientStatsManager {
pub fn edges_coverage(&self) -> Option<EdgeCoverage> {
self.client_stats()
.iter()
.filter(|client| client.enabled())
.filter(|(_, client)| client.enabled())
.map(|(_, client)| client)
.filter_map(ClientStats::edges_coverage)
.max_by_key(
|EdgeCoverage {
@ -217,7 +235,11 @@ impl ClientStatsManager {
}
let mut ratio_a: u64 = 0;
let mut ratio_b: u64 = 0;
for client in self.client_stats().iter().filter(|client| client.enabled()) {
for (_, client) in self
.client_stats()
.iter()
.filter(|(_, client)| client.enabled())
{
let afl_stats = client
.get_user_stats("AflStats")
.map_or("None".to_string(), ToString::to_string);
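This is the core of the commit: `client_stats` becomes a `hashbrown::HashMap<ClientId, ClientStats>`, inserts are idempotent, and lookups return `Error::key_not_found` for unknown IDs instead of panicking. A usage sketch against the signatures shown in this hunk (not compiled against the crate):

```rust
use libafl::monitors::stats::ClientStatsManager;
use libafl_bolts::{ClientId, Error, current_time};

fn refresh_client(mgr: &mut ClientStatsManager, id: ClientId) -> Result<(), Error> {
    // Inserts a fresh entry only if `id` is not yet known; also marks it enabled.
    mgr.client_stats_insert(id)?;

    // Mutation goes through a closure; a missing id is a recoverable
    // `Error::key_not_found` instead of a panic on an out-of-range Vec index.
    let cur_time = current_time();
    let _execs_per_sec =
        mgr.update_client_stats_for(id, |client| client.execs_per_sec(cur_time))?;

    // Read-only access is fallible too.
    let client = mgr.client_stats_for(id)?;
    let _ = client.executions();

    // Iteration yields (&ClientId, &ClientStats) pairs instead of positions.
    for (client_id, stats) in mgr.client_stats() {
        let _ = (client_id.0, stats.enabled());
    }
    Ok(())
}
```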

View File

@ -69,7 +69,7 @@ pub(super) fn aggregate_user_stats(
let mut gather = client_stats_manager
.client_stats()
.iter()
.filter_map(|client| client.user_stats.get(name.as_ref()));
.filter_map(|(_, client)| client.user_stats.get(name.as_ref()));
let gather_count = gather.clone().count();

View File

@ -14,7 +14,7 @@ use alloc::{borrow::Cow, string::String, vec::Vec};
use std::net::UdpSocket;
use cadence::{BufferedUdpMetricSink, Gauged, QueuingMetricSink, StatsdClient};
use libafl_bolts::ClientId;
use libafl_bolts::{ClientId, Error};
use super::{
Monitor,
@ -194,7 +194,7 @@ impl Monitor for StatsdMonitor {
client_stats_manager: &mut ClientStatsManager,
_event_msg: &str,
_sender_id: ClientId,
) {
) -> Result<(), Error> {
if self.try_display(client_stats_manager).is_none() {
// The client failed to send metrics, which means the server is down
// or something else happened. We then de-initialize the client, and
@ -202,5 +202,6 @@ impl Monitor for StatsdMonitor {
// and try to connect the server then.
self.statsd_client = None;
}
Ok(())
}
}

View File

@ -26,7 +26,7 @@ use crossterm::{
terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode},
};
use hashbrown::HashMap;
use libafl_bolts::{ClientId, current_time, format_big_number, format_duration_hms};
use libafl_bolts::{ClientId, Error, current_time, format_big_number, format_duration_hms};
use ratatui::{Terminal, backend::CrosstermBackend};
use typed_builder::TypedBuilder;
@ -341,7 +341,7 @@ impl Monitor for TuiMonitor {
client_stats_manager: &mut ClientStatsManager,
event_msg: &str,
sender_id: ClientId,
) {
) -> Result<(), Error> {
let cur_time = current_time();
{
@ -378,10 +378,10 @@ impl Monitor for TuiMonitor {
ctx.total_item_geometry = client_stats_manager.item_geometry();
}
client_stats_manager.client_stats_insert(sender_id);
client_stats_manager.client_stats_insert(sender_id)?;
let exec_sec = client_stats_manager
.update_client_stats_for(sender_id, |client| client.execs_per_sec_pretty(cur_time));
let client = client_stats_manager.client_stats_for(sender_id);
.update_client_stats_for(sender_id, |client| client.execs_per_sec_pretty(cur_time))?;
let client = client_stats_manager.client_stats_for(sender_id)?;
let sender = format!("#{}", sender_id.0);
let pad = if event_msg.len() + sender.len() < 13 {
@ -412,7 +412,7 @@ impl Monitor for TuiMonitor {
.entry(sender_id.0 as usize)
.or_default()
.grab_data(client);
});
})?;
while ctx.client_logs.len() >= DEFAULT_LOGS_NUMBER {
ctx.client_logs.pop_front();
}
@ -422,10 +422,10 @@ impl Monitor for TuiMonitor {
#[cfg(feature = "introspection")]
{
// Print the client performance monitor. Skip the Client IDs that have never sent anything.
for (i, client) in client_stats_manager
for (i, (_, client)) in client_stats_manager
.client_stats()
.iter()
.filter(|x| x.enabled())
.filter(|(_, x)| x.enabled())
.enumerate()
{
self.context
@ -437,6 +437,8 @@ impl Monitor for TuiMonitor {
.grab_data(&client.introspection_stats);
}
}
Ok(())
}
}

View File

@ -345,7 +345,7 @@ fn generate_mutations(iter: impl Iterator<Item = (SymExprRef, SymExpr)>) -> Vec<
res.push(replacements);
solver.pop(1);
}
};
}
// assert the path constraint
solver.assert(&op);
}

View File

@ -1,8 +1,8 @@
use core::str::FromStr;
use std::{
env, fs,
path::{Path, PathBuf},
process::Command,
str::FromStr,
};
use which::which;

View File

@ -23,6 +23,7 @@ run_clippy() {
# Define projects based on the operating system
if [[ "$OSTYPE" == "linux-gnu"* ]]; then
ALL_PROJECTS=(
"libafl"
"libafl_bolts"
"libafl_cc"
"libafl_concolic/symcc_runtime"
@ -52,17 +53,16 @@ else
IFS=',' read -ra PROJECTS <<<"$1"
fi
# First run it on all
eval "$CLIPPY_CMD --workspace -- $RUSTC_FLAGS"
# Loop through each project and run Clippy
for project in "${PROJECTS[@]}"; do
# Trim leading and trailing whitespace
project=$(echo "$project" | sed 's/^[[:space:]]*//;s/[[:space:]]*$//')
features="--all-features"
if [[ " ${NO_ALL_FEATURES[*]} " =~ ${project} ]]; then
features="--features=clippy"
fi
for item in "${NO_ALL_FEATURES[@]}"; do
if [[ "$item" == "$project" ]]; then
features="--features=clippy"
fi
done
if [ -d "$project" ]; then
run_clippy "$project" "$features"
else
@ -71,3 +71,6 @@ for project in "${PROJECTS[@]}"; do
done
echo "Clippy run completed for all specified projects."
# Last run it on all
eval "$CLIPPY_CMD --workspace -- $RUSTC_FLAGS"