Add StatsD monitor (#2969)

* Add StatsD monitor

* Fix

* Use f64 instead of fractal
This commit is contained in:
EvianZhang 2025-02-13 08:15:27 +08:00 committed by GitHub
parent 0736c56647
commit 5281b41abb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 284 additions and 33 deletions

View File

@ -133,6 +133,9 @@ prometheus_monitor = [
"futures",
]
## Enables the `StatsdMonitor`.
statsd_monitor = ["std", "cadence"]
## Include a simple concolic mutator based on z3
concolic_mutation = ["z3"]
@ -257,6 +260,7 @@ ratatui = { version = "0.29.0", default-features = false, features = [
], optional = true } # Commandline rendering, for TUI Monitor
crossterm = { version = "0.28.1", optional = true }
cadence = { version = "1.5.0", optional = true } # For the statsd monitor
prometheus-client = { version = "0.23.0", optional = true } # For the prometheus monitor
tide = { version = "0.16.0", optional = true }
async-std = { version = "1.13.0", features = ["attributes"], optional = true }

View File

@ -20,17 +20,22 @@ pub mod tui;
#[cfg(all(feature = "tui_monitor", feature = "std"))]
pub use tui::TuiMonitor;
#[cfg(all(feature = "prometheus_monitor", feature = "std"))]
#[cfg(feature = "prometheus_monitor")]
pub mod prometheus;
#[cfg(feature = "statsd_monitor")]
pub mod statsd;
use alloc::fmt::Debug;
#[cfg(feature = "std")]
use alloc::vec::Vec;
use core::{fmt, fmt::Write, time::Duration};
use libafl_bolts::ClientId;
#[cfg(all(feature = "prometheus_monitor", feature = "std"))]
#[cfg(feature = "prometheus_monitor")]
pub use prometheus::PrometheusMonitor;
#[cfg(feature = "statsd_monitor")]
pub use statsd::StatsdMonitor;
use crate::monitors::stats::ClientStatsManager;

View File

@ -1,18 +1,16 @@
//! Client statistics manager
use alloc::{
borrow::Cow,
string::{String, ToString},
vec::Vec,
};
use core::{cmp, time::Duration};
#[cfg(feature = "std")]
use alloc::string::ToString;
use alloc::{borrow::Cow, string::String, vec::Vec};
use core::time::Duration;
use hashbrown::HashMap;
use libafl_bolts::{current_time, format_duration_hms, ClientId};
#[cfg(feature = "std")]
use serde_json::Value;
use super::{user_stats::UserStatsValue, ClientStats, ProcessTiming};
use super::{user_stats::UserStatsValue, ClientStats, EdgeCoverage, ProcessTiming};
#[cfg(feature = "std")]
use super::{
user_stats::{AggregatorOps, UserStats},
@ -192,18 +190,23 @@ impl ClientStatsManager {
total_process_timing
}
/// Get map density
/// Get max edges coverage of all clients
#[must_use]
pub fn map_density(&self) -> String {
pub fn edges_coverage(&self) -> Option<EdgeCoverage> {
self.client_stats()
.iter()
.filter(|client| client.enabled())
.filter_map(|client| client.get_user_stats("edges"))
.map(ToString::to_string)
.fold("0%".to_string(), cmp::max)
.filter_map(ClientStats::edges_coverage)
.max_by_key(
|EdgeCoverage {
edges_hit,
edges_total,
}| { *edges_hit * 100 / *edges_total },
)
}
/// Get item geometry
#[allow(clippy::cast_precision_loss)]
#[cfg(feature = "std")]
#[must_use]
pub fn item_geometry(&self) -> ItemGeometry {
@ -246,7 +249,11 @@ impl ClientStatsManager {
ratio_b += b;
}
}
total_item_geometry.stability = format!("{}%", ratio_a * 100 / ratio_b);
total_item_geometry.stability = if ratio_b == 0 {
None
} else {
Some((ratio_a as f64) / (ratio_b as f64))
};
total_item_geometry
}
}

View File

@ -118,19 +118,27 @@ pub struct ItemGeometry {
pub own_finds: u64,
/// How much entries were imported
pub imported: u64,
/// The stability, stringified
pub stability: String,
/// The stability, ranges from 0.0 to 1.0.
///
/// If there is no such data, this field will be `None`.
pub stability: Option<f64>,
}
impl ItemGeometry {
/// Create a new [`ItemGeometry`]
#[must_use]
pub fn new() -> Self {
Self {
stability: "0%".to_string(),
..Default::default()
ItemGeometry::default()
}
}
/// Stats of edge coverage
#[derive(Debug, Default, Clone)]
pub struct EdgeCoverage {
/// Count of hit edges
pub edges_hit: u64,
/// Count of total edges
pub edges_total: u64,
}
impl ClientStats {
@ -340,14 +348,22 @@ impl ClientStats {
}
}
/// Get map density of current client
/// Get edge coverage of current client
#[must_use]
pub fn map_density(&self) -> String {
self.get_user_stats("edges")
.map_or("0%".to_string(), ToString::to_string)
pub fn edges_coverage(&self) -> Option<EdgeCoverage> {
self.get_user_stats("edges").and_then(|user_stats| {
let UserStatsValue::Ratio(edges_hit, edges_total) = user_stats.value() else {
return None;
};
Some(EdgeCoverage {
edges_hit: *edges_hit,
edges_total: *edges_total,
})
})
}
/// Get item geometry of current client
#[allow(clippy::cast_precision_loss)]
#[cfg(feature = "std")]
#[must_use]
pub fn item_geometry(&self) -> ItemGeometry {
@ -368,9 +384,20 @@ impl ClientStats {
let imported = afl_stats_json["imported"].as_u64().unwrap_or_default();
let own_finds = afl_stats_json["own_finds"].as_u64().unwrap_or_default();
let stability = self
.get_user_stats("stability")
.map_or("0%".to_string(), ToString::to_string);
let stability = self.get_user_stats("stability").map_or(
UserStats::new(UserStatsValue::Ratio(0, 100), AggregatorOps::Avg),
Clone::clone,
);
let stability = if let UserStatsValue::Ratio(a, b) = stability.value() {
if *b == 0 {
Some(0.0)
} else {
Some((*a as f64) / (*b as f64))
}
} else {
None
};
ItemGeometry {
pending,

View File

@ -0,0 +1,193 @@
//! StatsD monitor.
//!
//! This roughly corresponds to the [AFL++'s rpc_statsd](https://github.com/AFLplusplus/AFLplusplus/blob/stable/docs/rpc_statsd.md),
//! so you could view such documentation for detailed information.
//!
//! StatsD monitor is useful when you have multiple fuzzing instances, and this monitor
//! could help visualizing the aggregated fuzzing statistics with serveral third-party
//! statsd-related tools.
// Use this since clippy thinks we should use `StatsD` instead of StatsD.
#![allow(clippy::doc_markdown)]
use alloc::string::String;
use std::{borrow::ToOwned, net::UdpSocket};
use cadence::{BufferedUdpMetricSink, Gauged, QueuingMetricSink, StatsdClient};
use libafl_bolts::ClientId;
use super::{
stats::{manager::GlobalStats, ClientStatsManager, EdgeCoverage, ItemGeometry},
Monitor,
};
const METRIC_PREFIX: &str = "fuzzing";
/// Flavor of StatsD tag
#[derive(Debug)]
pub enum StatsdMonitorTagFlavor {
/// [Datadog](https://docs.datadoghq.com/developers/dogstatsd/) style tag
DogStatsd {
/// Identifier to distinguish this fuzzing instance with others.
tag_identifier: String,
},
/// No tag
None,
}
impl Default for StatsdMonitorTagFlavor {
fn default() -> Self {
Self::DogStatsd {
tag_identifier: "default".to_owned(),
}
}
}
/// StatsD monitor
#[derive(Debug)]
pub struct StatsdMonitor {
target_host: String,
target_port: u16,
tag_flavor: StatsdMonitorTagFlavor,
statsd_client: Option<StatsdClient>,
}
impl StatsdMonitor {
/// Create a new StatsD monitor, which sends metrics to server
/// specified by `target_host` and `target_port` via UDP.
///
/// If that server is down, this monitor will just do nothing and will
/// not crash or throw, so use this freely. :)
#[must_use]
pub fn new(target_host: String, target_port: u16, tag_flavor: StatsdMonitorTagFlavor) -> Self {
let mut this = Self {
target_host,
target_port,
tag_flavor,
statsd_client: None,
};
this.setup_statsd_client();
this
}
// Call this method if self.statsd_client is None.
fn setup_statsd_client(&mut self) {
// This code follows https://docs.rs/cadence/latest/cadence/#queuing-asynchronous-metric-sink,
// which is the preferred way to use Cadence in production.
//
// For anyone maintaining this module, please carefully read that section.
// This bind would never fail, or something extermely unexpected happened
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
// This set config would never fail, or something extermely unexpected happened
socket.set_nonblocking(true).unwrap();
let Ok(udp_sink) =
BufferedUdpMetricSink::from((self.target_host.as_str(), self.target_port), socket)
else {
log::warn!(
"Statsd monitor failed to connect target host {}:{}",
self.target_host,
self.target_port
);
return;
};
let queuing_sink = QueuingMetricSink::builder()
.with_error_handler(|e| {
log::warn!("Statsd monitor failed to send to target host: {e:?}");
})
.build(udp_sink);
let mut client_builder = StatsdClient::builder(METRIC_PREFIX, queuing_sink);
if let StatsdMonitorTagFlavor::DogStatsd { tag_identifier } = &self.tag_flavor {
client_builder = client_builder
.with_tag("banner", tag_identifier)
.with_tag("afl_version", env!("CARGO_PKG_VERSION"));
}
let client = client_builder.build();
self.statsd_client = Some(client);
}
#[allow(clippy::cast_precision_loss)]
fn try_display(&mut self, client_stats_manager: &mut ClientStatsManager) -> Option<()> {
if self.statsd_client.is_none() {
self.setup_statsd_client();
}
let Some(statsd_client) = &mut self.statsd_client else {
// The client still cannot be built. Then we do nothing.
return Some(());
};
let GlobalStats {
total_execs,
execs_per_sec,
corpus_size,
objective_size,
..
} = client_stats_manager.global_stats();
let total_execs = *total_execs;
let execs_per_sec = *execs_per_sec;
let corpus_size = *corpus_size;
let objective_size = *objective_size;
let ItemGeometry {
pending,
pend_fav,
own_finds,
imported,
stability,
} = client_stats_manager.item_geometry();
let edges_coverage = client_stats_manager.edges_coverage();
// In the following codes, we immediate throw if the statsd client failed
// to send metrics. The caller should clear the statsd client when error occurred.
//
// The error generated by sending metrics will be handled by the error handler
// registered when creating queuing_sink.
//
// The following metrics are taken from AFLplusplus/src/afl-fuzz-statsd.c
// Metrics followed by "Newly added" mean they are not in AFL++.
statsd_client.gauge("execs_done", total_execs).ok()?;
statsd_client.gauge("execs_per_sec", execs_per_sec).ok()?;
statsd_client.gauge("corpus_count", corpus_size).ok()?;
statsd_client.gauge("corpus_found", own_finds).ok()?;
statsd_client.gauge("corpus_imported", imported).ok()?;
if let Some(stability) = stability {
statsd_client.gauge("stability", stability).ok()?; // Newly added
}
statsd_client.gauge("pending_favs", pend_fav).ok()?;
statsd_client.gauge("pending_total", pending).ok()?;
statsd_client
.gauge("saved_solutions", objective_size)
.ok()?; // Newly added
if let Some(EdgeCoverage {
edges_hit,
edges_total,
}) = edges_coverage
{
statsd_client.gauge("edges_found", edges_hit).ok()?;
statsd_client
.gauge("map_density", (edges_hit as f64) / (edges_total as f64))
.ok()?; // Newly added
}
Some(())
}
}
impl Monitor for StatsdMonitor {
fn display(
&mut self,
client_stats_manager: &mut ClientStatsManager,
_event_msg: &str,
_sender_id: ClientId,
) {
if self.try_display(client_stats_manager).is_none() {
// The client failed to send metrics, which means the server is down
// or something else happened. We then de-initialize the client, and
// when the `display` is called next time, it will be re-initialized
// and try to connect the server then.
self.statsd_client = None;
}
}
}

View File

@ -30,8 +30,8 @@ use typed_builder::TypedBuilder;
use crate::monitors::stats::perf_stats::{ClientPerfStats, PerfFeature};
use crate::monitors::{
stats::{
manager::ClientStatsManager, user_stats::UserStats, ClientStats, ItemGeometry,
ProcessTiming,
manager::ClientStatsManager, user_stats::UserStats, ClientStats, EdgeCoverage,
ItemGeometry, ProcessTiming,
},
Monitor,
};
@ -238,7 +238,13 @@ impl ClientTuiContext {
self.executions = client.executions();
self.process_timing = client.process_timing();
self.map_density = client.map_density();
self.map_density = client.edges_coverage().map_or(
"0%".to_string(),
|EdgeCoverage {
edges_hit,
edges_total,
}| format!("{}%", edges_hit * 100 / edges_total),
);
self.item_geometry = client.item_geometry();
for (key, val) in client.user_stats() {
@ -355,7 +361,13 @@ impl Monitor for TuiMonitor {
ctx.start_time = client_stats_manager.start_time();
ctx.total_execs = totalexec;
ctx.clients_num = client_stats_manager.client_stats().len();
ctx.total_map_density = client_stats_manager.map_density();
ctx.total_map_density = client_stats_manager.edges_coverage().map_or(
"0%".to_string(),
|EdgeCoverage {
edges_hit,
edges_total,
}| format!("{}%", edges_hit * 100 / edges_total),
);
ctx.total_cycles_done = 0;
ctx.total_item_geometry = client_stats_manager.item_geometry();
}

View File

@ -463,7 +463,10 @@ impl TuiUi {
]),
Row::new(vec![
Cell::from(Span::raw("stability")),
Cell::from(Span::raw(&item_geometry.stability)),
Cell::from(Span::raw(format!(
"{:.2}%",
item_geometry.stability.unwrap_or(0.0) * 100.0
))),
]),
];