Change Monitor API for more flexibility (#2927)

* Change Monitor API for more flexibility

* Make clippy happy

* Fix broken doc link
This commit is contained in:
EvianZhang 2025-02-04 20:45:28 +08:00 committed by GitHub
parent 72986fc129
commit a27da1b8be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 138 additions and 78 deletions

View File

@ -120,8 +120,9 @@ where
};
monitor.client_stats_insert(id);
let client = monitor.client_stats_mut_for(id);
client.update_corpus_size(*corpus_size as u64);
monitor.update_client_stats_for(id, |client_stat| {
client_stat.update_corpus_size(*corpus_size as u64);
});
monitor.display(event.name(), id);
Ok(BrokerEventResult::Forward)
}
@ -132,8 +133,9 @@ where
} => {
// TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
client.update_executions(*executions, *time);
monitor.update_client_stats_for(client_id, |client_stat| {
client_stat.update_executions(*executions, *time);
});
monitor.display(event.name(), client_id);
Ok(BrokerEventResult::Handled)
}
@ -143,8 +145,9 @@ where
phantom: _,
} => {
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
client.update_user_stats(name.clone(), value.clone());
monitor.update_client_stats_for(client_id, |client_stat| {
client_stat.update_user_stats(name.clone(), value.clone());
});
monitor.aggregate(name);
monitor.display(event.name(), client_id);
Ok(BrokerEventResult::Handled)
@ -160,13 +163,12 @@ where
// Get the client for the staterestorer ID
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
// Update the normal monitor for this client
client.update_executions(*executions, *time);
// Update the performance monitor for this client
client.update_introspection_monitor((**introspection_monitor).clone());
monitor.update_client_stats_for(client_id, |client_stat| {
// Update the normal monitor for this client
client_stat.update_executions(*executions, *time);
// Update the performance monitor for this client
client_stat.update_introspection_monitor((**introspection_monitor).clone());
});
// Display the monitor via `.display` only on core #1
monitor.display(event.name(), client_id);
@ -176,8 +178,9 @@ where
}
Event::Objective { objective_size, .. } => {
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
client.update_objective_size(*objective_size as u64);
monitor.update_client_stats_for(client_id, |client_stat| {
client_stat.update_objective_size(*objective_size as u64);
});
monitor.display(event.name(), client_id);
Ok(BrokerEventResult::Handled)
}

View File

@ -209,9 +209,9 @@ where
match event {
Event::NewTestcase { corpus_size, .. } => {
monitor.client_stats_insert(ClientId(0));
monitor
.client_stats_mut_for(ClientId(0))
.update_corpus_size(*corpus_size as u64);
monitor.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_corpus_size(*corpus_size as u64);
});
monitor.display(event.name(), ClientId(0));
Ok(BrokerEventResult::Handled)
}
@ -220,18 +220,18 @@ where
} => {
// TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(ClientId(0));
let client = monitor.client_stats_mut_for(ClientId(0));
client.update_executions(*executions, *time);
monitor.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_executions(*executions, *time);
});
monitor.display(event.name(), ClientId(0));
Ok(BrokerEventResult::Handled)
}
Event::UpdateUserStats { name, value, .. } => {
monitor.client_stats_insert(ClientId(0));
monitor
.client_stats_mut_for(ClientId(0))
.update_user_stats(name.clone(), value.clone());
monitor.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_user_stats(name.clone(), value.clone());
});
monitor.aggregate(name);
monitor.display(event.name(), ClientId(0));
Ok(BrokerEventResult::Handled)
@ -245,17 +245,18 @@ where
} => {
// TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(ClientId(0));
let client = monitor.client_stats_mut_for(ClientId(0));
client.update_executions(*executions, *time);
client.update_introspection_monitor((**introspection_monitor).clone());
monitor.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_executions(*executions, *time);
client_stat.update_introspection_monitor((**introspection_monitor).clone());
});
monitor.display(event.name(), ClientId(0));
Ok(BrokerEventResult::Handled)
}
Event::Objective { objective_size, .. } => {
monitor.client_stats_insert(ClientId(0));
monitor
.client_stats_mut_for(ClientId(0))
.update_objective_size(*objective_size as u64);
monitor.update_client_stats_for(ClientId(0), |client_stat| {
client_stat.update_objective_size(*objective_size as u64);
});
monitor.display(event.name(), ClientId(0));
Ok(BrokerEventResult::Handled)
}
@ -542,7 +543,7 @@ where
// reload the state of the monitor to display the correct stats after restarts
monitor.set_start_time(start_time);
*monitor.client_stats_mut() = clients_stats;
monitor.update_all_client_stats(clients_stats);
(
Some(state),

View File

@ -327,8 +327,9 @@ where
client_id
};
monitor.client_stats_insert(id);
let client = monitor.client_stats_mut_for(id);
client.update_corpus_size(*corpus_size as u64);
monitor.update_client_stats_for(id, |client| {
client.update_corpus_size(*corpus_size as u64);
});
monitor.display(event.name(), id);
Ok(BrokerEventResult::Forward)
}
@ -339,8 +340,9 @@ where
} => {
// TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
client.update_executions(*executions, *time);
monitor.update_client_stats_for(client_id, |client| {
client.update_executions(*executions, *time);
});
monitor.display(event.name(), client_id);
Ok(BrokerEventResult::Handled)
}
@ -350,8 +352,9 @@ where
phantom: _,
} => {
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
client.update_user_stats(name.clone(), value.clone());
monitor.update_client_stats_for(client_id, |client| {
client.update_user_stats(name.clone(), value.clone());
});
monitor.aggregate(name);
monitor.display(event.name(), client_id);
Ok(BrokerEventResult::Handled)
@ -367,13 +370,12 @@ where
// Get the client for the staterestorer ID
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
// Update the normal monitor for this client
client.update_executions(*executions, *time);
// Update the performance monitor for this client
client.update_introspection_monitor((**introspection_monitor).clone());
monitor.update_client_stats_for(client_id, |client| {
// Update the normal monitor for this client
client.update_executions(*executions, *time);
// Update the performance monitor for this client
client.update_introspection_monitor((**introspection_monitor).clone());
});
// Display the monitor via `.display` only on core #1
monitor.display(event.name(), client_id);
@ -383,8 +385,9 @@ where
}
Event::Objective { objective_size, .. } => {
monitor.client_stats_insert(client_id);
let client = monitor.client_stats_mut_for(client_id);
client.update_objective_size(*objective_size as u64);
monitor.update_client_stats_for(client_id, |client| {
client.update_objective_size(*objective_size as u64);
});
monitor.display(event.name(), client_id);
Ok(BrokerEventResult::Handled)
}

View File

@ -81,8 +81,13 @@ exec_sec = {}
)
.expect("Failed to write to the Toml file");
for (i, client) in self.client_stats_mut().iter_mut().enumerate() {
let exec_sec = client.execs_per_sec(cur_time);
for i in 0..(self.client_stats().len()) {
let client_id = ClientId(i as u32);
let exec_sec = self.update_client_stats_for(client_id, |client_stat| {
client_stat.execs_per_sec(cur_time)
});
let client = self.client_stats_for(client_id);
write!(
&mut file,

View File

@ -502,6 +502,10 @@ impl ClientStats {
/// The monitor trait keeps track of all the client's monitor, and offers methods to display them.
pub trait Monitor {
/// The client monitor (mutable)
///
/// This method is for internal usage only, you shall never call this method directly.
/// If you want to update one client stats, use [`update_client_stats_for`][Self::update_client_stats_for]. If you
/// want to update all client stats together, use [`update_all_client_stats`][Self::update_all_client_stats].
fn client_stats_mut(&mut self) -> &mut Vec<ClientStats>;
/// The client monitor
@ -571,19 +575,33 @@ pub trait Monitor {
..ClientStats::default()
});
}
let new_stat = self.client_stats_mut_for(client_id);
if !new_stat.enabled {
let timestamp = current_time();
// I have never seen this man in my life
new_stat.start_time = timestamp;
new_stat.last_window_time = timestamp;
new_stat.enabled = true;
}
self.update_client_stats_for(client_id, |new_stat| {
if !new_stat.enabled {
let timestamp = current_time();
// I have never seen this man in my life
new_stat.start_time = timestamp;
new_stat.last_window_time = timestamp;
new_stat.enabled = true;
}
});
}
/// Get mutable reference to client stats
fn client_stats_mut_for(&mut self, client_id: ClientId) -> &mut ClientStats {
&mut self.client_stats_mut()[client_id.0 as usize]
/// Update sepecific client stats.
///
/// The update function is restricted as `Fn` instead of `FnMut` or `FnOnce` since in some
/// monitors, the `update` will be called multiple times, and is assumed as stateless.
fn update_client_stats_for<T, F: Fn(&mut ClientStats) -> T>(
&mut self,
client_id: ClientId,
update: F,
) -> T {
let client_stat = &mut self.client_stats_mut()[client_id.0 as usize];
update(client_stat)
}
/// Update all client stats. This will clear all previous client stats, and fill in the new client stats.
fn update_all_client_stats(&mut self, new_client_stats: Vec<ClientStats>) {
*self.client_stats_mut() = new_client_stats;
}
/// Get immutable reference to client stats
@ -791,7 +809,7 @@ where
if self.print_user_monitor {
self.client_stats_insert(sender_id);
let client = self.client_stats_mut_for(sender_id);
let client = self.client_stats_for(sender_id);
for (key, val) in &client.user_monitor {
write!(fmt, ", {key}: {val}").unwrap();
}
@ -1303,18 +1321,17 @@ impl Default for ClientPerfMonitor {
}
}
/// A combined monitor consisting of multiple [`Monitor`]s
// The client stats of first and second monitor will always be maintained
// to be consistent
/// A combined monitor consisting of multiple [`Monitor`]s.
///
/// Note that the `CombinedMonitor` should never be the base monitor of other wrapper
/// monitors.
#[derive(Debug, Clone)]
pub struct CombinedMonitor<A, B> {
first: A,
second: B,
start_time: Duration,
/// Client stats. This will be maintained to be consistent with
/// client stats of first and second monitor.
///
/// Currently, the client stats will be synced to first and second
/// before each display call.
client_stats: Vec<ClientStats>,
}
impl<A: Monitor, B: Monitor> CombinedMonitor<A, B> {
@ -1323,22 +1340,24 @@ impl<A: Monitor, B: Monitor> CombinedMonitor<A, B> {
let start_time = current_time();
first.set_start_time(start_time);
second.set_start_time(start_time);
first.update_all_client_stats(vec![]);
second.update_all_client_stats(vec![]);
Self {
first,
second,
start_time,
client_stats: vec![],
}
}
}
impl<A: Monitor, B: Monitor> Monitor for CombinedMonitor<A, B> {
/// Never call this method.
fn client_stats_mut(&mut self) -> &mut Vec<ClientStats> {
&mut self.client_stats
panic!("client_stats_mut of CombinedMonitor should never be called")
}
fn client_stats(&self) -> &[ClientStats] {
&self.client_stats
self.first.client_stats()
}
fn start_time(&self) -> Duration {
@ -1351,15 +1370,42 @@ impl<A: Monitor, B: Monitor> Monitor for CombinedMonitor<A, B> {
self.second.set_start_time(time);
}
fn client_stats_insert(&mut self, client_id: ClientId) {
self.first.client_stats_insert(client_id);
self.second.client_stats_insert(client_id);
}
#[inline]
fn execs_per_sec(&mut self) -> f64 {
let execs_per_sec = self.first.execs_per_sec();
let _ = self.second.execs_per_sec();
execs_per_sec
}
fn display(&mut self, event_msg: &str, sender_id: ClientId) {
self.first.client_stats_mut().clone_from(&self.client_stats);
self.first.display(event_msg, sender_id);
self.second
.client_stats_mut()
.clone_from(&self.client_stats);
self.second.display(event_msg, sender_id);
}
/// The `update` will be called multiple times, and the result of first
/// invocation will be returned. Since the client stats are guaranteed
/// to be consistent across first and second monitor, the result should be
/// the same.
fn update_client_stats_for<T, F: Fn(&mut ClientStats) -> T>(
&mut self,
client_id: ClientId,
update: F,
) -> T {
let res = self.first.update_client_stats_for(client_id, &update);
let _ = self.second.update_client_stats_for(client_id, &update);
res
}
fn update_all_client_stats(&mut self, new_client_stats: Vec<ClientStats>) {
self.first.update_all_client_stats(new_client_stats.clone());
self.second.update_all_client_stats(new_client_stats);
}
fn aggregate(&mut self, name: &str) {
self.first.aggregate(name);
self.second.aggregate(name);

View File

@ -88,9 +88,10 @@ where
(self.print_fn)(&global_fmt);
self.client_stats_insert(sender_id);
let client = self.client_stats_mut_for(sender_id);
let cur_time = current_time();
let exec_sec = client.execs_per_sec_pretty(cur_time);
let exec_sec =
self.update_client_stats_for(sender_id, |client| client.execs_per_sec_pretty(cur_time));
let client = self.client_stats_for(sender_id);
let pad = " ".repeat(head.len());
let mut fmt = format!(

View File

@ -454,8 +454,9 @@ impl Monitor for TuiMonitor {
}
self.client_stats_insert(sender_id);
let client = self.client_stats_mut_for(sender_id);
let exec_sec = client.execs_per_sec_pretty(cur_time);
let exec_sec =
self.update_client_stats_for(sender_id, |client| client.execs_per_sec_pretty(cur_time));
let client = self.client_stats_for(sender_id);
let sender = format!("#{}", sender_id.0);
let pad = if event_msg.len() + sender.len() < 13 {