Monitor refactor + add aggregator (#1671)

* push

* decouple clients_stats_mut_for

* coding done

* push

* more

* upd

* fix

* aa

* don't change harness
Dongjia "toka" Zhang 2023-11-22 20:38:59 +01:00 committed by GitHub
parent cad2ff6319
commit 4a94bcb806
12 changed files with 363 additions and 50 deletions
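At its core, this refactor splits the old `UserStats` enum into a `UserStatsValue` payload plus an `AggregatorOps` tag, so the broker can fold per-client stats together. A minimal sketch of how a call site changes, assuming the `libafl` crate from this PR (the helper function name is hypothetical):

```rust
use libafl::monitors::{AggregatorOps, UserStats, UserStatsValue};

// Hypothetical helper mirroring the call sites below: the raw value is now
// wrapped together with the aggregation strategy the broker should apply.
fn minimisation_pass_stat(curr: u64, total: u64) -> UserStats {
    // Old API (removed): UserStats::Ratio(curr, total)
    UserStats::new(UserStatsValue::Ratio(curr, total), AggregatorOps::None)
}
```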

View File

@ -16,7 +16,7 @@ use crate::{
corpus::Corpus,
events::{Event, EventFirer, LogSeverity},
executors::{Executor, HasObservers},
monitors::UserStats,
monitors::{AggregatorOps, UserStats, UserStatsValue},
observers::{MapObserver, ObserversTuple},
schedulers::{LenTimeMulTestcaseScore, RemovableScheduler, Scheduler, TestcaseScore},
state::{HasCorpus, HasExecutions, HasMetadata, UsesState},
@ -150,7 +150,7 @@ where
state,
Event::UpdateUserStats {
name: "minimisation exec pass".to_string(),
value: UserStats::Ratio(curr, total),
value: UserStats::new(UserStatsValue::Ratio(curr, total), AggregatorOps::None),
phantom: PhantomData,
},
)?;

View File

@ -238,6 +238,8 @@ where
} else {
client_id
};
monitor.client_stats_insert(id);
let client = monitor.client_stats_mut_for(id);
client.update_corpus_size(*corpus_size as u64);
if id == client_id {
@ -254,6 +256,7 @@ where
phantom: _,
} => {
// TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
client.update_executions(*executions as u64, *time);
monitor.display(event.name().to_string(), client_id);
@ -264,8 +267,10 @@ where
value,
phantom: _,
} => {
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
client.update_user_stats(name.clone(), value.clone());
monitor.aggregate(name);
monitor.display(event.name().to_string(), client_id);
Ok(BrokerEventResult::Handled)
}
@ -279,6 +284,7 @@ where
// TODO: The monitor buffer should be added on client add.
// Get the client for the staterestorer ID
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
// Update the normal monitor for this client
@ -294,6 +300,7 @@ where
Ok(BrokerEventResult::Handled)
}
Event::Objective { objective_size } => {
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
client.update_objective_size(*objective_size as u64);
monitor.display(event.name().to_string(), client_id);
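Broker-side, the pattern in this file is now: create the client-stats slot explicitly, take the mutable reference, and for user stats additionally trigger the aggregation. A sketch of that handler shape, built only from the trait methods in this diff (the function and the event-name string are hypothetical):

```rust
use libafl::monitors::{Monitor, UserStats};
use libafl_bolts::ClientId;

// Hypothetical helper mirroring the UpdateUserStats arm above.
fn on_update_user_stats<M: Monitor>(
    monitor: &mut M,
    client_id: ClientId,
    name: &str,
    value: &UserStats,
) {
    monitor.client_stats_insert(client_id); // grow the stats vec if needed
    let client = monitor.client_stats_mut_for(client_id); // now just a mutable index
    client.update_user_stats(name.to_string(), value.clone());
    monitor.aggregate(name); // fold this stat across all clients
    monitor.display("UpdateUserStats".to_string(), client_id);
}
```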

View File

@ -51,7 +51,10 @@ use crate::{
Error,
};
#[cfg(feature = "scalability_introspection")]
use crate::{monitors::UserStats::Number, state::HasScalabilityMonitor};
use crate::{
monitors::{AggregatorOps, UserStatsValue},
state::HasScalabilityMonitor,
};
/// Check if ctrl-c is sent with this struct
#[cfg(all(unix, feature = "std"))]
@ -539,7 +542,12 @@ where
state,
Event::UpdateUserStats {
name: "total imported".to_string(),
value: Number((imported_with_observer + imported_without_observer) as u64),
value: UserStats::new(
UserStatsValue::Number(
(imported_with_observer + imported_without_observer) as u64,
),
AggregatorOps::Avg,
),
phantom: PhantomData,
},
)?;

View File

@ -216,6 +216,7 @@ where
executions,
forward_id: _,
} => {
monitor.client_stats_insert(ClientId(0));
monitor
.client_stats_mut_for(ClientId(0))
.update_corpus_size(*corpus_size as u64);
@ -231,6 +232,7 @@ where
phantom: _,
} => {
// TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(ClientId(0));
let client = monitor.client_stats_mut_for(ClientId(0));
client.update_executions(*executions as u64, *time);
@ -243,9 +245,11 @@ where
value,
phantom: _,
} => {
monitor.client_stats_insert(ClientId(0));
monitor
.client_stats_mut_for(ClientId(0))
.update_user_stats(name.clone(), value.clone());
monitor.aggregate(name);
monitor.display(event.name().to_string(), ClientId(0));
Ok(BrokerEventResult::Handled)
}
@ -257,6 +261,7 @@ where
phantom: _,
} => {
// TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(ClientId(0));
let client = monitor.client_stats_mut_for(ClientId(0));
client.update_executions(*executions as u64, *time);
client.update_introspection_monitor((**introspection_monitor).clone());
@ -264,6 +269,7 @@ where
Ok(BrokerEventResult::Handled)
}
Event::Objective { objective_size } => {
monitor.client_stats_insert(ClientId(0));
monitor
.client_stats_mut_for(ClientId(0))
.update_objective_size(*objective_size as u64);

View File

@ -331,6 +331,7 @@ where
} else {
client_id
};
monitor.client_stats_insert(id);
let client = monitor.client_stats_mut_for(id);
client.update_corpus_size(*corpus_size as u64);
client.update_executions(*executions as u64, *time);
@ -343,6 +344,7 @@ where
phantom: _,
} => {
// TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
client.update_executions(*executions as u64, *time);
monitor.display(event.name().to_string(), client_id);
@ -353,8 +355,10 @@ where
value,
phantom: _,
} => {
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
client.update_user_stats(name.clone(), value.clone());
monitor.aggregate(name);
monitor.display(event.name().to_string(), client_id);
Ok(BrokerEventResult::Handled)
}
@ -368,6 +372,7 @@ where
// TODO: The monitor buffer should be added on client add.
// Get the client for the staterestorer ID
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
// Update the normal monitor for this client
@ -383,6 +388,7 @@ where
Ok(BrokerEventResult::Handled)
}
Event::Objective { objective_size } => {
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
client.update_objective_size(*objective_size as u64);
monitor.display(event.name().to_string(), client_id);

View File

@ -22,7 +22,7 @@ use crate::{
executors::ExitKind,
feedbacks::{Feedback, HasObserverName},
inputs::UsesInput,
monitors::UserStats,
monitors::{AggregatorOps, UserStats, UserStatsValue},
observers::{MapObserver, Observer, ObserversTuple, UsesObserver},
state::{HasMetadata, HasNamedMetadata, State},
Error,
@ -615,13 +615,16 @@ where
state,
Event::UpdateUserStats {
name: self.stats_name.to_string(),
value: UserStats::Ratio(
value: UserStats::new(
UserStatsValue::Ratio(
self.novelties
.as_ref()
.map_or(filled, |novelties| filled + novelties.len())
as u64,
len as u64,
),
AggregatorOps::Avg,
),
phantom: PhantomData,
},
)?;
@ -823,13 +826,16 @@ where
state,
Event::UpdateUserStats {
name: self.stats_name.to_string(),
value: UserStats::Ratio(
value: UserStats::new(
UserStatsValue::Ratio(
self.novelties
.as_ref()
.map_or(filled, |novelties| filled + novelties.len())
as u64,
len as u64,
),
AggregatorOps::Avg,
),
phantom: PhantomData,
},
)?;

View File

@ -10,9 +10,10 @@ pub mod tui;
#[cfg(all(feature = "prometheus_monitor", feature = "std"))]
#[allow(missing_docs)]
pub mod prometheus;
use alloc::string::ToString;
#[cfg(all(feature = "prometheus_monitor", feature = "std"))]
pub use prometheus::PrometheusMonitor;
#[cfg(feature = "std")]
pub mod disk;
use alloc::{fmt::Debug, string::String, vec::Vec};
@ -27,10 +28,133 @@ use serde::{Deserialize, Serialize};
#[cfg(feature = "afl_exec_sec")]
const CLIENT_STATS_TIME_WINDOW_SECS: u64 = 5; // 5 seconds
/// User-defined stat types
/// TODO define aggregation function (avg, median, max, ...)
/// Definition of how we aggregate this stat across multiple clients
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum UserStats {
pub enum AggregatorOps {
/// Do nothing
None,
/// Add stats up
Sum,
/// Average stats out
Avg,
/// Get the min
Min,
/// Get the max
Max,
}
/// The standard aggregator; plug this into the monitor to use it
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct Aggregator {
// This struct could also keep a hashmap or vec as a cache, but it is kept simple for now.
// With a cache, computing e.g. the sum would not require iterating over all clients each time.
aggregated: HashMap<String, UserStatsValue>,
}
impl Aggregator {
/// Constructor for this aggregator
#[must_use]
pub fn new() -> Self {
Self {
aggregated: HashMap::new(),
}
}
/// Takes the stat name and a reference to the client stats, then aggregates them all.
fn aggregate(&mut self, name: &str, client_stats: &[ClientStats]) {
let mut gather = vec![];
for client in client_stats {
if let Some(x) = client.user_monitor.get(name) {
gather.push(x);
}
}
let (mut init, op) = match gather.first() {
Some(x) => (x.value().clone(), x.aggregator_op().clone()),
_ => {
return;
}
};
for item in gather.iter().skip(1) {
match op {
AggregatorOps::None => {
// Nothing
return;
}
AggregatorOps::Avg | AggregatorOps::Sum => {
init = match init.stats_add(item.value()) {
Some(x) => x,
_ => {
return;
}
};
}
AggregatorOps::Min => {
init = match init.stats_min(item.value()) {
Some(x) => x,
_ => {
return;
}
};
}
AggregatorOps::Max => {
init = match init.stats_max(item.value()) {
Some(x) => x,
_ => {
return;
}
};
}
}
}
if let AggregatorOps::Avg = op {
// For Avg, divide by the number of clients at the end.
init = match init.stats_div(gather.len()) {
Some(x) => x,
_ => {
return;
}
}
}
self.aggregated.insert(name.to_string(), init);
}
}
/// A user-defined stat, combining a value with how to aggregate it across clients
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct UserStats {
value: UserStatsValue,
aggregator_op: AggregatorOps,
}
impl UserStats {
/// Get the `AggregatorOps`
#[must_use]
pub fn aggregator_op(&self) -> &AggregatorOps {
&self.aggregator_op
}
/// Get the actual value for the stats
#[must_use]
pub fn value(&self) -> &UserStatsValue {
&self.value
}
/// Constructor
#[must_use]
pub fn new(value: UserStatsValue, aggregator_op: AggregatorOps) -> Self {
Self {
value,
aggregator_op,
}
}
}
/// The actual value for the user stats
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum UserStatsValue {
/// A numerical value
Number(u64),
/// A Float value
@ -41,13 +165,118 @@ pub enum UserStats {
Ratio(u64, u64),
}
impl UserStatsValue {
/// Check whether this value is numeric
#[must_use]
pub fn is_numeric(&self) -> bool {
match &self {
Self::Number(_) | Self::Float(_) | Self::Ratio(_, _) => true,
Self::String(_) => false,
}
}
/// Divide by the number of elements
#[allow(clippy::cast_precision_loss)]
pub fn stats_div(&mut self, divisor: usize) -> Option<Self> {
match self {
Self::Number(x) => Some(Self::Float(*x as f64 / divisor as f64)),
Self::Float(x) => Some(Self::Float(*x / divisor as f64)),
Self::Ratio(x, y) => Some(Self::Float((*x as f64 / divisor as f64) / *y as f64)),
Self::String(_) => None,
}
}
/// Get the max of this stat value and the other
#[allow(clippy::cast_precision_loss)]
pub fn stats_max(&mut self, other: &Self) -> Option<Self> {
match (self, other) {
(Self::Number(x), Self::Number(y)) => {
if y > x {
Some(Self::Number(*y))
} else {
Some(Self::Number(*x))
}
}
(Self::Float(x), Self::Float(y)) => {
if y > x {
Some(Self::Float(*y))
} else {
Some(Self::Float(*x))
}
}
(Self::Ratio(x, a), Self::Ratio(y, b)) => {
let first = *x as f64 / *a as f64;
let second = *y as f64 / *b as f64;
if first > second {
Some(Self::Float(first))
} else {
Some(Self::Float(second))
}
}
_ => None,
}
}
/// Get the min of this stat value and the other
#[allow(clippy::cast_precision_loss)]
pub fn stats_min(&mut self, other: &Self) -> Option<Self> {
match (self, other) {
(Self::Number(x), Self::Number(y)) => {
if y > x {
Some(Self::Number(*x))
} else {
Some(Self::Number(*y))
}
}
(Self::Float(x), Self::Float(y)) => {
if y > x {
Some(Self::Float(*x))
} else {
Some(Self::Float(*y))
}
}
(Self::Ratio(x, a), Self::Ratio(y, b)) => {
let first = *x as f64 / *a as f64;
let second = *y as f64 / *b as f64;
if first > second {
Some(Self::Float(second))
} else {
Some(Self::Float(first))
}
}
_ => None,
}
}
/// Add this stat value to the other
#[allow(clippy::cast_precision_loss)]
pub fn stats_add(&mut self, other: &Self) -> Option<Self> {
match (self, other) {
(Self::Number(x), Self::Number(y)) => Some(Self::Number(*x + *y)),
(Self::Float(x), Self::Float(y)) => Some(Self::Float(*x + *y)),
(Self::Ratio(x, a), Self::Ratio(y, b)) => {
let first = *x as f64 / *a as f64;
let second = *y as f64 / *b as f64;
Some(Self::Float(first + second))
}
_ => None,
}
}
}
impl fmt::Display for UserStats {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
UserStats::Number(n) => write!(f, "{n}"),
UserStats::Float(n) => write!(f, "{n}"),
UserStats::String(s) => write!(f, "{s}"),
UserStats::Ratio(a, b) => {
write!(f, "{}", self.value())
}
}
impl fmt::Display for UserStatsValue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self {
UserStatsValue::Number(n) => write!(f, "{n}"),
UserStatsValue::Float(n) => write!(f, "{n}"),
UserStatsValue::String(s) => write!(f, "{s}"),
UserStatsValue::Ratio(a, b) => {
if *b == 0 {
write!(f, "{a}/{b}")
} else {
@ -209,8 +438,8 @@ impl ClientStats {
}
/// Update the user-defined stat with name and value
pub fn update_user_stats(&mut self, name: String, value: UserStats) {
self.user_monitor.insert(name, value);
pub fn update_user_stats(&mut self, name: String, value: UserStats) -> Option<UserStats> {
self.user_monitor.insert(name, value)
}
#[must_use]
@ -259,7 +488,7 @@ pub trait Monitor {
/// Total executions
#[inline]
fn total_execs(&mut self) -> u64 {
fn total_execs(&self) -> u64 {
self.client_stats()
.iter()
.fold(0_u64, |acc, x| acc + x.executions)
@ -281,7 +510,7 @@ pub trait Monitor {
}
/// The client monitor for a specific id, creating new if it doesn't exist
fn client_stats_mut_for(&mut self, client_id: ClientId) -> &mut ClientStats {
fn client_stats_insert(&mut self, client_id: ClientId) {
let client_stat_count = self.client_stats().len();
for _ in client_stat_count..(client_id.0 + 1) as usize {
self.client_stats_mut().push(ClientStats {
@ -290,8 +519,20 @@ pub trait Monitor {
..ClientStats::default()
});
}
}
/// Get a mutable reference to the stats of a specific client
fn client_stats_mut_for(&mut self, client_id: ClientId) -> &mut ClientStats {
&mut self.client_stats_mut()[client_id.0 as usize]
}
/// Get an immutable reference to the stats of a specific client
fn client_stats_for(&self, client_id: ClientId) -> &ClientStats {
&self.client_stats()[client_id.0 as usize]
}
/// Aggregate the results in case there are multiple clients
fn aggregate(&mut self, _name: &str) {}
}
/// Monitor that prints exactly nothing.
@ -488,6 +729,7 @@ where
);
if self.print_user_monitor {
self.client_stats_insert(sender_id);
let client = self.client_stats_mut_for(sender_id);
for (key, val) in &client.user_monitor {
write!(fmt, ", {key}: {val}").unwrap();

View File

@ -1,8 +1,9 @@
//! Monitor to display both cumulative and per-client monitor
#[cfg(feature = "introspection")]
use alloc::string::ToString;
use alloc::{string::String, vec::Vec};
use alloc::{
string::{String, ToString},
vec::Vec,
};
use core::{
fmt::{Debug, Formatter, Write},
time::Duration,
@ -10,6 +11,7 @@ use core::{
use libafl_bolts::{current_time, format_duration_hms, ClientId};
use super::Aggregator;
use crate::monitors::{ClientStats, Monitor};
/// Tracking monitor during fuzzing and display both per-client and cumulative info.
@ -21,6 +23,7 @@ where
print_fn: F,
start_time: Duration,
client_stats: Vec<ClientStats>,
aggregator: Aggregator,
}
impl<F> Debug for MultiMonitor<F>
@ -59,6 +62,10 @@ where
self.start_time
}
fn aggregate(&mut self, name: &str) {
self.aggregator.aggregate(name, &self.client_stats);
}
fn display(&mut self, event_msg: String, sender_id: ClientId) {
let sender = format!("#{}", sender_id.0);
let pad = if event_msg.len() + sender.len() < 13 {
@ -79,6 +86,7 @@ where
);
(self.print_fn)(global_fmt);
self.client_stats_insert(sender_id);
let client = self.client_stats_mut_for(sender_id);
let cur_time = current_time();
let exec_sec = client.execs_per_sec_pretty(cur_time);
@ -93,6 +101,12 @@ where
}
(self.print_fn)(fmt);
let mut aggregated_fmt = "(Aggregated): ".to_string();
for (key, val) in &self.aggregator.aggregated {
write!(aggregated_fmt, " {key}: {val}").unwrap();
}
(self.print_fn)(aggregated_fmt);
// Only print perf monitor if the feature is enabled
#[cfg(feature = "introspection")]
{
@ -118,6 +132,7 @@ where
print_fn,
start_time: current_time(),
client_stats: vec![],
aggregator: Aggregator::new(),
}
}
@ -127,6 +142,7 @@ where
print_fn,
start_time,
client_stats: vec![],
aggregator: Aggregator::new(),
}
}
}
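From the user's side nothing extra has to be wired up: the monitor owns the aggregator and the broker drives it. A usage sketch, assuming `MultiMonitor` is re-exported from `libafl::monitors`:

```rust
use libafl::monitors::MultiMonitor;

fn main() {
    // The MultiMonitor now carries its own Aggregator; any UserStats fired with
    // Sum/Avg/Min/Max ends up on the "(Aggregated): ..." line it prints.
    let monitor = MultiMonitor::new(|s| println!("{s}"));
    // Hand `monitor` to an event manager as usual; the broker calls
    // `Monitor::aggregate` on every UpdateUserStats event.
    let _ = monitor;
}
```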

View File

@ -41,7 +41,7 @@ use prometheus_client::{
// using tide for the HTTP server library (fast, async, simple)
use tide::Request;
use crate::monitors::{ClientStats, Monitor, UserStats};
use crate::monitors::{ClientStats, Monitor, UserStatsValue};
/// Tracking monitor during fuzzing.
#[derive(Clone)]
@ -164,6 +164,7 @@ where
);
(self.print_fn)(fmt);
self.client_stats_insert(sender_id);
let cur_client = self.client_stats_mut_for(sender_id);
let cur_client_clone = cur_client.clone();
@ -172,11 +173,11 @@ where
// You can filter for each custom stat in promQL via labels of both the stat name and client id
log::info!("{key}: {val}");
#[allow(clippy::cast_precision_loss)]
let value: f64 = match val {
UserStats::Number(n) => n as f64,
UserStats::Float(f) => f,
UserStats::String(_s) => 0.0,
UserStats::Ratio(a, b) => (a as f64 / b as f64) * 100.0,
let value: f64 = match val.value() {
UserStatsValue::Number(n) => *n as f64,
UserStatsValue::Float(f) => *f,
UserStatsValue::String(_s) => 0.0,
UserStatsValue::Ratio(a, b) => (*a as f64 / *b as f64) * 100.0,
};
self.custom_stat
.get_or_create(&Labels {

View File

@ -26,7 +26,7 @@ use serde_json::{self, Value};
#[cfg(feature = "introspection")]
use super::{ClientPerfMonitor, PerfFeature};
use crate::monitors::{ClientStats, Monitor, UserStats};
use crate::monitors::{Aggregator, AggregatorOps, ClientStats, Monitor, UserStats, UserStatsValue};
pub mod ui;
use ui::TuiUI;
@ -332,6 +332,7 @@ pub struct TuiMonitor {
start_time: Duration,
client_stats: Vec<ClientStats>,
aggregator: Aggregator,
}
impl Monitor for TuiMonitor {
@ -381,6 +382,7 @@ impl Monitor for TuiMonitor {
ctx.total_item_geometry = self.item_geometry();
}
self.client_stats_insert(sender_id);
let client = self.client_stats_mut_for(sender_id);
let exec_sec = client.execs_per_sec_pretty(cur_time);
@ -398,6 +400,10 @@ impl Monitor for TuiMonitor {
for (key, val) in &client.user_monitor {
write!(fmt, ", {key}: {val}").unwrap();
}
write!(fmt, ", (Aggregated):").unwrap();
for (key, val) in &self.aggregator.aggregated {
write!(fmt, ", {key}: {val}").unwrap();
}
{
let client = &self.client_stats()[sender_id.0 as usize];
@ -426,6 +432,10 @@ impl Monitor for TuiMonitor {
}
}
}
fn aggregate(&mut self, name: &str) {
self.aggregator.aggregate(name, &self.client_stats);
}
}
impl TuiMonitor {
@ -470,6 +480,7 @@ impl TuiMonitor {
context,
start_time,
client_stats: vec![],
aggregator: Aggregator::new(),
}
}
@ -506,9 +517,10 @@ impl TuiMonitor {
let afl_stats = client
.get_user_stats("AflStats")
.map_or("None".to_string(), ToString::to_string);
let stability = client
.get_user_stats("stability")
.map_or(&UserStats::Ratio(0, 100), |x| x);
let stability = client.get_user_stats("stability").map_or(
UserStats::new(UserStatsValue::Ratio(0, 100), AggregatorOps::Avg),
core::clone::Clone::clone,
);
if afl_stats != "None" {
let default_json = serde_json::json!({
@ -529,7 +541,7 @@ impl TuiMonitor {
afl_stats_json["imported"].as_u64().unwrap_or_default();
}
if let UserStats::Ratio(a, b) = stability {
if let UserStatsValue::Ratio(a, b) = stability.value() {
ratio_a += a;
ratio_b += b;
}

View File

@ -17,7 +17,7 @@ use crate::{
executors::{Executor, ExitKind, HasObservers},
feedbacks::{map::MapFeedbackMetadata, HasObserverName},
fuzzer::Evaluator,
monitors::UserStats,
monitors::{AggregatorOps, UserStats, UserStatsValue},
observers::{MapObserver, ObserversTuple, UsesObserver},
schedulers::powersched::SchedulerMetadata,
stages::Stage,
@ -304,10 +304,13 @@ where
state,
Event::UpdateUserStats {
name: "stability".to_string(),
value: UserStats::Ratio(
value: UserStats::new(
UserStatsValue::Ratio(
(map_len - unstable_entries) as u64,
map_len as u64,
),
AggregatorOps::Avg,
),
phantom: PhantomData,
},
)?;

View File

@ -17,7 +17,10 @@ use crate::{
Error,
};
#[cfg(feature = "std")]
use crate::{events::Event, monitors::UserStats};
use crate::{
events::Event,
monitors::{AggregatorOps, UserStats, UserStatsValue},
};
/// The [`AflStatsStage`] is a simple stage that computes and reports some stats.
#[derive(Debug, Clone)]
@ -102,7 +105,10 @@ where
state,
Event::UpdateUserStats {
name: "AflStats".to_string(),
value: UserStats::String(json.to_string()),
value: UserStats::new(
UserStatsValue::String(json.to_string()),
AggregatorOps::None,
),
phantom: PhantomData,
},
)?;
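A closing note on picking `AggregatorOps`: the stats touched in this PR use `None` for free-form strings (they cannot be combined numerically) and `Avg` for ratios and counts, while `Sum`, `Min` and `Max` remain available for other stats. A small illustrative sketch, with made-up names and numbers:

```rust
use libafl::monitors::{AggregatorOps, UserStats, UserStatsValue};

// Illustrative constructions only; the values are hypothetical.
fn example_stats() -> [UserStats; 3] {
    [
        // Free-form strings such as the AflStats JSON above opt out of aggregation.
        UserStats::new(UserStatsValue::String("{\"pending\":0}".into()), AggregatorOps::None),
        // Per-client ratios such as "stability" are averaged across clients.
        UserStats::new(UserStatsValue::Ratio(98, 100), AggregatorOps::Avg),
        // A counter could be summed instead.
        UserStats::new(UserStatsValue::Number(42), AggregatorOps::Sum),
    ]
}
```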