Separate Prometheus metrics into global and per-client categories with refactoring (#2781)

* separated global from per-client stats; refactoring

* cargo +nightly fmt

* derive(Default) for PrometheusStats
Author: cube0x8 (committed by GitHub)
Date: 2024-12-19 11:38:43 +02:00
Parent: 358a5ea7f7
Commit: df3384d868


@@ -28,8 +28,9 @@
 //! When using docker, you may need to point `prometheus.yml` to the `docker0` interface or `host.docker.internal`

 use alloc::{borrow::Cow, fmt::Debug, string::String, vec::Vec};
-use core::{fmt, time::Duration};
+use core::{fmt, fmt::Write, time::Duration};
 use std::{
+    string::ToString,
     sync::{atomic::AtomicU64, Arc},
     thread,
 };
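
The new `core::fmt::Write` import is what brings `write!` into scope for the `String` targets used by `display()` below, and `std::string::ToString` is pulled in for the `sender_id.0.to_string()` calls. A minimal, self-contained sketch of the `write!`-into-`String` pattern (names made up for illustration):

    use core::fmt::Write; // the trait `write!` needs for a String target

    fn append_pairs() {
        let mut line = String::from("[Prometheus] [GLOBAL]");
        // Writing into a String cannot fail, so unwrap() is safe, as in display().
        for (key, val) in [("edges", "10/1000"), ("execs", "42")] {
            write!(line, ", {key}: {val}").unwrap();
        }
        assert_eq!(line, "[Prometheus] [GLOBAL], edges: 10/1000, execs: 42");
    }
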
@@ -46,8 +47,21 @@ use prometheus_client::{
 // using tide for the HTTP server library (fast, async, simple)
 use tide::Request;

+use super::Aggregator;
 use crate::monitors::{ClientStats, Monitor, UserStatsValue};

+/// Prometheus metrics for global and each client.
+#[derive(Clone, Debug, Default)]
+pub struct PrometheusStats {
+    corpus_count: Family<Labels, Gauge>,
+    objective_count: Family<Labels, Gauge>,
+    executions: Family<Labels, Gauge>,
+    exec_rate: Family<Labels, Gauge<f64, AtomicU64>>,
+    runtime: Family<Labels, Gauge>,
+    clients_count: Family<Labels, Gauge>,
+    custom_stat: Family<Labels, Gauge<f64, AtomicU64>>,
+}
+
 /// Tracking monitor during fuzzing.
 #[derive(Clone)]
 pub struct PrometheusMonitor<F>
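
Each field of the new struct is a `prometheus_client` `Family`, which lazily creates one gauge per distinct label set, so `#[derive(Default)]` is all the construction `PrometheusStats` needs. A rough usage sketch (hypothetical helper, assuming the `Labels` type defined at the bottom of this file):

    fn record_corpus_size(stats: &PrometheusStats, corpus_size: i64) {
        stats
            .corpus_count // Family<Labels, Gauge>
            .get_or_create(&Labels {
                client: Cow::from("global"),
                stat: Cow::from(""),
            })
            .set(corpus_size); // the labeled gauge is created on first use
    }
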
@@ -56,14 +70,10 @@ where
 {
     print_fn: F,
     start_time: Duration,
-    client_stats: Vec<ClientStats>,
-    corpus_count: Family<Labels, Gauge>,
-    objective_count: Family<Labels, Gauge>,
-    executions: Family<Labels, Gauge>,
-    exec_rate: Family<Labels, Gauge<f64, AtomicU64>>,
-    runtime: Family<Labels, Gauge>,
-    clients_count: Family<Labels, Gauge>,
-    custom_stat: Family<Labels, Gauge<f64, AtomicU64>>,
+    prometheus_global_stats: PrometheusStats, // global prometheus metrics
+    prometheus_client_stats: PrometheusStats, // per-client prometheus metrics
+    client_stats: Vec<ClientStats>,           // per-client statistics
+    aggregator: Aggregator,                   // aggregator for global custom statistics
 }

 impl<F> Debug for PrometheusMonitor<F>
@@ -102,64 +112,80 @@ where
         self.start_time = time;
     }

+    /// aggregate client stats
+    fn aggregate(&mut self, name: &str) {
+        self.aggregator.aggregate(name, &self.client_stats);
+    }
+
     #[allow(clippy::cast_sign_loss)]
     fn display(&mut self, event_msg: &str, sender_id: ClientId) {
         // Update the prometheus metrics
-        // Label each metric with the sender / client_id
         // The gauges must take signed i64's, with max value of 2^63-1 so it is
         // probably fair to error out at a count of nine quintillion across any
         // of these counts.
         // realistically many of these metrics should be counters but would
         // require a fair bit of logic to handle "amount to increment given
         // time since last observation"

+        // Global (aggregated) metrics
         let corpus_size = self.corpus_size();
-        self.corpus_count
+        self.prometheus_global_stats
+            .corpus_count
             .get_or_create(&Labels {
-                client: sender_id.0,
+                client: Cow::from("global"),
                 stat: Cow::from(""),
             })
             .set(corpus_size.try_into().unwrap());

         let objective_size = self.objective_size();
-        self.objective_count
+        self.prometheus_global_stats
+            .objective_count
             .get_or_create(&Labels {
-                client: sender_id.0,
+                client: Cow::from("global"),
                 stat: Cow::from(""),
             })
             .set(objective_size.try_into().unwrap());

         let total_execs = self.total_execs();
-        self.executions
+        self.prometheus_global_stats
+            .executions
             .get_or_create(&Labels {
-                client: sender_id.0,
+                client: Cow::from("global"),
                 stat: Cow::from(""),
             })
             .set(total_execs.try_into().unwrap());

         let execs_per_sec = self.execs_per_sec();
-        self.exec_rate
+        self.prometheus_global_stats
+            .exec_rate
             .get_or_create(&Labels {
-                client: sender_id.0,
+                client: Cow::from("global"),
                 stat: Cow::from(""),
             })
             .set(execs_per_sec);

         let run_time = (current_time() - self.start_time).as_secs();
-        self.runtime
+        self.prometheus_global_stats
+            .runtime
             .get_or_create(&Labels {
-                client: sender_id.0,
+                client: Cow::from("global"),
                 stat: Cow::from(""),
             })
             .set(run_time.try_into().unwrap()); // run time in seconds, which can be converted to a time format by Grafana or similar

         let total_clients = self.client_stats_count().try_into().unwrap(); // convert usize to u64 (unlikely that # of clients will be > 2^64 -1...)
-        self.clients_count
+        self.prometheus_global_stats
+            .clients_count
             .get_or_create(&Labels {
-                client: sender_id.0,
+                client: Cow::from("global"),
                 stat: Cow::from(""),
             })
             .set(total_clients);

         // display stats in a SimpleMonitor format
-        let fmt = format!(
-            "[Prometheus] [{} #{}] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
+        let mut global_fmt = format!(
+            "[Prometheus] [{} #GLOBAL] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
             event_msg,
-            sender_id.0,
             format_duration_hms(&(current_time() - self.start_time)),
             self.client_stats_count(),
             self.corpus_size(),
@@ -167,31 +193,151 @@
             self.total_execs(),
             self.execs_per_sec_pretty()
         );
-        (self.print_fn)(&fmt);
+        for (key, val) in &self.aggregator.aggregated {
+            // print global aggregated custom stats
+            write!(global_fmt, ", {key}: {val}").unwrap();
+            #[allow(clippy::cast_precision_loss)]
+            let value: f64 = match val {
+                UserStatsValue::Number(n) => *n as f64,
+                UserStatsValue::Float(f) => *f,
+                UserStatsValue::String(_s) => 0.0,
+                UserStatsValue::Ratio(a, b) => {
+                    if key == "edges" {
+                        self.prometheus_global_stats
+                            .custom_stat
+                            .get_or_create(&Labels {
+                                client: Cow::from("global"),
+                                stat: Cow::from("edges_total"),
+                            })
+                            .set(*b as f64);
+                        self.prometheus_global_stats
+                            .custom_stat
+                            .get_or_create(&Labels {
+                                client: Cow::from("global"),
+                                stat: Cow::from("edges_hit"),
+                            })
+                            .set(*a as f64);
+                    }
+                    (*a as f64 / *b as f64) * 100.0
+                }
+                UserStatsValue::Percent(p) => *p * 100.0,
+            };
+            self.prometheus_global_stats
+                .custom_stat
+                .get_or_create(&Labels {
+                    client: Cow::from("global"),
+                    stat: Cow::from(key.clone()),
+                })
+                .set(value);
+        }
+        (self.print_fn)(&global_fmt);
+
+        // Client-specific metrics
         self.client_stats_insert(sender_id);
-        let cur_client = self.client_stats_mut_for(sender_id);
-        let cur_client_clone = cur_client.clone();
+        let client = self.client_stats_for(sender_id);
+        let mut cur_client_clone = client.clone();
+
+        self.prometheus_client_stats
+            .corpus_count
+            .get_or_create(&Labels {
+                client: Cow::from(sender_id.0.to_string()),
+                stat: Cow::from(""),
+            })
+            .set(cur_client_clone.corpus_size.try_into().unwrap());
+
+        self.prometheus_client_stats
+            .objective_count
+            .get_or_create(&Labels {
+                client: Cow::from(sender_id.0.to_string()),
+                stat: Cow::from(""),
+            })
+            .set(cur_client_clone.objective_size.try_into().unwrap());
+
+        self.prometheus_client_stats
+            .executions
+            .get_or_create(&Labels {
+                client: Cow::from(sender_id.0.to_string()),
+                stat: Cow::from(""),
+            })
+            .set(cur_client_clone.executions.try_into().unwrap());
+
+        self.prometheus_client_stats
+            .exec_rate
+            .get_or_create(&Labels {
+                client: Cow::from(sender_id.0.to_string()),
+                stat: Cow::from(""),
+            })
+            .set(cur_client_clone.execs_per_sec(current_time()));
+
+        let client_run_time = (current_time() - cur_client_clone.start_time).as_secs();
+        self.prometheus_client_stats
+            .runtime
+            .get_or_create(&Labels {
+                client: Cow::from(sender_id.0.to_string()),
+                stat: Cow::from(""),
+            })
+            .set(client_run_time.try_into().unwrap()); // run time in seconds per-client, which can be converted to a time format by Grafana or similar
+
+        self.prometheus_global_stats
+            .clients_count
+            .get_or_create(&Labels {
+                client: Cow::from(sender_id.0.to_string()),
+                stat: Cow::from(""),
+            })
+            .set(total_clients);
+
+        let mut fmt = format!(
+            "[Prometheus] [{} #{}] corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
+            event_msg,
+            sender_id.0,
+            client.corpus_size,
+            client.objective_size,
+            client.executions,
+            cur_client_clone.execs_per_sec_pretty(current_time())
+        );
+
         for (key, val) in cur_client_clone.user_monitor {
+            // print the custom stats for each client
+            write!(fmt, ", {key}: {val}").unwrap();
             // Update metrics added to the user_stats hashmap by feedback event-fires
             // You can filter for each custom stat in promQL via labels of both the stat name and client id
-            log::info!("{key}: {val}");
             #[allow(clippy::cast_precision_loss)]
             let value: f64 = match val.value() {
                 UserStatsValue::Number(n) => *n as f64,
                 UserStatsValue::Float(f) => *f,
                 UserStatsValue::String(_s) => 0.0,
-                UserStatsValue::Ratio(a, b) => (*a as f64 / *b as f64) * 100.0,
+                UserStatsValue::Ratio(a, b) => {
+                    if key == "edges" {
+                        self.prometheus_client_stats
+                            .custom_stat
+                            .get_or_create(&Labels {
+                                client: Cow::from(sender_id.0.to_string()),
+                                stat: Cow::from("edges_total"),
+                            })
+                            .set(*b as f64);
+                        self.prometheus_client_stats
+                            .custom_stat
+                            .get_or_create(&Labels {
+                                client: Cow::from(sender_id.0.to_string()),
+                                stat: Cow::from("edges_hit"),
+                            })
+                            .set(*a as f64);
+                    }
+                    (*a as f64 / *b as f64) * 100.0
+                }
                 UserStatsValue::Percent(p) => *p * 100.0,
             };
-            self.custom_stat
+            self.prometheus_client_stats
+                .custom_stat
                 .get_or_create(&Labels {
-                    client: sender_id.0,
+                    client: Cow::from(sender_id.0.to_string()),
                     stat: key.clone(),
                 })
                 .set(value);
         }
+        (self.print_fn)(&fmt);
     }
 }
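
The retained comment about counters points at a Prometheus convention: monotonically growing totals such as executions would idiomatically be `Counter`s, but a counter can only be incremented, so the monitor would have to remember the last observed total and add the delta on each event. A hypothetical sketch of that bookkeeping, not part of this patch:

    use prometheus_client::metrics::counter::Counter;

    struct ExecTracker {
        last_total: u64,
        executions: Counter, // Counter<u64, AtomicU64> by default
    }

    impl ExecTracker {
        // The "amount to increment given time since last observation" logic.
        fn observe(&mut self, total_execs: u64) {
            let delta = total_execs.saturating_sub(self.last_total);
            self.executions.inc_by(delta);
            self.last_total = total_execs;
        }
    }
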
@@ -203,33 +349,18 @@ where
     /// The `listener` is the address to send logs to.
     /// The `print_fn` is the printing function that can output the logs otherwise.
     pub fn new(listener: String, print_fn: F) -> Self {
-        // Gauge's implementation of clone uses Arc
-        let corpus_count = Family::<Labels, Gauge>::default();
-        let corpus_count_clone = corpus_count.clone();
-        let objective_count = Family::<Labels, Gauge>::default();
-        let objective_count_clone = objective_count.clone();
-        let executions = Family::<Labels, Gauge>::default();
-        let executions_clone = executions.clone();
-        let exec_rate = Family::<Labels, Gauge<f64, AtomicU64>>::default();
-        let exec_rate_clone = exec_rate.clone();
-        let runtime = Family::<Labels, Gauge>::default();
-        let runtime_clone = runtime.clone();
-        let clients_count = Family::<Labels, Gauge>::default();
-        let clients_count_clone = clients_count.clone();
-        let custom_stat = Family::<Labels, Gauge<f64, AtomicU64>>::default();
-        let custom_stat_clone = custom_stat.clone();
+        let prometheus_global_stats = PrometheusStats::default();
+        let prometheus_global_stats_clone = prometheus_global_stats.clone();
+        let prometheus_client_stats = PrometheusStats::default();
+        let prometheus_client_stats_clone = prometheus_client_stats.clone();
+        let client_stats = Vec::<ClientStats>::default();

         // Need to run the metrics server in a different thread to avoid blocking
         thread::spawn(move || {
             block_on(serve_metrics(
                 listener,
-                corpus_count_clone,
-                objective_count_clone,
-                executions_clone,
-                exec_rate_clone,
-                runtime_clone,
-                clients_count_clone,
-                custom_stat_clone,
+                prometheus_global_stats_clone,
+                prometheus_client_stats_clone,
             ))
             .map_err(|err| log::error!("{err:?}"))
             .ok();
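
Handing clones to the server thread works because `Family`'s `Clone` shares the underlying metric storage behind an `Arc` (the point of the removed `// Gauge's implementation of clone uses Arc` comment): the monitor writes and the `/metrics` endpoint reads the same gauges. A quick sanity sketch under that assumption:

    let stats = PrometheusStats::default();
    let stats_clone = stats.clone(); // shares storage, does not copy values

    let labels = Labels {
        client: Cow::from("global"),
        stat: Cow::from(""),
    };
    stats.corpus_count.get_or_create(&labels).set(7);
    // The clone observes the same gauge:
    assert_eq!(stats_clone.corpus_count.get_or_create(&labels).get(), 7);
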
@@ -237,43 +368,25 @@
         Self {
             print_fn,
             start_time: current_time(),
-            client_stats: vec![],
-            corpus_count,
-            objective_count,
-            executions,
-            exec_rate,
-            runtime,
-            clients_count,
-            custom_stat,
+            prometheus_global_stats,
+            prometheus_client_stats,
+            client_stats,
+            aggregator: Aggregator::new(),
         }
     }

     /// Creates the monitor with a given `start_time`.
     pub fn with_time(listener: String, print_fn: F, start_time: Duration) -> Self {
-        let corpus_count = Family::<Labels, Gauge>::default();
-        let corpus_count_clone = corpus_count.clone();
-        let objective_count = Family::<Labels, Gauge>::default();
-        let objective_count_clone = objective_count.clone();
-        let executions = Family::<Labels, Gauge>::default();
-        let executions_clone = executions.clone();
-        let exec_rate = Family::<Labels, Gauge<f64, AtomicU64>>::default();
-        let exec_rate_clone = exec_rate.clone();
-        let runtime = Family::<Labels, Gauge>::default();
-        let runtime_clone = runtime.clone();
-        let clients_count = Family::<Labels, Gauge>::default();
-        let clients_count_clone = clients_count.clone();
-        let custom_stat = Family::<Labels, Gauge<f64, AtomicU64>>::default();
-        let custom_stat_clone = custom_stat.clone();
+        let prometheus_global_stats = PrometheusStats::default();
+        let prometheus_global_stats_clone = prometheus_global_stats.clone();
+        let prometheus_client_stats = PrometheusStats::default();
+        let prometheus_client_stats_clone = prometheus_client_stats.clone();
+        let client_stats = Vec::<ClientStats>::default();
         thread::spawn(move || {
             block_on(serve_metrics(
                 listener,
-                corpus_count_clone,
-                objective_count_clone,
-                executions_clone,
-                exec_rate_clone,
-                runtime_clone,
-                clients_count_clone,
-                custom_stat_clone,
+                prometheus_global_stats_clone,
+                prometheus_client_stats_clone,
             ))
             .map_err(|err| log::error!("{err:?}"))
             .ok();
@@ -281,58 +394,94 @@
         Self {
             print_fn,
             start_time,
-            client_stats: vec![],
-            corpus_count,
-            objective_count,
-            executions,
-            exec_rate,
-            runtime,
-            clients_count,
-            custom_stat,
+            prometheus_global_stats,
+            prometheus_client_stats,
+            client_stats,
+            aggregator: Aggregator::new(),
         }
     }
 }

 /// Set up an HTTP endpoint /metrics
-#[allow(clippy::too_many_arguments)]
 pub(crate) async fn serve_metrics(
     listener: String,
-    corpus: Family<Labels, Gauge>,
-    objectives: Family<Labels, Gauge>,
-    executions: Family<Labels, Gauge>,
-    exec_rate: Family<Labels, Gauge<f64, AtomicU64>>,
-    runtime: Family<Labels, Gauge>,
-    clients_count: Family<Labels, Gauge>,
-    custom_stat: Family<Labels, Gauge<f64, AtomicU64>>,
+    global_stats: PrometheusStats,
+    client_stats: PrometheusStats,
 ) -> Result<(), std::io::Error> {
     let mut registry = Registry::default();
-    registry.register("corpus_count", "Number of test cases in the corpus", corpus);
+    // Register the global stats
+    registry.register(
+        "global_corpus_count",
+        "Number of test cases in the corpus",
+        global_stats.corpus_count,
+    );
+    registry.register(
+        "global_objective_count",
+        "Number of times the objective has been achieved (e.g., crashes)",
+        global_stats.objective_count,
+    );
+    registry.register(
+        "global_executions_total",
+        "Total number of executions",
+        global_stats.executions,
+    );
+    registry.register(
+        "execution_rate",
+        "Rate of executions per second",
+        global_stats.exec_rate,
+    );
+    registry.register(
+        "global_runtime",
+        "How long the fuzzer has been running for (seconds)",
+        global_stats.runtime,
+    );
+    registry.register(
+        "global_clients_count",
+        "How many clients have been spawned for the fuzzing job",
+        global_stats.clients_count,
+    );
+    registry.register(
+        "global_custom_stat",
+        "A metric to contain custom stats returned by feedbacks, filterable by label (aggregated)",
+        global_stats.custom_stat,
+    );
+
+    // Register the client stats
+    registry.register(
+        "corpus_count",
+        "Number of test cases in the client's corpus",
+        client_stats.corpus_count,
+    );
     registry.register(
         "objective_count",
-        "Number of times the objective has been achieved (e.g., crashes)",
-        objectives,
+        "Number of client's objectives (e.g., crashes)",
+        client_stats.objective_count,
     );
     registry.register(
         "executions_total",
-        "Number of executions the fuzzer has done",
-        executions,
+        "Total number of client executions",
+        client_stats.executions,
     );
-    registry.register("execution_rate", "Rate of executions per second", exec_rate);
+    registry.register(
+        "execution_rate",
+        "Rate of executions per second",
+        client_stats.exec_rate,
+    );
     registry.register(
         "runtime",
-        "How long the fuzzer has been running for (seconds)",
-        runtime,
+        "How long the client has been running for (seconds)",
+        client_stats.runtime,
     );
     registry.register(
         "clients_count",
         "How many clients have been spawned for the fuzzing job",
-        clients_count,
+        client_stats.clients_count,
     );
     registry.register(
         "custom_stat",
         "A metric to contain custom stats returned by feedbacks, filterable by label",
-        custom_stat,
+        client_stats.custom_stat,
     );

     let mut app = tide::with_state(State {
@@ -359,7 +508,7 @@ pub(crate) async fn serve_metrics(
 #[derive(Clone, Hash, PartialEq, Eq, EncodeLabelSet, Debug)]
 pub struct Labels {
     /// The `sender_id` helps to differentiate between clients when multiple are spawned.
-    client: u32,
+    client: Cow<'static, str>,
     /// Used for `custom_stat` filtering.
     stat: Cow<'static, str>,
 }
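
Widening `client` from `u32` to `Cow<'static, str>` is what lets one label space address both scopes: aggregated series carry `client="global"` while per-client series carry the numeric id rendered as a string, e.g. `custom_stat{client="3", stat="edges"}` in PromQL. Constructing both variants (illustration only):

    let global = Labels {
        client: Cow::from("global"),
        stat: Cow::from("edges"),
    };
    let per_client = Labels {
        client: Cow::from(3u32.to_string()), // a ClientId rendered as a string
        stat: Cow::from("edges"),
    };
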