From 18f288e2d368c2449b2833329222e1f44826f5a7 Mon Sep 17 00:00:00 2001 From: Peter Whiting <113447180+peterwhitingyb@users.noreply.github.com> Date: Mon, 7 Nov 2022 13:01:59 -0500 Subject: [PATCH] Monitor to export fuzzer metrics to Prometheus server (#875) * add custom monitor prometheus as a baseline to build functionality * working server, set up function to update metrics in the registry * for a test * metrics for corpus count, objective count, executions, execution rate are intermittently updated and exposed on /metrics * add runtime metric, clean up some comments * IP:PORT as argument instead of hardcoded * add client # as label attached to fuzzer metrics for filtering by client. add clients_count as a tracked metric * added support for custom metrics added to client_stats via feedbacks, such as edges count. cleaned up code * cargo fmt * clean up prometheus.rs * ran autofix and fmt scripts, and put optional dependencies behind prometheus_monitor feature Co-authored-by: Dominik Maier --- libafl/Cargo.toml | 8 +- libafl/src/monitors/mod.rs | 6 + libafl/src/monitors/prometheus.rs | 355 ++++++++++++++++++++++++++++++ 3 files changed, 368 insertions(+), 1 deletion(-) create mode 100644 libafl/src/monitors/prometheus.rs diff --git a/libafl/Cargo.toml b/libafl/Cargo.toml index 8b9b375e56..ed4fe610f8 100644 --- a/libafl/Cargo.toml +++ b/libafl/Cargo.toml @@ -22,6 +22,7 @@ concolic_mutation = ["z3"] # include a simple concolic mutator based on z3 python = ["pyo3", "concat-idents"] prelude = [] # Expose libafl::prelude for access without additional using directives tui_monitor = ["tui", "crossterm"] # enable TuiMonitor with crossterm +prometheus_monitor = ["std", "async-std", "prometheus-client", "tide", "futures"] cli = ["clap"] # expose bolts::cli qemu_cli = ["cli"] frida_cli = ["cli"] @@ -55,7 +56,7 @@ tuple_list = { version = "0.1.3" } hashbrown = { version = "0.12", features = ["serde", "ahash-compile-time-rng"], default-features=false } # A faster hashmap, nostd compatible num-traits = { version = "0.2", default-features = false } xxhash-rust = { version = "0.8.5", features = ["xxh3"] } # xxh3 hashing for rust -serde = { version = "1.0", default-features = false, features = ["alloc"] } # serialization lib +serde = { version = "1.0", default-features = false, features = ["alloc", "derive"] } # serialization lib erased-serde = { version = "0.3.21", default-features = false, features = ["alloc"] } # erased serde postcard = { version = "1.0", features = ["alloc"] } # no_std compatible serde serialization fromat bincode = {version = "1.3", optional = true } @@ -81,6 +82,11 @@ tui = { version = "0.19", default-features = false, features = ['crossterm'], op crossterm = { version = "0.25", optional = true } clap = {version = "4.0", features = ["derive", "wrap_help"], optional = true} +prometheus-client = { version= "0.18.0", optional = true} +tide = { version = "0.16.0", optional = true } +async-std = { version = "1.8.0", features = ["attributes"], optional = true } +futures = { version = "0.3.24", optional = true } + wait-timeout = { version = "0.2", optional = true } # used by CommandExecutor to wait for child process z3 = { version = "0.11", features = ["static-link-z3"], optional = true } # for concolic mutation diff --git a/libafl/src/monitors/mod.rs b/libafl/src/monitors/mod.rs index 4ed73cee7e..882204bbd7 100644 --- a/libafl/src/monitors/mod.rs +++ b/libafl/src/monitors/mod.rs @@ -7,6 +7,12 @@ pub use multi::MultiMonitor; #[allow(missing_docs)] pub mod tui; +#[cfg(all(feature = "prometheus_monitor", feature = "std"))] +#[allow(missing_docs)] +pub mod prometheus; +#[cfg(all(feature = "prometheus_monitor", feature = "std"))] +pub use prometheus::PrometheusMonitor; + #[cfg(feature = "std")] pub mod disk; use alloc::{fmt::Debug, string::String, vec::Vec}; diff --git a/libafl/src/monitors/prometheus.rs b/libafl/src/monitors/prometheus.rs new file mode 100644 index 0000000000..7a1e6f83c8 --- /dev/null +++ b/libafl/src/monitors/prometheus.rs @@ -0,0 +1,355 @@ +// ===== overview for prommon ===== +// The client (i.e., the fuzzer) sets up an HTTP endpoint (/metrics). +// The endpoint contains metrics such as execution rate. + +// A prometheus server (can use a precompiled binary or docker) then scrapes \ +// the endpoint at regular intervals (configurable via prometheus.yml file). +// ==================== +// +// == how to use it === +// This monitor should plug into any fuzzer similar to other monitors. +// In your fuzzer, include: +// use libafl::monitors::PrometheusMonitor; +// as well as: +// let listener = "127.0.0.1:8080".to_string(); // point prometheus to scrape here in your prometheus.yml +// let mon = PrometheusMonitor::new(listener, |s| println!("{}", s)); +// and then like with any other monitor, pass it into the event manager like so: +// let mut mgr = SimpleEventManager::new(mon); +// When using docker, you may need to point prometheus.yml to the docker0 interface or host.docker.internal +// ==================== + +use alloc::{fmt::Debug, string::String, vec::Vec}; +use core::{fmt, time::Duration}; +use std::{ + boxed::Box, + sync::{atomic::AtomicU64, Arc}, + thread, +}; + +// using thread in order to start the HTTP server in a separate thread +use futures::executor::block_on; +// using the official rust client library for Prometheus: https://github.com/prometheus/client_rust +use prometheus_client::{ + encoding::text::{encode, Encode, SendSyncEncodeMetric}, + metrics::{family::Family, gauge::Gauge}, + registry::Registry, +}; +// using tide for the HTTP server library (fast, async, simple) +use tide::Request; + +use crate::{ + bolts::{current_time, format_duration_hms}, + monitors::{ClientStats, Monitor, UserStats}, +}; + +/// Tracking monitor during fuzzing. +#[derive(Clone)] +pub struct PrometheusMonitor +where + F: FnMut(String), +{ + print_fn: F, + start_time: Duration, + client_stats: Vec, + corpus_count: Family, + objective_count: Family, + executions: Family, + exec_rate: Family, + runtime: Family, + clients_count: Family, + custom_stat: Family>, +} + +impl Debug for PrometheusMonitor +where + F: FnMut(String), +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PrometheusMonitor") + .field("start_time", &self.start_time) + .field("client_stats", &self.client_stats) + .finish() + } +} + +impl Monitor for PrometheusMonitor +where + F: FnMut(String), +{ + /// the client monitor, mutable + fn client_stats_mut(&mut self) -> &mut Vec { + &mut self.client_stats + } + + /// the client monitor + fn client_stats(&self) -> &[ClientStats] { + &self.client_stats + } + + /// Time this fuzzing run stated + fn start_time(&mut self) -> Duration { + self.start_time + } + + fn display(&mut self, event_msg: String, sender_id: u32) { + // Update the prometheus metrics + // Label each metric with the sender / client_id + let corpus_size = self.corpus_size(); + self.corpus_count + .get_or_create(&Labels { + client: sender_id, + stat: String::new(), + }) + .set(corpus_size); + let objective_size = self.objective_size(); + self.objective_count + .get_or_create(&Labels { + client: sender_id, + stat: String::new(), + }) + .set(objective_size); + let total_execs = self.total_execs(); + self.executions + .get_or_create(&Labels { + client: sender_id, + stat: String::new(), + }) + .set(total_execs); + let execs_per_sec = self.execs_per_sec(); + self.exec_rate + .get_or_create(&Labels { + client: sender_id, + stat: String::new(), + }) + .set(execs_per_sec); + let run_time = (current_time() - self.start_time).as_secs(); + self.runtime + .get_or_create(&Labels { + client: sender_id, + stat: String::new(), + }) + .set(run_time); // run time in seconds, which can be converted to a time format by Grafana or similar + let total_clients = self.client_stats().len().try_into().unwrap(); // convert usize to u64 (unlikely that # of clients will be > 2^64 -1...) + self.clients_count + .get_or_create(&Labels { + client: sender_id, + stat: String::new(), + }) + .set(total_clients); + + // display stats in a SimpleMonitor format + let fmt = format!( + "[Prometheus] [{} #{}] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}", + event_msg, + sender_id, + format_duration_hms(&(current_time() - self.start_time)), + self.client_stats().len(), + self.corpus_size(), + self.objective_size(), + self.total_execs(), + self.execs_per_sec() + ); + (self.print_fn)(fmt); + + let cur_client = self.client_stats_mut_for(sender_id); + let cur_client_clone = cur_client.clone(); + + for (key, val) in cur_client_clone.user_monitor { + // Update metrics added to the user_stats hashmap by feedback event-fires + // You can filter for each custom stat in promQL via labels of both the stat name and client id + println!("{key}: {val}"); + #[allow(clippy::cast_precision_loss)] + let value: f64 = match val { + UserStats::Number(n) => n as f64, + UserStats::Float(f) => f, + UserStats::String(_s) => 0.0, + UserStats::Ratio(a, b) => (a as f64 / b as f64) * 100.0, + }; + self.custom_stat + .get_or_create(&Labels { + client: sender_id, + stat: key.clone(), + }) + .set(value); + } + } +} + +impl PrometheusMonitor +where + F: FnMut(String), +{ + pub fn new(listener: String, print_fn: F) -> Self { + // Gauge's implementation of clone uses Arc + let corpus_count = Family::::default(); + let corpus_count_clone = corpus_count.clone(); + let objective_count = Family::::default(); + let objective_count_clone = objective_count.clone(); + let executions = Family::::default(); + let executions_clone = executions.clone(); + let exec_rate = Family::::default(); + let exec_rate_clone = exec_rate.clone(); + let runtime = Family::::default(); + let runtime_clone = runtime.clone(); + let clients_count = Family::::default(); + let clients_count_clone = clients_count.clone(); + let custom_stat = Family::>::default(); + let custom_stat_clone = custom_stat.clone(); + + // Need to run the metrics server in a different thread to avoid blocking + thread::spawn(move || { + block_on(serve_metrics( + listener, + corpus_count_clone, + objective_count_clone, + executions_clone, + exec_rate_clone, + runtime_clone, + clients_count_clone, + custom_stat_clone, + )) + .map_err(|err| println!("{err:?}")) + .ok(); + }); + Self { + print_fn, + start_time: current_time(), + client_stats: vec![], + corpus_count, + objective_count, + executions, + exec_rate, + runtime, + clients_count, + custom_stat, + } + } + /// Creates the monitor with a given `start_time`. + pub fn with_time(listener: String, print_fn: F, start_time: Duration) -> Self { + let corpus_count = Family::::default(); + let corpus_count_clone = corpus_count.clone(); + let objective_count = Family::::default(); + let objective_count_clone = objective_count.clone(); + let executions = Family::::default(); + let executions_clone = executions.clone(); + let exec_rate = Family::::default(); + let exec_rate_clone = exec_rate.clone(); + let runtime = Family::::default(); + let runtime_clone = runtime.clone(); + let clients_count = Family::::default(); + let clients_count_clone = clients_count.clone(); + let custom_stat = Family::>::default(); + let custom_stat_clone = custom_stat.clone(); + + thread::spawn(move || { + block_on(serve_metrics( + listener, + corpus_count_clone, + objective_count_clone, + executions_clone, + exec_rate_clone, + runtime_clone, + clients_count_clone, + custom_stat_clone, + )) + .map_err(|err| println!("{err:?}")) + .ok(); + }); + Self { + print_fn, + start_time, + client_stats: vec![], + corpus_count, + objective_count, + executions, + exec_rate, + runtime, + clients_count, + custom_stat, + } + } +} + +// set up an HTTP endpoint /metrics +#[allow(clippy::too_many_arguments)] +pub async fn serve_metrics( + listener: String, + corpus: Family, + objectives: Family, + executions: Family, + exec_rate: Family, + runtime: Family, + clients_count: Family, + custom_stat: Family>, +) -> Result<(), std::io::Error> { + tide::log::start(); + + let mut registry = ::default(); + + registry.register( + "corpus_count", + "Number of test cases in the corpus", + Box::new(corpus), + ); + registry.register( + "objective_count", + "Number of times the objective has been achieved (e.g., crashes)", + Box::new(objectives), + ); + registry.register( + "executions_total", + "Number of executions the fuzzer has done", + Box::new(executions), + ); + registry.register( + "execution_rate", + "Rate of executions per second", + Box::new(exec_rate), + ); + registry.register( + "runtime", + "How long the fuzzer has been running for (seconds)", + Box::new(runtime), + ); + registry.register( + "clients_count", + "How many clients have been spawned for the fuzzing job", + Box::new(clients_count), + ); + registry.register( + "custom_stat", + "A metric to contain custom stats returned by feedbacks, filterable by label", + Box::new(custom_stat), + ); + + let mut app = tide::with_state(State { + registry: Arc::new(registry), + }); + + app.at("/") + .get(|_| async { Ok("LibAFL Prometheus Monitor") }); + app.at("/metrics").get(|req: Request| async move { + let mut encoded = Vec::new(); + encode(&mut encoded, &req.state().registry).unwrap(); + let response = tide::Response::builder(200) + .body(encoded) + .content_type("application/openmetrics-text; version=1.0.0; charset=utf-8") + .build(); + Ok(response) + }); + app.listen(listener).await?; + + Ok(()) +} + +#[derive(Clone, Hash, PartialEq, Eq, Encode, Debug)] +pub struct Labels { + client: u32, // sender_id: u32, to differentiate between clients when multiple are spawned. + stat: String, // for custom_stat filtering. +} + +#[derive(Clone)] +struct State { + #[allow(dead_code)] + registry: Arc>>, +}