From 672d25e5ac910cda52437d77d60eea83fc61a4fa Mon Sep 17 00:00:00 2001
From: Dominik Maier
Date: Fri, 24 Feb 2023 11:50:42 +0100
Subject: [PATCH] LLMP Client timeouts, Exit broker when last client exits
 (#1057)

* Moving type definitions to transparent structs
* function to notify other side of exit
* docs
* Example supports windows now
* timeout fix
* Exiting after the last client quit
* inform about quits
* clippy
* clippy
* clean exits
* fix
* more unsafe
* fixes
* Move ClientId
* fix no_std
* Fix prometheus
* introduce Cores.trim()
* add always_track metadata
* docu
* add AlwaysUniqueMapFeedback
* rename to always_interesting
* return CoreId for Launcher
* CoreId as transparent tuple struct
* fix graceful exits for launcher
* Broker exits after launcher
* clippy
* Fix llmp eop race, introduce llmp shmem cache
* initialize cached page, clippy
* fix llmp_debug strings
* add error handling
* nicer error output
* More error handling convenience
* clippy
* fix macos example
* nits
* trying to add a logger
* no_std
* inline logger enabled
* fix windows, non-fork
* macos
* no_std docs
* clippy
* use ? instead of unwraps in example
* more logging
* docs
---
 libafl/examples/llmp_test/main.rs | 140 +++++++-----
 libafl/src/bolts/core_affinity.rs |  72 +++---
 libafl/src/bolts/launcher.rs      |  43 +++-
 libafl/src/bolts/llmp.rs          | 355 +++++++++++++++++++++++-------
 libafl/src/bolts/mod.rs           |   9 +
 libafl/src/bolts/staterestore.rs  |  41 +++-
 libafl/src/events/llmp.rs         |  86 ++++++--
 libafl/src/events/mod.rs          |  17 +-
 libafl/src/events/simple.rs       |  46 ++--
 libafl/src/feedbacks/map.rs       |  35 ++-
 libafl/src/monitors/disk.rs       |   6 +-
 libafl/src/monitors/mod.rs        |  31 +--
 libafl/src/monitors/multi.rs      |   6 +-
 libafl/src/monitors/prometheus.rs |  20 +-
 libafl/src/monitors/tui/mod.rs    |  10 +-
 15 files changed, 658 insertions(+), 259 deletions(-)

diff --git a/libafl/examples/llmp_test/main.rs b/libafl/examples/llmp_test/main.rs
index 3c676103a9..2114bdb109 100644
--- a/libafl/examples/llmp_test/main.rs
+++ b/libafl/examples/llmp_test/main.rs
@@ -3,45 +3,57 @@ This shows how llmp can be used directly, without libafl abstractions
 */
 extern crate alloc;
 
-#[cfg(all(any(unix, windows), feature = "std"))]
+#[cfg(feature = "std")]
 use core::time::Duration;
-#[cfg(all(any(unix, windows), feature = "std"))]
-use std::{thread, time};
+#[cfg(feature = "std")]
+use std::{num::NonZeroUsize, thread, time};
 
-use libafl::{bolts::llmp::Tag, prelude::SimpleStdErrLogger};
-#[cfg(all(any(unix, windows), feature = "std"))]
+#[cfg(feature = "std")]
 use libafl::{
     bolts::{
-        llmp,
+        llmp::{self, Tag},
         shmem::{ShMemProvider, StdShMemProvider},
+        ClientId, SimpleStdErrLogger,
     },
     Error,
 };
 
+#[cfg(feature = "std")]
+const _TAG_SIMPLE_U32_V1: Tag = Tag(0x5130_0321);
+#[cfg(feature = "std")]
+const _TAG_MATH_RESULT_V1: Tag = Tag(0x7747_4331);
+#[cfg(feature = "std")]
+const _TAG_1MEG_V1: Tag = Tag(0xB111_1161);
+
+/// The time the broker will wait for things to happen before printing a message
+#[cfg(feature = "std")]
+const BROKER_TIMEOUT: Duration = Duration::from_secs(10);
+
+/// How long the broker may sleep between forwarding a new chunk of sent messages
+#[cfg(feature = "std")]
+const SLEEP_BETWEEN_FORWARDS: Duration = Duration::from_millis(5);
+
+#[cfg(feature = "std")]
 static LOGGER: SimpleStdErrLogger = SimpleStdErrLogger::debug();
 
-const _TAG_SIMPLE_U32_V1: Tag = 0x5130_0321;
-const _TAG_MATH_RESULT_V1: Tag = 0x7747_4331;
-const _TAG_1MEG_V1: Tag = 0xB111_1161;
-
-#[cfg(all(any(unix, windows), feature = "std"))]
-fn adder_loop(port: u16) -> ! {
-    let shmem_provider = StdShMemProvider::new().unwrap();
-    let mut client = llmp::LlmpClient::create_attach_to_tcp(shmem_provider, port).unwrap();
+#[cfg(feature = "std")]
+fn adder_loop(port: u16) -> Result<(), Box<dyn std::error::Error>> {
+    let shmem_provider = StdShMemProvider::new()?;
+    let mut client = llmp::LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
     let mut last_result: u32 = 0;
     let mut current_result: u32 = 0;
     loop {
         let mut msg_counter = 0;
         loop {
-            let Some((sender, tag, buf)) = client.recv_buf().unwrap() else { break };
+            let Some((sender, tag, buf)) = client.recv_buf()? else { break };
             msg_counter += 1;
             match tag {
                 _TAG_SIMPLE_U32_V1 => {
                     current_result =
-                        current_result.wrapping_add(u32::from_le_bytes(buf.try_into().unwrap()));
+                        current_result.wrapping_add(u32::from_le_bytes(buf.try_into()?));
                 }
                 _ => println!(
-                    "Adder Client ignored unknown message {:#x} from client {} with {} bytes",
+                    "Adder Client ignored unknown message {:?} from client {:?} with {} bytes",
                     tag,
                     sender,
                     buf.len()
@@ -52,9 +64,7 @@ fn adder_loop(port: u16) -> ! {
 
         if current_result != last_result {
             println!("Adder handled {msg_counter} messages, reporting {current_result} to broker");
-            client
-                .send_buf(_TAG_MATH_RESULT_V1, &current_result.to_le_bytes())
-                .unwrap();
+            client.send_buf(_TAG_MATH_RESULT_V1, &current_result.to_le_bytes())?;
             last_result = current_result;
         }
 
@@ -62,10 +72,9 @@ fn adder_loop(port: u16) -> ! {
     }
 }
 
-#[cfg(all(any(unix, windows), feature = "std"))]
-fn large_msg_loop(port: u16) -> ! {
-    let mut client =
-        llmp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new().unwrap(), port).unwrap();
+#[cfg(feature = "std")]
+fn large_msg_loop(port: u16) -> Result<(), Box<dyn std::error::Error>> {
+    let mut client = llmp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
 
     #[cfg(not(target_vendor = "apple"))]
     let meg_buf = vec![1u8; 1 << 20];
@@ -73,7 +82,7 @@ fn large_msg_loop(port: u16) -> ! {
     let meg_buf = vec![1u8; 1 << 19];
 
     loop {
-        client.send_buf(_TAG_1MEG_V1, &meg_buf).unwrap();
+        client.send_buf(_TAG_1MEG_V1, &meg_buf)?;
         #[cfg(not(target_vendor = "apple"))]
         println!("Sending the next megabyte");
         #[cfg(target_vendor = "apple")]
@@ -83,31 +92,36 @@
 }
 
 #[allow(clippy::unnecessary_wraps)]
-#[cfg(all(any(unix, windows), feature = "std"))]
+#[cfg(feature = "std")]
 fn broker_message_hook(
-    client_id: u32,
-    tag: llmp::Tag,
-    _flags: llmp::Flags,
-    message: &[u8],
+    msg_or_timeout: Option<(ClientId, llmp::Tag, llmp::Flags, &[u8])>,
 ) -> Result<llmp::LlmpMsgHookResult, Error> {
+    let Some((client_id, tag, _flags, message)) = msg_or_timeout else {
+        println!(
+            "No client did anything for {} seconds..",
+            BROKER_TIMEOUT.as_secs()
+        );
+        return Ok(llmp::LlmpMsgHookResult::Handled);
+    };
+
     match tag {
         _TAG_SIMPLE_U32_V1 => {
             println!(
                 "Client {:?} sent message: {:?}",
                 client_id,
-                u32::from_le_bytes(message.try_into().unwrap())
+                u32::from_le_bytes(message.try_into()?)
            );
            Ok(llmp::LlmpMsgHookResult::ForwardToClients)
        }
        _TAG_MATH_RESULT_V1 => {
            println!(
                "Adder Client has this current result: {:?}",
-                u32::from_le_bytes(message.try_into().unwrap())
+                u32::from_le_bytes(message.try_into()?)
            );
            Ok(llmp::LlmpMsgHookResult::Handled)
        }
        _ => {
-            println!("Unknwon message id received!");
+            println!("Unknown message id received: {tag:?}");
            Ok(llmp::LlmpMsgHookResult::ForwardToClients)
        }
    }
@@ -115,67 +129,85 @@ fn broker_message_hook(
 
 #[cfg(not(any(unix, windows)))]
 fn main() {
-    todo!("LLMP is not yet supported on this platform.");
+    eprintln!("LLMP example is currently not supported on no_std. Implement ShMem for no_std.");
 }
 
 #[cfg(any(unix, windows))]
-fn main() {
+fn main() -> Result<(), Box<dyn std::error::Error>> {
     /* The main node has a broker, and a few worker threads */
     let mode = std::env::args()
         .nth(1)
-        .expect("no mode specified, chose 'broker', 'b2b', 'ctr', 'adder', or 'large'");
+        .expect("no mode specified, choose 'broker', 'b2b', 'ctr', 'adder', 'large', or 'exiting'");
     let port: u16 = std::env::args()
         .nth(2)
         .unwrap_or_else(|| "1337".into())
-        .parse::<u16>()
-        .unwrap();
+        .parse::<u16>()?;
 
     // in the b2b use-case, this is our "own" port, we connect to the "normal" broker node on startup.
     let b2b_port: u16 = std::env::args()
         .nth(3)
        .unwrap_or_else(|| "4242".into())
-        .parse::<u16>()
-        .unwrap();
+        .parse::<u16>()?;
 
     log::set_logger(&LOGGER).unwrap();
+    log::set_max_level(log::LevelFilter::Trace);
     println!("Launching in mode {mode} on port {port}");
 
     match mode.as_str() {
         "broker" => {
-            let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new().unwrap()).unwrap();
-            broker.launch_tcp_listener_on(port).unwrap();
-            broker.loop_forever(&mut broker_message_hook, Some(Duration::from_millis(5)));
+            let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?;
+            broker.launch_tcp_listener_on(port)?;
+            // Exit once we had at least _n_ clients, and all of them quit.
+            broker.set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap());
+            broker.loop_with_timeouts(
+                &mut broker_message_hook,
+                BROKER_TIMEOUT,
+                Some(SLEEP_BETWEEN_FORWARDS),
+            );
        }
        "b2b" => {
-            let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new().unwrap()).unwrap();
-            broker.launch_tcp_listener_on(b2b_port).unwrap();
+            let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?;
+            broker.launch_tcp_listener_on(b2b_port)?;
            // connect back to the main broker.
-            broker.connect_b2b(("127.0.0.1", port)).unwrap();
-            broker.loop_forever(&mut broker_message_hook, Some(Duration::from_millis(5)));
+            broker.connect_b2b(("127.0.0.1", port))?;
+            broker.loop_with_timeouts(
+                &mut broker_message_hook,
+                BROKER_TIMEOUT,
+                Some(SLEEP_BETWEEN_FORWARDS),
+            );
        }
        "ctr" => {
            let mut client =
-                llmp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new().unwrap(), port)
-                    .unwrap();
+                llmp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
            let mut counter: u32 = 0;
            loop {
                counter = counter.wrapping_add(1);
-                client
-                    .send_buf(_TAG_SIMPLE_U32_V1, &counter.to_le_bytes())
-                    .unwrap();
+                client.send_buf(_TAG_SIMPLE_U32_V1, &counter.to_le_bytes())?;
                println!("CTR Client writing {counter}");
                thread::sleep(Duration::from_secs(1));
            }
        }
        "adder" => {
-            adder_loop(port);
+            adder_loop(port)?;
        }
        "large" => {
-            large_msg_loop(port);
+            large_msg_loop(port)?;
+        }
+        "exiting" => {
+            let mut client =
+                llmp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
+            for i in 0..10_u32 {
+                client.send_buf(_TAG_SIMPLE_U32_V1, &i.to_le_bytes())?;
+                println!("Exiting Client writing {i}");
+                thread::sleep(Duration::from_millis(10));
+            }
+            log::info!("Exiting Client exits");
+            client.sender.send_exiting()?;
        }
        _ => {
            println!("No valid mode supplied");
        }
    }
+    Ok(())
 }
diff --git a/libafl/src/bolts/core_affinity.rs b/libafl/src/bolts/core_affinity.rs
index f63b007524..35aff72126 100644
--- a/libafl/src/bolts/core_affinity.rs
+++ b/libafl/src/bolts/core_affinity.rs
@@ -45,11 +45,12 @@ pub fn get_core_ids() -> Result<Vec<CoreId>, Error> {
 }
 
 /// This represents a CPU core.
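+/// A hedged usage sketch (illustration only, relying on the `From` impls added further down):
+/// ```rust,ignore
+/// let core: CoreId = 3_usize.into();
+/// assert_eq!(core.0, 3);              // transparent tuple field access
+/// assert_eq!(usize::from(core), 3);
+/// ```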
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
-pub struct CoreId {
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
+#[repr(transparent)]
+pub struct CoreId(
     /// The numerical `id` of a core
-    pub id: usize,
-}
+    pub usize,
+);
 
 impl CoreId {
     /// Set the affinity of the current process to this [`CoreId`]
@@ -73,18 +74,18 @@ impl CoreId {
 
 impl From<usize> for CoreId {
     fn from(id: usize) -> Self {
-        CoreId { id }
+        CoreId(id)
     }
 }
 
 impl From<CoreId> for usize {
     fn from(core_id: CoreId) -> usize {
-        core_id.id
+        core_id.0
     }
 }
 
 /// A list of [`CoreId`] to use for fuzzing
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
 pub struct Cores {
     /// The original commandline used during parsing
     pub cmdline: String,
@@ -100,6 +101,19 @@ impl Cores {
         Self::from_cmdline("all")
     }
 
+    /// Trims the number of cores to the given value, dropping additional cores
+    pub fn trim(&mut self, count: usize) -> Result<(), Error> {
+        if count > self.ids.len() {
+            return Err(Error::illegal_argument(format!(
+                "Core trim value {count} is larger than number of chosen cores of {}",
+                self.ids.len()
+            )));
+        }
+
+        self.ids.resize(count, CoreId(0));
+        Ok(())
+    }
+
     /// Parses core binding args from user input.
     /// Returns a Vec of CPU IDs.
     /// * `./fuzzer --cores 1,2-4,6`: clients run in cores `1,2,3,4,6`
@@ -148,6 +162,18 @@ impl Cores {
         let core_id = CoreId::from(core_id);
         self.ids.contains(&core_id)
     }
+
+    /// Returns the index/position of the given [`CoreId`] in this cores.ids list.
+    /// Will return `None`, if the [`CoreId`] wasn't found.
+    #[must_use]
+    pub fn position(&self, core_id: CoreId) -> Option<usize> {
+        // Since cores are a low number, iterating is const-size,
+        // and should be faster than hashmap lookups.
+        // Prove me wrong.
+        self.ids
+            .iter()
+            .position(|&cur_core_id| cur_core_id == core_id)
+    }
 }
 
 impl From<&[usize]> for Cores {
@@ -183,11 +209,7 @@ impl TryFrom<&str> for Cores {
 #[cfg(feature = "std")]
 #[deprecated(since = "0.8.1", note = "Use Cores::from_cmdline instead")]
 pub fn parse_core_bind_arg(args: &str) -> Result<Vec<usize>, Error> {
-    Ok(Cores::from_cmdline(args)?
-        .ids
-        .iter()
-        .map(|x| x.id)
-        .collect())
+    Ok(Cores::from_cmdline(args)?.ids.iter().map(|x| x.0).collect())
 }
 
 // Linux Section
@@ -226,7 +248,7 @@ mod linux {
 
         for i in 0..CPU_SETSIZE as usize {
             if unsafe { CPU_ISSET(i, &full_set) } {
-                core_ids.push(CoreId { id: i });
+                core_ids.push(CoreId(i));
             }
         }
 
@@ -238,7 +260,7 @@ mod linux {
         // one core active.
         let mut set = new_cpu_set();
 
-        unsafe { CPU_SET(core_id.id, &mut set) };
+        unsafe { CPU_SET(core_id.0, &mut set) };
 
         // Set the current thread's core affinity.
         let result = unsafe {
@@ -301,7 +323,7 @@ mod linux {
 
         // Ensure that the system pinned the current thread
        // to the specified core.
let mut core_mask = new_cpu_set(); - unsafe { CPU_SET(ids[0].id, &mut core_mask) }; + unsafe { CPU_SET(ids[0].0, &mut core_mask) }; let new_mask = get_affinity_mask().unwrap(); @@ -351,7 +373,7 @@ mod windows { match get_num_logical_cpus_ex_windows() { Some(total_cores) => { for i in 0..total_cores { - core_ids.push(CoreId { id: i }); + core_ids.push(CoreId(i)); } Ok(core_ids) } @@ -563,14 +585,14 @@ mod apple { #[allow(clippy::unnecessary_wraps)] pub fn get_core_ids() -> Result, Error> { Ok((0..(usize::from(available_parallelism()?))) - .map(|n| CoreId { id: n }) + .map(CoreId) .collect::>()) } #[cfg(target_arch = "x86_64")] pub fn set_for_current(core_id: CoreId) -> Result<(), Error> { let mut info = thread_affinity_policy_data_t { - affinity_tag: core_id.id.try_into().unwrap(), + affinity_tag: core_id.0.try_into().unwrap(), }; unsafe { @@ -640,7 +662,7 @@ mod freebsd { pub fn get_core_ids() -> Result, Error> { Ok((0..(usize::from(available_parallelism()?))) .into_iter() - .map(|n| CoreId { id: n }) + .map(|n| CoreId(n)) .collect::>()) } @@ -648,7 +670,7 @@ mod freebsd { // Turn `core_id` into a `libc::cpuset_t` with only let mut set = new_cpuset(); - unsafe { CPU_SET(core_id.id, &mut set) }; + unsafe { CPU_SET(core_id.0, &mut set) }; // Set the current thread's core affinity. let result = unsafe { @@ -769,14 +791,14 @@ mod netbsd { pub fn get_core_ids() -> Result, Error> { Ok((0..(usize::from(available_parallelism()?))) .into_iter() - .map(|n| CoreId { id: n }) + .map(|n| CoreId(n)) .collect::>()) } pub fn set_for_current(core_id: CoreId) -> Result<(), Error> { let set = new_cpuset(); - unsafe { _cpuset_set(core_id.id as u64, set) }; + unsafe { _cpuset_set(core_id.0 as u64, set) }; // Set the current thread's core affinity. let result = unsafe { pthread_setaffinity_np( @@ -824,7 +846,7 @@ mod openbsd { pub fn get_core_ids() -> Result, Error> { Ok((0..(usize::from(available_parallelism()?))) .into_iter() - .map(|n| CoreId { id: n }) + .map(|n| CoreId(n)) .collect::>()) } } @@ -853,7 +875,7 @@ mod solaris { pub fn get_core_ids() -> Result, Error> { Ok((0..(usize::from(available_parallelism()?))) .into_iter() - .map(|n| CoreId { id: n }) + .map(|n| CoreId(n)) .collect::>()) } @@ -862,7 +884,7 @@ mod solaris { libc::processor_bind( libc::P_PID, libc::PS_MYID, - core_id.id as i32, + core_id.0 as i32, std::ptr::null_mut(), ) }; diff --git a/libafl/src/bolts/launcher.rs b/libafl/src/bolts/launcher.rs index fda606d95a..b46bae8071 100644 --- a/libafl/src/bolts/launcher.rs +++ b/libafl/src/bolts/launcher.rs @@ -12,9 +12,12 @@ #[cfg(all(feature = "std"))] use alloc::string::ToString; -use core::fmt::{self, Debug, Formatter}; #[cfg(feature = "std")] use core::marker::PhantomData; +use core::{ + fmt::{self, Debug, Formatter}, + num::NonZeroUsize, +}; #[cfg(feature = "std")] use std::net::SocketAddr; #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] @@ -27,12 +30,14 @@ use serde::de::DeserializeOwned; #[cfg(feature = "std")] use typed_builder::TypedBuilder; -#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] -use crate::bolts::core_affinity::CoreId; +use super::core_affinity::CoreId; #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] use crate::bolts::os::startable_self; #[cfg(all(unix, feature = "std", feature = "fork"))] -use crate::bolts::os::{dup2, fork, ForkResult}; +use crate::bolts::{ + core_affinity::get_core_ids, + os::{dup2, fork, ForkResult}, +}; use crate::inputs::UsesInput; #[cfg(feature = "std")] use crate::{ @@ -51,7 +56,7 @@ const 
_AFL_LAUNCHER_CLIENT: &str = "AFL_LAUNCHER_CLIENT"; #[allow(clippy::type_complexity, missing_debug_implementations)] pub struct Launcher<'a, CF, MT, S, SP> where - CF: FnOnce(Option, LlmpRestartingEventManager, usize) -> Result<(), Error>, + CF: FnOnce(Option, LlmpRestartingEventManager, CoreId) -> Result<(), Error>, S::Input: 'a, MT: Monitor, SP: ShMemProvider + 'static, @@ -90,7 +95,7 @@ where impl Debug for Launcher<'_, CF, MT, S, SP> where - CF: FnOnce(Option, LlmpRestartingEventManager, usize) -> Result<(), Error>, + CF: FnOnce(Option, LlmpRestartingEventManager, CoreId) -> Result<(), Error>, MT: Monitor + Clone, SP: ShMemProvider + 'static, S: DeserializeOwned + UsesInput, @@ -110,7 +115,7 @@ where #[cfg(feature = "std")] impl<'a, CF, MT, S, SP> Launcher<'a, CF, MT, S, SP> where - CF: FnOnce(Option, LlmpRestartingEventManager, usize) -> Result<(), Error>, + CF: FnOnce(Option, LlmpRestartingEventManager, CoreId) -> Result<(), Error>, MT: Monitor + Clone, S: DeserializeOwned + UsesInput + HasExecutions + HasClientPerfMonitor, SP: ShMemProvider + 'static, @@ -119,7 +124,11 @@ where #[cfg(all(unix, feature = "std", feature = "fork"))] #[allow(clippy::similar_names)] pub fn launch(&mut self) -> Result<(), Error> { - use crate::bolts::core_affinity::get_core_ids; + if self.cores.ids.is_empty() { + return Err(Error::illegal_argument( + "No cores to spawn on given, cannot launch anything.", + )); + } if self.run_client.is_none() { return Err(Error::illegal_argument( @@ -159,7 +168,7 @@ where self.shmem_provider.post_fork(true)?; #[cfg(feature = "std")] - std::thread::sleep(std::time::Duration::from_millis(index * 100)); + std::thread::sleep(std::time::Duration::from_millis(index * 10)); #[cfg(feature = "std")] if !debug_output { @@ -180,7 +189,7 @@ where .build() .launch()?; - return (self.run_client.take().unwrap())(state, mgr, bind_to.id); + return (self.run_client.take().unwrap())(state, mgr, *bind_to); } }; } @@ -197,6 +206,7 @@ where .broker_port(self.broker_port) .kind(ManagerKind::Broker) .remote_broker_addr(self.remote_broker_addr) + .exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap())) .configuration(self.configuration) .build() .launch()?; @@ -242,13 +252,13 @@ where .shmem_provider(self.shmem_provider.clone()) .broker_port(self.broker_port) .kind(ManagerKind::Client { - cpu_core: Some(CoreId { id: core_id }), + cpu_core: Some(CoreId(core_id)), }) .configuration(self.configuration) .build() .launch()?; - return (self.run_client.take().unwrap())(state, mgr, core_id); + return (self.run_client.take().unwrap())(state, mgr, CoreId(core_id)); } Err(std::env::VarError::NotPresent) => { // I am a broker @@ -285,6 +295,14 @@ where Err(_) => panic!("Env variables are broken, received non-unicode!"), }; + // It's fine to check this after the client spawn loop - since we won't have spawned any clients... + // Doing it later means one less check in each spawned process. 
+ if self.cores.ids.is_empty() { + return Err(Error::illegal_argument( + "No cores to spawn on given, cannot launch anything.", + )); + } + if self.spawn_broker { #[cfg(feature = "std")] log::info!("I am broker!!."); @@ -295,6 +313,7 @@ where .broker_port(self.broker_port) .kind(ManagerKind::Broker) .remote_broker_addr(self.remote_broker_addr) + .exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap())) .configuration(self.configuration) .build() .launch()?; diff --git a/libafl/src/bolts/llmp.rs b/libafl/src/bolts/llmp.rs index 2ca0dd8912..39d357d333 100644 --- a/libafl/src/bolts/llmp.rs +++ b/libafl/src/bolts/llmp.rs @@ -69,6 +69,8 @@ use core::{ fmt::Debug, hint, mem::size_of, + num::NonZeroUsize, + ops::{BitAnd, BitOr, Not}, ptr, slice, sync::atomic::{fence, AtomicU16, Ordering}, time::Duration, @@ -92,15 +94,24 @@ use backtrace::Backtrace; use nix::sys::socket::{self, sockopt::ReusePort}; use serde::{Deserialize, Serialize}; +#[cfg(feature = "std")] +use crate::bolts::current_time; #[cfg(unix)] use crate::bolts::os::unix_signals::{ setup_signal_handler, siginfo_t, ucontext_t, Handler, Signal, }; use crate::{ - bolts::shmem::{ShMem, ShMemDescription, ShMemId, ShMemProvider}, + bolts::{ + shmem::{ShMem, ShMemDescription, ShMemId, ShMemProvider}, + ClientId, + }, Error, }; +/// The timeout after which a client will be considered stale, and removed. +#[cfg(feature = "std")] +const CLIENT_TIMEOUT: Duration = Duration::from_secs(60); + /// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`] /// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages. /// Instead of increasing this value, you may consider sending new messages at a lower rate, else your Sender will eventually `OOM`. @@ -116,25 +127,25 @@ const LLMP_CFG_INITIAL_MAP_SIZE: usize = 1 << 20; const LLMP_CFG_ALIGNNMENT: usize = 64; /// A msg fresh from the press: No tag got sent by the user yet -const LLMP_TAG_UNSET: Tag = 0xDEADAF; +const LLMP_TAG_UNSET: Tag = Tag(0xDEADAF); /// This message should not exist yet. Some bug in unsafe code! -const LLMP_TAG_UNINITIALIZED: Tag = 0xA143AF11; +const LLMP_TAG_UNINITIALIZED: Tag = Tag(0xA143AF11); /// The end of page message /// When receiving this, a new sharedmap needs to be allocated. -const LLMP_TAG_END_OF_PAGE: Tag = 0xAF1E0F1; +const LLMP_TAG_END_OF_PAGE: Tag = Tag(0xAF1E0F1); /// A new client for this broker got added. -const LLMP_TAG_NEW_SHM_CLIENT: Tag = 0xC11E471; +const LLMP_TAG_NEW_SHM_CLIENT: Tag = Tag(0xC11E471); /// The sender on this map is exiting (if broker exits, clients should exit gracefully); -const LLMP_TAG_EXITING: Tag = 0x13C5171; +const LLMP_TAG_EXITING: Tag = Tag(0x13C5171); /// Client gave up as the receiver/broker was too slow -const LLMP_SLOW_RECEIVER_PANIC: Tag = 0x70051041; +const LLMP_SLOW_RECEIVER_PANIC: Tag = Tag(0x70051041); /// Unused... -pub const LLMP_FLAG_INITIALIZED: Flags = 0x0; +pub const LLMP_FLAG_INITIALIZED: Flags = Flags(0x0); /// This message was compressed in transit -pub const LLMP_FLAG_COMPRESSED: Flags = 0x1; +pub const LLMP_FLAG_COMPRESSED: Flags = Flags(0x1); /// From another broker. -pub const LLMP_FLAG_FROM_B2B: Flags = 0x2; +pub const LLMP_FLAG_FROM_B2B: Flags = Flags(0x2); /// Timt the broker 2 broker connection waits for incoming data, /// before checking for own data to forward again. 
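A quick sketch of the new `Flags` semantics (assuming the `BitAnd`/`BitOr`/`Not`
impls added in the next hunk; illustration only, not part of the patch):

    let flags = LLMP_FLAG_COMPRESSED | LLMP_FLAG_FROM_B2B; // Flags(0x3)
    assert_eq!(flags & LLMP_FLAG_COMPRESSED, LLMP_FLAG_COMPRESSED);
    assert_eq!(flags & !LLMP_FLAG_FROM_B2B, LLMP_FLAG_COMPRESSED);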
@@ -170,19 +181,75 @@ static mut LLMP_SIGHANDLER_STATE: LlmpShutdownSignalHandler = LlmpShutdownSignal }; /// TAGs used throughout llmp -pub type Tag = u32; -/// The client ID == the sender id. -pub type ClientId = u32; +#[repr(transparent)] +#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +pub struct Tag(pub u32); + +impl Debug for Tag { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.write_fmt(format_args!("Tag({:X})", self.0)) + } +} + /// The broker ID, for broker 2 broker communication. -pub type BrokerId = u32; +#[repr(transparent)] +#[derive( + Debug, Default, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, +)] +pub struct BrokerId(pub u32); /// The flags, indicating, for example, enabled compression. -pub type Flags = u32; +#[repr(transparent)] +#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct Flags(u32); + +impl Debug for Flags { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.write_fmt(format_args!("Flags{:x}( ", self.0))?; + // Initialized is the default value, no need to print. + if *self & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + f.write_str("COMPRESSED")?; + } + if *self & LLMP_FLAG_FROM_B2B == LLMP_FLAG_FROM_B2B { + f.write_str("FROM_B2B")?; + } + f.write_str(" )") + } +} + +impl BitAnd for Flags { + type Output = Self; + + fn bitand(self, rhs: Self) -> Self::Output { + Self(self.0 & rhs.0) + } +} + +impl BitOr for Flags { + type Output = Self; + + fn bitor(self, rhs: Self) -> Self::Output { + Self(self.0 | rhs.0) + } +} + +impl Not for Flags { + type Output = Self; + + fn not(self) -> Self::Output { + Self(!self.0) + } +} + /// The message ID, an ever-increasing number, unique only to a sharedmap/page. #[cfg(target_pointer_width = "64")] -pub type MessageId = u64; +#[repr(transparent)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +pub struct MessageId(u64); /// The message ID, an ever-increasing number, unique only to a sharedmap/page. #[cfg(not(target_pointer_width = "64"))] -pub type MessageId = u32; +#[repr(transparent)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +pub struct MessageId(u32); /// This is for the server the broker will spawn. /// If an llmp connection is local - use sharedmaps @@ -477,7 +544,7 @@ unsafe fn llmp_page_init(shmem: &mut SHM, sender_id: ClientId, allow // Don't forget to subtract our own header size (*page).size_total = map_size - LLMP_PAGE_HEADER_LEN; (*page).size_used = 0; - (*(*page).messages.as_mut_ptr()).message_id = 0; + (*(*page).messages.as_mut_ptr()).message_id = MessageId(0); (*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET; (*page).safe_to_unmap.store(0, Ordering::Relaxed); (*page).sender_dead.store(0, Ordering::Relaxed); @@ -827,14 +894,14 @@ where Ok(if client_id_str == _NULL_ENV_STR { None } else { - Some(client_id_str.parse()?) + Some(ClientId(client_id_str.parse()?)) }) } /// Writes the `id` to an env var #[cfg(feature = "std")] fn client_id_to_env(env_name: &str, id: ClientId) { - env::set_var(format!("{env_name}_CLIENT_ID"), format!("{id}")); + env::set_var(format!("{env_name}_CLIENT_ID"), format!("{}", id.0)); } /// Reattach to a vacant `out_shmem`, to with a previous sender stored the information in an env before. @@ -986,9 +1053,9 @@ where // We don't need to pad the EOP message: it'll always be the last in this page. 
(*ret).buf_len_padded = (*ret).buf_len; (*ret).message_id = if last_msg.is_null() { - 1 + MessageId(1) } else { - (*last_msg).message_id + 1 + MessageId((*last_msg).message_id.0 + 1) }; (*ret).tag = LLMP_TAG_END_OF_PAGE; (*page).size_used += EOP_MSG_SIZE; @@ -1050,12 +1117,12 @@ where /* We need to start with 1 for ids, as current message id is initialized * with 0... */ (*ret).message_id = if last_msg.is_null() { - 1 - } else if (*page).current_msg_id.load(Ordering::Relaxed) == (*last_msg).message_id { - (*last_msg).message_id + 1 + MessageId(1) + } else if (*page).current_msg_id.load(Ordering::Relaxed) == (*last_msg).message_id.0 { + MessageId((*last_msg).message_id.0 + 1) } else { /* Oops, wrong usage! */ - panic!("BUG: The current message never got committed using send! (page->current_msg_id {:?}, last_msg->message_id: {})", ptr::addr_of!((*page).current_msg_id), (*last_msg).message_id); + panic!("BUG: The current message never got committed using send! (page->current_msg_id {:?}, last_msg->message_id: {:?})", ptr::addr_of!((*page).current_msg_id), (*last_msg).message_id); }; (*ret).buf_len = buf_len as u64; @@ -1087,7 +1154,7 @@ where assert!(self.last_msg_sent != msg, "Message sent twice!"); assert!( (*msg).tag != LLMP_TAG_UNSET, - "No tag set on message with id {}", + "No tag set on message with id {:?}", (*msg).message_id ); // A client gets the sender id assigned to by the broker during the initial handshake. @@ -1101,12 +1168,12 @@ where ))); } - (*msg).message_id = (*page).current_msg_id.load(Ordering::Relaxed) + 1; + (*msg).message_id.0 = (*page).current_msg_id.load(Ordering::Relaxed) + 1; // Make sure all things have been written to the page, and commit the message to the page (*page) .current_msg_id - .store((*msg).message_id, Ordering::Release); + .store((*msg).message_id.0, Ordering::Release); self.last_msg_sent = msg; self.has_unsent_message = false; @@ -1169,7 +1236,7 @@ where // If we want to get red if old pages, (client to broker), do that now if !self.keep_pages_forever { #[cfg(feature = "llmp_debug")] - log::info!("pruning"); + log::debug!("LLMP DEBUG: pruning old pages"); self.prune_old_pages(); } @@ -1231,7 +1298,7 @@ where } #[cfg(feature = "llmp_debug")] - log::info!("Handled out eop"); + log::debug!("Handled out eop"); match unsafe { self.alloc_next_if_space(buf_len) } { Some(msg) => Ok(msg), @@ -1304,7 +1371,7 @@ where || tag == LLMP_TAG_UNSET { return Err(Error::unknown(format!( - "Reserved tag supplied to send_buf ({tag:#X})" + "Reserved tag supplied to send_buf ({tag:?})" ))); } @@ -1327,7 +1394,7 @@ where || tag == LLMP_TAG_UNSET { return Err(Error::unknown(format!( - "Reserved tag supplied to send_buf ({tag:#X})" + "Reserved tag supplied to send_buf ({tag:?})" ))); } @@ -1367,6 +1434,13 @@ where description.last_message_offset, ) } + + /// Send information that this client is exiting. + /// The other side may free up all allocated memory. + /// We are no longer allowed to send anything afterwards. 
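+    ///
+    /// A hedged usage sketch (mirroring the `exiting` mode of the llmp example):
+    /// ```rust,ignore
+    /// client.sender.send_exiting()?;
+    /// // ... the process may now exit; the broker cleans up this client's pages.
+    /// ```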
+    pub fn send_exiting(&mut self) -> Result<(), Error> {
+        self.send_buf(LLMP_TAG_EXITING, &[])
+    }
 }
 
 /// Receiving end on a (unidirectional) sharedmap channel
@@ -1375,10 +1449,13 @@ pub struct LlmpReceiver<SP>
 where
     SP: ShMemProvider,
 {
-    /// Id of this provider
-    pub id: u32,
+    /// Client Id of this receiver
+    pub id: ClientId,
     /// Pointer to the last message received
     pub last_msg_recvd: *const LlmpMsg,
+    /// Time we received the last message from this receiver
+    #[cfg(feature = "std")]
+    last_msg_time: Duration,
     /// The shmem provider
     pub shmem_provider: SP,
     /// current page. After EOP, this gets replaced with the new one
@@ -1426,16 +1503,22 @@ where
         };
 
         Ok(Self {
-            id: 0,
+            id: ClientId(0),
             current_recv_shmem,
             last_msg_recvd,
             shmem_provider,
-            highest_msg_id: 0,
+            highest_msg_id: MessageId(0),
+            // We don't know the last received time, just assume the current time.
+            #[cfg(feature = "std")]
+            last_msg_time: current_time(),
         })
     }
 
     // Never inline, to not get some strange effects
     /// Read next message.
+    /// Returns a pointer to the [`LlmpMsg`], `None` if no message exists, or an [`Error`].
+    ///
+    /// Will *not* update `self.last_msg_time`.
     #[inline(never)]
     unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> {
         /* DBG("recv %p %p\n", page, last_msg); */
@@ -1449,12 +1532,12 @@ where
         } else {
             // read the msg_id from shared map
             let current_msg_id = (*page).current_msg_id.load(Ordering::Relaxed);
-            self.highest_msg_id = current_msg_id;
-            (current_msg_id, true)
+            self.highest_msg_id = MessageId(current_msg_id);
+            (MessageId(current_msg_id), true)
         };
 
         // Read the message from the page
-        let ret = if current_msg_id == 0 {
+        let ret = if current_msg_id.0 == 0 {
             /* No messages yet */
             None
         } else if last_msg.is_null() {
@@ -1485,7 +1568,7 @@ where
             // Handle special, LLMP internal, messages.
             match (*msg).tag {
                 LLMP_TAG_UNSET => panic!(
-                    "BUG: Read unallocated msg (tag was {:x} - msg header: {:?}",
+                    "BUG: Read unallocated msg (tag was {:?} - msg header: {:?}",
                     LLMP_TAG_UNSET,
                     &(*msg)
                 ),
@@ -1495,7 +1578,7 @@ where
                     return Err(Error::shutting_down());
                 }
                 LLMP_TAG_END_OF_PAGE => {
-                    log::info!("Received end of page, allocating next");
+                    log::debug!("Received end of page, allocating next");
                     // Handle end of page
                     assert!(
                         (*msg).buf_len >= size_of::<LlmpPayloadSharedMapInfo>() as u64,
@@ -1514,7 +1597,7 @@ where
                     // Set last msg we received to null (as the map may no longer exist)
                     self.last_msg_recvd = ptr::null();
-                    self.highest_msg_id = 0;
+                    self.highest_msg_id = MessageId(0);
 
                     // Mark the old page safe to unmap, in case we didn't do so earlier.
                     (*page).safe_to_unmap.store(1, Ordering::Relaxed);
@@ -1552,7 +1635,7 @@ where
     /// # Safety
     /// Returns a raw ptr, on the recv map. Should be safe in general
     pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, Error> {
-        let mut current_msg_id = 0;
+        let mut current_msg_id = MessageId(0);
         let page = self.current_recv_shmem.page_mut();
         let last_msg = self.last_msg_recvd;
         if !last_msg.is_null() {
@@ -1564,7 +1647,7 @@ where
             current_msg_id = (*last_msg).message_id;
         }
         loop {
-            if (*page).current_msg_id.load(Ordering::Relaxed) != current_msg_id {
+            if (*page).current_msg_id.load(Ordering::Relaxed) != current_msg_id.0 {
                 return match self.recv()? {
                     Some(msg) => Ok(msg),
                     None => panic!("BUG: blocking llmp message should never be NULL"),
@@ -1805,6 +1888,16 @@ where
     /// This allows us to intercept messages right in the broker.
     /// This keeps the out map clean.
     pub llmp_clients: Vec<LlmpReceiver<SP>>,
+    /// Our own listeners we spawned via `launch_listener` or `create_attach_to_tcp`.
+    /// Listeners will be ignored for `exit_cleanly_after` and they are never considered to have timed out.
+    listeners: Vec<ClientId>,
+    /// The total amount of clients we had, historically, including those that disconnected, and our listeners.
+    num_clients_total: usize,
+    /// The amount of total clients that should have connected (and disconnected)
+    /// after which the broker loop should quit gracefully.
+    pub exit_cleanly_after: Option<NonZeroUsize>,
+    /// Clients that should be removed soon, (offset into llmp_clients)
+    clients_to_remove: Vec<u32>,
     /// The ShMemProvider to use
     shmem_provider: SP,
 }
@@ -1839,10 +1932,10 @@ where
     pub fn new(mut shmem_provider: SP) -> Result<Self, Error> {
         Ok(LlmpBroker {
             llmp_out: LlmpSender {
-                id: 0,
+                id: ClientId(0),
                 last_msg_sent: ptr::null_mut(),
                 out_shmems: vec![LlmpSharedMap::new(
-                    0,
+                    ClientId(0),
                     shmem_provider.new_shmem(next_shmem_size(0))?,
                 )],
                 // Broker never cleans up the pages so that new
@@ -1853,7 +1946,11 @@ where
                 unused_shmem_cache: vec![],
             },
             llmp_clients: vec![],
+            clients_to_remove: vec![],
             shmem_provider,
+            listeners: vec![],
+            exit_cleanly_after: None,
+            num_clients_total: 0,
         })
     }
 
@@ -1870,6 +1967,15 @@ where
         }
     }
 
+    /// Set this broker to exit after at least `n_clients` clients attached and all of them exited.
+    /// Will ignore the broker's own listener thread, if one was spawned via `create_attach_to_tcp`.
+    ///
+    /// So, if the `n_clients` value is `2`, the broker will not exit after client 1 connected and disconnected,
+    /// but it will quit after client 2 connected and disconnected.
+    pub fn set_exit_cleanly_after(&mut self, n_clients: NonZeroUsize) {
+        self.exit_cleanly_after = Some(n_clients);
+    }
+
     /// Allocate the next message on the outgoing map
     unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error> {
         self.llmp_out.alloc_next(buf_len)
     }
 
     /// Registers a new client for the given sharedmap str and size.
     /// Returns the id of the new client in [`broker.client_shmem`]
-    pub fn register_client(&mut self, mut client_page: LlmpSharedMap<SP::ShMem>) {
-        // Tell the client it may unmap this page now.
+    pub fn register_client(&mut self, mut client_page: LlmpSharedMap<SP::ShMem>) -> ClientId {
+        // Tell the client it may unmap its initial allocated shmem page now.
+        // Since we now have a handle to it, it won't be unmapped too early (only after we also unmap it)
         client_page.mark_safe_to_unmap();
 
-        let id = self.llmp_clients.len() as u32;
+        let id = ClientId(self.num_clients_total.try_into().unwrap());
         self.llmp_clients.push(LlmpReceiver {
             id,
             current_recv_shmem: client_page,
             last_msg_recvd: ptr::null_mut(),
             shmem_provider: self.shmem_provider.clone(),
-            highest_msg_id: 0,
+            highest_msg_id: MessageId(0),
+            // We don't know the last received time, just assume the current time.
+            #[cfg(feature = "std")]
+            last_msg_time: current_time(),
         });
+
+        self.num_clients_total += 1;
+        id
     }
 
     /// Connects to a broker running on another machine.
@@ -1923,7 +2036,7 @@ where
 
         let broker_id = match recv_tcp_msg(&mut stream)?.try_into()? {
             TcpResponse::RemoteBrokerAccepted { broker_id } => {
-                log::info!("B2B: Got Connection Ack, broker_id {broker_id}");
+                log::info!("B2B: Got Connection Ack, broker_id {broker_id:?}");
                 broker_id
             }
             _ => {
@@ -1934,12 +2047,12 @@ where
         };
 
         // TODO: use broker ids!
-        log::info!("B2B: We are broker {broker_id}");
+        log::info!("B2B: We are broker {broker_id:?}");
 
         // TODO: handle broker_ids properly/at all.
         let map_description = Self::b2b_thread_on(
             stream,
-            self.llmp_clients.len() as ClientId,
+            ClientId(self.llmp_clients.len() as u32),
             &self
                 .llmp_out
                 .out_shmems
@@ -1987,12 +2100,38 @@ where
     where
         F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
     {
+        #[cfg(feature = "std")]
+        let current_time = current_time();
         let mut new_messages = false;
         for i in 0..self.llmp_clients.len() {
-            unsafe {
-                new_messages |= self.handle_new_msgs(i as u32, on_new_msg)?;
+            let client_id = self.llmp_clients[i].id;
+            match unsafe { self.handle_new_msgs(client_id, on_new_msg) } {
+                Ok(has_messages) => {
+                    // See if we need to remove this client, in case no new messages got brokered, and it's not a listener
+                    #[cfg(feature = "std")]
+                    if !has_messages && !self.listeners.iter().any(|&x| x == client_id) {
+                        let last_msg_time = self.llmp_clients[i].last_msg_time;
+                        if last_msg_time < current_time
+                            && current_time - last_msg_time > CLIENT_TIMEOUT
+                        {
+                            self.clients_to_remove.push(i as u32);
+                            #[cfg(feature = "llmp_debug")]
+                            println!("Client {i} timed out. Removing.");
+                        }
+                    }
+                    new_messages = has_messages;
+                }
+                Err(Error::ShuttingDown) => self.clients_to_remove.push(i as u32),
+                Err(err) => return Err(err),
             }
         }
+
+        // After brokering, remove all clients we don't want to keep.
+        for client_id in self.clients_to_remove.iter().rev() {
+            log::debug!("Client {client_id} disconnected.");
+            self.llmp_clients.remove((*client_id) as usize);
+        }
+        self.clients_to_remove.clear();
         Ok(new_messages)
     }
 
@@ -2012,8 +2151,16 @@ where
         false
     }
 
-    /// Loops infinitely, forwarding and handling all incoming messages from clients.
-    /// Never returns.
+    /// Returns whether any clients are currently connected.
+    /// Ignores listener threads that belong to the broker,
+    /// talking to other brokers via TCP, and accepting new clients over this port.
+    #[inline]
+    fn has_clients(&self) -> bool {
+        self.llmp_clients.len() > self.listeners.len()
+    }
+
+    /// Loops until the last client quits,
+    /// forwarding and handling all incoming messages from clients.
     /// Will call `on_timeout` roughly after `timeout`
     /// Panics on error.
     /// 5 millis of sleep can't hurt to keep busywait not at 100%
@@ -2052,6 +2199,23 @@ where
                 end_time = current_milliseconds() + timeout;
             }
 
+            if let Some(exit_after_count) = self.exit_cleanly_after {
+                log::trace!(
+                    "Clients connected: {} && > {} - {} >= {}",
+                    self.has_clients(),
+                    self.num_clients_total,
+                    self.listeners.len(),
+                    exit_after_count
+                );
+                if !self.has_clients()
+                    && (self.num_clients_total - self.listeners.len()) >= exit_after_count.into()
+                {
+                    // No more clients connected, and the amount of clients we were waiting for was previously connected.
+                    // exit cleanly.
+                    break;
+                }
+            }
+
             #[cfg(feature = "std")]
             if let Some(time) = sleep_time {
                 thread::sleep(time);
             }
 
             #[cfg(not(feature = "std"))]
             if let Some(time) = sleep_time {
-                panic!("Cannot sleep on no_std platform (requested {:?})", time);
+                panic!("Cannot sleep on no_std platform (requested {time:?})");
             }
         }
         self.llmp_out
             .send_buf(LLMP_TAG_EXITING, &[])
             .expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.");
     }
 
-    /// Loops infinitely, forwarding and handling all incoming messages from clients.
-    /// Never returns. Panics on error.
+    /// Loops until the last client quits,
+    /// forwarding and handling all incoming messages from clients.
     /// 5 millis of sleep can't hurt to keep busywait not at 100%
     /// On std, if you need to run code even if no update got sent, use `Self::loop_with_timeouts` (needs the `std` feature).
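+    ///
+    /// A hedged setup sketch (mirroring the example's `broker` mode; `hook` stands in
+    /// for any message-hook closure):
+    /// ```rust,ignore
+    /// let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?;
+    /// broker.launch_tcp_listener_on(port)?;
+    /// broker.set_exit_cleanly_after(NonZeroUsize::new(1).unwrap());
+    /// broker.loop_forever(&mut hook, Some(Duration::from_millis(5)));
+    /// ```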
pub fn loop_forever(&mut self, on_new_msg: &mut F, sleep_time: Option) @@ -2085,6 +2249,16 @@ where self.once(on_new_msg) .expect("An error occurred when brokering. Exiting."); + if let Some(exit_after_count) = self.exit_cleanly_after { + if !self.has_clients() + && (self.num_clients_total - self.listeners.len()) > exit_after_count.into() + { + // No more clients connected, and the amount of clients we were waiting for was previously connected. + // exit cleanly. + break; + } + } + #[cfg(feature = "std")] if let Some(time) = sleep_time { thread::sleep(time); @@ -2092,7 +2266,7 @@ where #[cfg(not(feature = "std"))] if let Some(time) = sleep_time { - panic!("Cannot sleep on no_std platform (requested {:?})", time); + panic!("Cannot sleep on no_std platform (requested {time:?})"); } } self.llmp_out @@ -2205,7 +2379,7 @@ where Ok(Some((client_id, tag, flags, payload))) => { if client_id == b2b_client_id { log::info!( - "Ignored message we probably sent earlier (same id), TAG: {tag:x}" + "Ignored message we probably sent earlier (same id), TAG: {tag:?}" ); continue; } @@ -2297,7 +2471,7 @@ where fn handle_tcp_request( mut stream: TcpStream, request: &TcpRequest, - current_client_id: &mut u32, + current_client_id: &mut ClientId, sender: &mut LlmpSender, broker_shmem_description: &ShMemDescription, ) { @@ -2316,7 +2490,7 @@ where ) { log::info!("An error occurred sending via tcp {e}"); }; - *current_client_id += 1; + current_client_id.0 += 1; } TcpRequest::RemoteBrokerHello { hostname } => { log::info!("B2B new client: {hostname}"); @@ -2325,7 +2499,7 @@ where if send_tcp_msg( &mut stream, &TcpResponse::RemoteBrokerAccepted { - broker_id: *current_client_id, + broker_id: BrokerId(current_client_id.0), }, ) .is_err() @@ -2340,7 +2514,7 @@ where if Self::announce_new_client(sender, &shmem_description).is_err() { log::info!("B2B: Error announcing client {shmem_description:?}"); }; - *current_client_id += 1; + current_client_id.0 += 1; } } }; @@ -2365,7 +2539,7 @@ where hostname, }; - let llmp_tcp_id = self.llmp_clients.len() as ClientId; + let llmp_tcp_id = ClientId(self.llmp_clients.len() as u32); // Tcp out map sends messages from background thread tcp server to foreground client let tcp_out_shmem = LlmpSharedMap::new( @@ -2373,13 +2547,13 @@ where self.shmem_provider.new_shmem(LLMP_CFG_INITIAL_MAP_SIZE)?, ); let tcp_out_shmem_description = tcp_out_shmem.shmem.description(); - self.register_client(tcp_out_shmem); + let listener_id = self.register_client(tcp_out_shmem); let ret = thread::spawn(move || { // Create a new ShMemProvider for this background thread. let mut shmem_provider_bg = SP::new().unwrap(); - let mut current_client_id = llmp_tcp_id + 1; + let mut current_client_id = ClientId(llmp_tcp_id.0 + 1); let mut tcp_incoming_sender = LlmpSender { id: llmp_tcp_id, @@ -2445,6 +2619,8 @@ where } }); + self.listeners.push(listener_id); + Ok(ret) } @@ -2454,34 +2630,40 @@ where #[allow(clippy::cast_ptr_alignment)] unsafe fn handle_new_msgs( &mut self, - client_id: u32, + client_id: ClientId, on_new_msg: &mut F, ) -> Result where F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result, { - let mut new_mesages = false; + let mut new_messages = false; let mut next_id = self.llmp_clients.len() as u32; // TODO: We could memcpy a range of pending messages, instead of one by one. loop { let msg = { - let client = &mut self.llmp_clients[client_id as usize]; + let client = &mut self.llmp_clients[client_id.0 as usize]; match client.recv()? 
{ None => { // We're done handling this client - return Ok(new_mesages); + #[cfg(feature = "std")] + if new_messages { + // set the recv time + // We don't do that in recv() to keep calls to `current_time` to a minimum. + self.llmp_clients[client_id.0 as usize].last_msg_time = current_time(); + } + return Ok(new_messages); } Some(msg) => msg, } }; // We got a new message - new_mesages = true; + new_messages = true; match (*msg).tag { // first, handle the special, llmp-internal messages LLMP_SLOW_RECEIVER_PANIC => { - return Err(Error::unknown(format!("The broker was too slow to handle messages of client {client_id} in time, so it quit. Either the client sent messages too fast, or we (the broker) got stuck!"))); + return Err(Error::unknown(format!("The broker was too slow to handle messages of client {client_id:?} in time, so it quit. Either the client sent messages too fast, or we (the broker) got stuck!"))); } LLMP_TAG_NEW_SHM_CLIENT => { /* This client informs us about yet another new client @@ -2510,12 +2692,16 @@ where next_id += 1; new_page.mark_safe_to_unmap(); self.llmp_clients.push(LlmpReceiver { - id, + id: ClientId(id), current_recv_shmem: new_page, last_msg_recvd: ptr::null_mut(), shmem_provider: self.shmem_provider.clone(), - highest_msg_id: 0, + highest_msg_id: MessageId(0), + // We don't know the last received time, just assume the current time. + #[cfg(feature = "std")] + last_msg_time: current_time(), }); + self.num_clients_total += 1; } Err(e) => { log::info!("Error adding client! Ignoring: {e:?}"); @@ -2531,7 +2717,7 @@ where // The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary. let mut should_forward_msg = true; - let map = &mut self.llmp_clients[client_id as usize].current_recv_shmem; + let map = &mut self.llmp_clients[client_id.0 as usize].current_recv_shmem; let msg_buf = (*msg).try_as_slice(map)?; if let LlmpMsgHookResult::Handled = (on_new_msg)(client_id, (*msg).tag, (*msg).flags, msg_buf)? @@ -2689,11 +2875,14 @@ where }, receiver: LlmpReceiver { - id: 0, + id: ClientId(0), current_recv_shmem: initial_broker_shmem, last_msg_recvd: ptr::null_mut(), shmem_provider, - highest_msg_id: 0, + highest_msg_id: MessageId(0), + // We don't know the last received time, just assume the current time. + #[cfg(feature = "std")] + last_msg_time: current_time(), }, }) } @@ -2827,7 +3016,7 @@ where ); // We'll set `sender_id` later - let mut ret = Self::new(shmem_provider, map, 0)?; + let mut ret = Self::new(shmem_provider, map, ClientId(0))?; let client_hello_req = TcpRequest::LocalClientHello { shmem_description: ret.sender.out_shmems.first().unwrap().shmem.description(), @@ -2891,7 +3080,7 @@ mod tests { .once(&mut |_sender_id, _tag, _flags, _msg| Ok(ForwardToClients)) .unwrap(); - let tag: Tag = 0x1337; + let tag: Tag = Tag(0x1337); let arr: [u8; 1] = [1_u8]; // Send stuff client.send_buf(tag, &arr).unwrap(); diff --git a/libafl/src/bolts/mod.rs b/libafl/src/bolts/mod.rs index ae09d6fcb6..dd4afaa1f5 100644 --- a/libafl/src/bolts/mod.rs +++ b/libafl/src/bolts/mod.rs @@ -34,6 +34,15 @@ use core::{iter::Iterator, time}; #[cfg(feature = "std")] use std::time::{SystemTime, UNIX_EPOCH}; +use serde::{Deserialize, Serialize}; + +/// The client ID == the sender id. 
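+///
+/// A hedged sketch of the transparent wrapper (illustration only):
+/// ```rust,ignore
+/// let id = ClientId(42);
+/// assert_eq!(id.0, 42);
+/// ```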
+#[repr(transparent)]
+#[derive(
+    Debug, Default, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
+)]
+pub struct ClientId(pub u32);
+
 #[cfg(feature = "std")]
 use log::{Level, Metadata, Record};
 
diff --git a/libafl/src/bolts/staterestore.rs b/libafl/src/bolts/staterestore.rs
index 13335bf230..6f7ae009e7 100644
--- a/libafl/src/bolts/staterestore.rs
+++ b/libafl/src/bolts/staterestore.rs
@@ -26,6 +26,9 @@ use crate::{
     Error,
 };
 
+/// If the saved page content equals exactly this buf, the restarted child wants to exit cleanly.
+const EXITING_MAGIC: &[u8; 16] = b"LIBAFL_EXIT_NOW\0";
+
 /// The struct stored on the shared map, containing either the data, or the filename to read contents from.
 #[repr(C)]
 struct StateShMemContent {
@@ -187,6 +190,35 @@ where
         content_mut.buf_len = 0;
     }
 
+    /// When called from a child, informs the restarter/parent process
+    /// that it should no longer respawn the child.
+    pub fn send_exiting(&mut self) {
+        self.reset();
+
+        let len = EXITING_MAGIC.len();
+
+        assert!(size_of::<StateShMemContent>() + len <= self.shmem.len());
+
+        let shmem_content = self.content_mut();
+        unsafe {
+            ptr::copy_nonoverlapping(
+                EXITING_MAGIC as *const u8,
+                shmem_content.buf.as_mut_ptr(),
+                len,
+            );
+        }
+        shmem_content.buf_len = EXITING_MAGIC.len();
+    }
+
+    /// Returns true, if [`Self::send_exiting`] was called on this [`StateRestorer`] last.
+    /// This should be checked in the parent before deciding to restore the client.
+    pub fn wants_to_exit(&self) -> bool {
+        let len = EXITING_MAGIC.len();
+        assert!(size_of::<StateShMemContent>() + len <= self.shmem.len());
+        let bytes = unsafe { slice::from_raw_parts(self.content().buf.as_ptr(), len) };
+        bytes == EXITING_MAGIC
+    }
+
     fn content_mut(&mut self) -> &mut StateShMemContent {
         let ptr = self.shmem.as_slice().as_ptr();
         #[allow(clippy::cast_ptr_alignment)] // Beginning of the page will always be aligned
@@ -207,7 +239,7 @@ where
         self.content().buf_len > 0
     }
 
-    /// Restores the contents saved in this [`StateRestorer`], if any are availiable.
+    /// Restores the contents saved in this [`StateRestorer`], if any are available.
     /// Can only be read once.
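    ///
    /// A hedged parent/child round-trip sketch (names illustrative, mirroring the llmp event manager):
    /// ```rust,ignore
    /// // child, on a clean shutdown:
    /// staterestorer.send_exiting();
    /// // parent, before respawning the child:
    /// if staterestorer.wants_to_exit() {
    ///     return Err(Error::shutting_down());
    /// }
    /// let state = staterestorer.restore::<MyState>()?; // otherwise, restore and restart
    /// ```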
pub fn restore(&self) -> Result, Error> where @@ -223,6 +255,13 @@ where state_shmem_content.buf_len_checked(self.mapsize())?, ) }; + + if bytes == EXITING_MAGIC { + return Err(Error::illegal_state( + "Trying to restore a state after send_exiting was called.", + )); + } + let mut state = bytes; let mut file_content; if state_shmem_content.buf_len == 0 { diff --git a/libafl/src/events/llmp.rs b/libafl/src/events/llmp.rs index beb5e0976e..c1d923b6ee 100644 --- a/libafl/src/events/llmp.rs +++ b/libafl/src/events/llmp.rs @@ -7,7 +7,7 @@ use alloc::{ }; #[cfg(feature = "std")] use core::sync::atomic::{compiler_fence, Ordering}; -use core::{marker::PhantomData, time::Duration}; +use core::{marker::PhantomData, num::NonZeroUsize, time::Duration}; #[cfg(feature = "std")] use std::net::{SocketAddr, ToSocketAddrs}; @@ -40,6 +40,7 @@ use crate::{ bolts::{ llmp::{self, LlmpClient, LlmpClientDescription, Tag}, shmem::ShMemProvider, + ClientId, }, events::{ BrokerEventResult, Event, EventConfig, EventFirer, EventManager, EventManagerId, @@ -54,14 +55,14 @@ use crate::{ }; /// Forward this to the client -const _LLMP_TAG_EVENT_TO_CLIENT: Tag = 0x2C11E471; +const _LLMP_TAG_EVENT_TO_CLIENT: Tag = Tag(0x2C11E471); /// Only handle this in the broker -const _LLMP_TAG_EVENT_TO_BROKER: Tag = 0x2B80438; +const _LLMP_TAG_EVENT_TO_BROKER: Tag = Tag(0x2B80438); /// Handle in both /// -const LLMP_TAG_EVENT_TO_BOTH: Tag = 0x2B0741; -const _LLMP_TAG_RESTART: Tag = 0x8357A87; -const _LLMP_TAG_NO_RESTART: Tag = 0x57A7EE71; +const LLMP_TAG_EVENT_TO_BOTH: Tag = Tag(0x2B0741); +const _LLMP_TAG_RESTART: Tag = Tag(0x8357A87); +const _LLMP_TAG_NO_RESTART: Tag = Tag(0x57A7EE71); /// The minimum buffer size at which to compress LLMP IPC messages. #[cfg(feature = "llmp_compression")] @@ -113,6 +114,11 @@ where }) } + /// Exit the broker process cleanly after at least `n` clients attached and all of them disconnected again + pub fn set_exit_cleanly_after(&mut self, n_clients: NonZeroUsize) { + self.llmp.set_exit_cleanly_after(n_clients); + } + /// Connect to an llmp broker on the givien address #[cfg(feature = "std")] pub fn connect_b2b(&mut self, addr: A) -> Result<(), Error> @@ -154,10 +160,13 @@ where Some(Duration::from_millis(5)), ); - Ok(()) + #[cfg(all(feature = "std", feature = "llmp_debug"))] + println!("The last client quit. Exiting."); + + Err(Error::shutting_down()) } - /// Run forever in the broker + /// Run in the broker until all clients exit #[cfg(feature = "llmp_broker_timeouts")] pub fn broker_loop(&mut self) -> Result<(), Error> { let monitor = &mut self.monitor; @@ -189,7 +198,7 @@ where Ok(llmp::LlmpMsgHookResult::ForwardToClients) } } else { - monitor.display("Broker".into(), 0); + monitor.display("Broker".into(), ClientId(0)); Ok(llmp::LlmpMsgHookResult::Handled) } }, @@ -197,14 +206,17 @@ where Some(Duration::from_millis(5)), ); - Ok(()) + #[cfg(feature = "llmp_debug")] + println!("The last client quit. 
Exiting."); + + Err(Error::shutting_down()) } /// Handle arriving events in the broker #[allow(clippy::unnecessary_wraps)] fn handle_in_broker( monitor: &mut MT, - client_id: u32, + client_id: ClientId, event: &Event, ) -> Result { match &event { @@ -422,7 +434,7 @@ where fuzzer: &mut Z, executor: &mut E, state: &mut S, - _client_id: u32, + client_id: ClientId, event: Event, ) -> Result<(), Error> where @@ -440,7 +452,7 @@ where time: _, executions: _, } => { - log::info!("Received new Testcase from {_client_id} ({client_config:?})"); + log::info!("Received new Testcase from {client_id:?} ({client_config:?})"); let _res = if client_config.match_with(&self.configuration) && observers_buf.is_some() @@ -474,6 +486,15 @@ where } } +impl LlmpEventManager { + /// Send information that this client is exiting. + /// The other side may free up all allocated memory. + /// We are no longer allowed to send anything afterwards. + pub fn send_exiting(&mut self) -> Result<(), Error> { + self.llmp.sender.send_exiting() + } +} + impl UsesState for LlmpEventManager where S: UsesInput, @@ -622,9 +643,7 @@ where { /// Gets the id assigned to this staterestorer. fn mgr_id(&self) -> EventManagerId { - EventManagerId { - id: self.llmp.sender.id as usize, - } + EventManagerId(self.llmp.sender.id.0 as usize) } } @@ -702,6 +721,13 @@ where self.staterestorer .save(&(state, &self.llmp_mgr.describe()?)) } + + fn send_exiting(&mut self) -> Result<(), Error> { + self.staterestorer.send_exiting(); + // Also inform the broker that we are about to exit. + // This way, the broker can clean up the pages, and eventually exit. + self.llmp_mgr.send_exiting() + } } #[cfg(feature = "std")] @@ -840,6 +866,14 @@ where /// The type of manager to build #[builder(default = ManagerKind::Any)] kind: ManagerKind, + /// The amount of external clients that should have connected (not counting our own tcp client) + /// before this broker quits _after the last client exited_. + /// If `None`, the broker will never quit when the last client exits, but run forever. + /// + /// So, if this value is `Some(2)`, the broker will not exit after client 1 connected and disconnected, + /// but it will quit after client 2 connected and disconnected. + #[builder(default = None)] + exit_cleanly_after: Option, #[builder(setter(skip), default = PhantomData)] phantom_data: PhantomData, } @@ -865,6 +899,10 @@ where broker.connect_b2b(remote_broker_addr)?; }; + if let Some(exit_cleanly_after) = self.exit_cleanly_after { + broker.set_exit_cleanly_after(exit_cleanly_after); + } + broker.broker_loop() }; @@ -903,8 +941,7 @@ where )?; broker_things(event_broker, self.remote_broker_addr)?; - - return Err(Error::shutting_down()); + unreachable!("The broker may never return normally, only on Errors or when shutting down."); } ManagerKind::Client { cpu_core } => { // We are a client @@ -999,6 +1036,10 @@ where panic!("Fuzzer-respawner: Storing state in crashed fuzzer instance did not work, no point to spawn the next client! This can happen if the child calls `exit()`, in that case make sure it uses `abort()`, if it got killed unrecoverable (OOM), or if there is a bug in the fuzzer itself. 
(Child exited with: {child_status})"); } + if staterestorer.wants_to_exit() { + return Err(Error::shutting_down()); + } + ctr = ctr.wrapping_add(1); } } else { @@ -1188,7 +1229,7 @@ where executor: &mut E, state: &mut S, manager: &mut EM, - _client_id: u32, + _client_id: ClientId, event: Event, ) -> Result<(), Error> where @@ -1207,7 +1248,7 @@ where time: _, executions: _, } => { - log::info!("Received new Testcase to convert from {_client_id}"); + log::info!("Received new Testcase to convert from {_client_id:?}"); let Some(converter) = self.converter_back.as_mut() else { return Ok(()); @@ -1412,6 +1453,7 @@ mod tests { shmem::{ShMemProvider, StdShMemProvider}, staterestore::StateRestorer, tuples::tuple_list, + ClientId, }, corpus::{Corpus, InMemoryCorpus, Testcase}, events::{llmp::_ENV_FUZZER_SENDER, LlmpEventManager}, @@ -1447,8 +1489,8 @@ mod tests { let mut llmp_client = LlmpClient::new( shmem_provider.clone(), - LlmpSharedMap::new(0, shmem_provider.new_shmem(1024).unwrap()), - 0, + LlmpSharedMap::new(ClientId(0), shmem_provider.new_shmem(1024).unwrap()), + ClientId(0), ) .unwrap(); diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs index 55f3a73fd1..ee2f5c5951 100644 --- a/libafl/src/events/mod.rs +++ b/libafl/src/events/mod.rs @@ -109,11 +109,12 @@ impl Handler for ShutdownSignalData { /// A per-fuzzer unique `ID`, usually starting with `0` and increasing /// by `1` in multiprocessed `EventManager`s, such as [`self::llmp::LlmpEventManager`]. -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] -pub struct EventManagerId { +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[repr(transparent)] +pub struct EventManagerId( /// The id - pub id: usize, -} + pub usize, +); #[cfg(feature = "introspection")] use crate::monitors::ClientPerfMonitor; @@ -504,6 +505,12 @@ pub trait EventRestarter: UsesState { Ok(()) } + /// Send information that this client is exiting. + /// No need to restart us any longer, and no need to print an error, either. + fn send_exiting(&mut self) -> Result<(), Error> { + Ok(()) + } + /// Block until we are safe to exit. 
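    ///
    /// (A hedged note: a client that wants a clean shutdown would typically call
    /// `send_exiting()` above, then stop fuzzing; the restarter sees the exit
    /// marker and no longer respawns it.)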
diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs
index 55f3a73fd1..ee2f5c5951 100644
--- a/libafl/src/events/mod.rs
+++ b/libafl/src/events/mod.rs
@@ -109,11 +109,12 @@ impl Handler for ShutdownSignalData {

 /// A per-fuzzer unique `ID`, usually starting with `0` and increasing
 /// by `1` in multiprocessed `EventManager`s, such as [`self::llmp::LlmpEventManager`].
-#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
-pub struct EventManagerId {
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[repr(transparent)]
+pub struct EventManagerId(
     /// The id
-    pub id: usize,
-}
+    pub usize,
+);

 #[cfg(feature = "introspection")]
 use crate::monitors::ClientPerfMonitor;
@@ -504,6 +505,12 @@ pub trait EventRestarter: UsesState {
         Ok(())
     }

+    /// Send information that this client is exiting.
+    /// No need to restart us any longer, and no need to print an error, either.
+    fn send_exiting(&mut self) -> Result<(), Error> {
+        Ok(())
+    }
+
     /// Block until we are safe to exit.
     #[inline]
     fn await_restart_safe(&mut self) {}
@@ -625,7 +632,7 @@ impl ProgressReporter for NopEventManager where

 impl HasEventManagerId for NopEventManager {
     fn mgr_id(&self) -> EventManagerId {
-        EventManagerId { id: 0 }
+        EventManagerId(0)
     }
 }
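The `EventManagerId` change is the same newtype pattern used for `ClientId` throughout this patch; the added `Ord` and `Hash` derives are what make ids usable as map keys. A self-contained illustration of the pattern (not library code):

    use std::collections::HashMap;

    // Same shape as the patched EventManagerId: a transparent usize wrapper.
    #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
    #[repr(transparent)]
    struct EventManagerId(usize);

    fn main() {
        let mut names: HashMap<EventManagerId, &str> = HashMap::new();
        names.insert(EventManagerId(0), "main");
        // Callers now use tuple-field access (`.0`) instead of the old `.id`:
        assert_eq!(EventManagerId(0).0, 0);
        assert_eq!(names[&EventManagerId(0)], "main");
    }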
diff --git a/libafl/src/events/simple.rs b/libafl/src/events/simple.rs
index b17ddf6578..5c2f36d6e8 100644
--- a/libafl/src/events/simple.rs
+++ b/libafl/src/events/simple.rs
@@ -26,14 +26,8 @@ use crate::{
     bolts::os::unix_signals::setup_signal_handler,
     events::{shutdown_handler, SHUTDOWN_SIGHANDLER_DATA},
 };
-#[cfg(feature = "std")]
-use crate::{
-    bolts::{shmem::ShMemProvider, staterestore::StateRestorer},
-    corpus::Corpus,
-    monitors::SimplePrintingMonitor,
-    state::{HasCorpus, HasSolutions},
-};
 use crate::{
+    bolts::ClientId,
     events::{
         BrokerEventResult, Event, EventFirer, EventManager, EventManagerId, EventProcessor,
         EventRestarter, HasEventManagerId,
@@ -43,6 +37,13 @@ use crate::{
     state::{HasClientPerfMonitor, HasExecutions, HasMetadata, UsesState},
     Error,
 };
+#[cfg(feature = "std")]
+use crate::{
+    bolts::{shmem::ShMemProvider, staterestore::StateRestorer},
+    corpus::Corpus,
+    monitors::SimplePrintingMonitor,
+    state::{HasCorpus, HasSolutions},
+};

 /// The llmp connection from the actual fuzzer to the process supervising it
 const _ENV_FUZZER_SENDER: &str = "_AFL_ENV_FUZZER_SENDER";
@@ -166,7 +167,7 @@ where
     S: UsesInput,
 {
     fn mgr_id(&self) -> EventManagerId {
-        EventManagerId { id: 0 }
+        EventManagerId(0)
     }
 }
@@ -214,12 +215,12 @@ where
                 executions,
             } => {
                 monitor
-                    .client_stats_mut_for(0)
+                    .client_stats_mut_for(ClientId(0))
                     .update_corpus_size(*corpus_size as u64);
                 monitor
-                    .client_stats_mut_for(0)
+                    .client_stats_mut_for(ClientId(0))
                     .update_executions(*executions as u64, *time);
-                monitor.display(event.name().to_string(), 0);
+                monitor.display(event.name().to_string(), ClientId(0));
                 Ok(BrokerEventResult::Handled)
             }
             Event::UpdateExecStats {
@@ -228,11 +229,11 @@ where
                 phantom: _,
             } => {
                 // TODO: The monitor buffer should be added on client add.
-                let client = monitor.client_stats_mut_for(0);
+                let client = monitor.client_stats_mut_for(ClientId(0));

                 client.update_executions(*executions as u64, *time);

-                monitor.display(event.name().to_string(), 0);
+                monitor.display(event.name().to_string(), ClientId(0));
                 Ok(BrokerEventResult::Handled)
             }
             Event::UpdateUserStats {
@@ -241,9 +242,9 @@ where
                 phantom: _,
             } => {
                 monitor
-                    .client_stats_mut_for(0)
+                    .client_stats_mut_for(ClientId(0))
                     .update_user_stats(name.clone(), value.clone());
-                monitor.display(event.name().to_string(), 0);
+                monitor.display(event.name().to_string(), ClientId(0));
                 Ok(BrokerEventResult::Handled)
             }
             #[cfg(feature = "introspection")]
@@ -254,17 +255,17 @@ where
                 phantom: _,
             } => {
                 // TODO: The monitor buffer should be added on client add.
-                let client = monitor.client_stats_mut_for(0);
+                let client = monitor.client_stats_mut_for(ClientId(0));

                 client.update_executions(*executions as u64, *time);
                 client.update_introspection_monitor((**introspection_monitor).clone());

-                monitor.display(event.name().to_string(), 0);
+                monitor.display(event.name().to_string(), ClientId(0));
                 Ok(BrokerEventResult::Handled)
             }
             Event::Objective { objective_size } => {
                 monitor
-                    .client_stats_mut_for(0)
+                    .client_stats_mut_for(ClientId(0))
                     .update_objective_size(*objective_size as u64);
-                monitor.display(event.name().to_string(), 0);
+                monitor.display(event.name().to_string(), ClientId(0));
                 Ok(BrokerEventResult::Handled)
             }
             Event::Log {
@@ -351,6 +352,11 @@ where
         self.staterestorer.reset();
         self.staterestorer.save(state)
     }
+
+    fn send_exiting(&mut self) -> Result<(), Error> {
+        self.staterestorer.send_exiting();
+        Ok(())
+    }
 }

 #[cfg(feature = "std")]
@@ -540,7 +546,7 @@ where
         staterestorer.reset();

         // load the corpus size into monitor to still display the correct numbers after restart.
-        let client_stats = monitor.client_stats_mut_for(0);
+        let client_stats = monitor.client_stats_mut_for(ClientId(0));
         client_stats.update_corpus_size(state.corpus().count().try_into()?);
         client_stats.update_objective_size(state.solutions().count().try_into()?);
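All monitor calls in the single-process event manager now pass `ClientId(0)` explicitly, since there is only one client to attribute stats to. `ClientId` itself is defined in `bolts`, outside this excerpt; judging from the `u32` parameters it replaces and the `sender_id.0` accesses in the monitor hunks below, it is presumably the same transparent wrapper pattern:

    // Assumed shape of bolts::ClientId, inferred from this patch.
    #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
    #[repr(transparent)]
    pub struct ClientId(pub u32);

    fn main() {
        // The SimpleEventManager attributes every event to client 0:
        let only_client = ClientId(0);
        println!("stats for client #{}", only_client.0);
    }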
diff --git a/libafl/src/feedbacks/map.rs b/libafl/src/feedbacks/map.rs
index dce6058d44..45747204ae 100644
--- a/libafl/src/feedbacks/map.rs
+++ b/libafl/src/feedbacks/map.rs
@@ -39,6 +39,9 @@ pub type MaxMapFeedback = MapFeedback = MapFeedback;

+/// A [`MapFeedback`] that always returns `true` for `is_interesting`. Useful for tracing all executions.
+pub type AlwaysInterestingMapFeedback = MapFeedback;
+
 /// A [`MapFeedback`] that strives to maximize the map contents,
 /// but only, if a value is larger than `pow2` of the previous.
 pub type MaxMapPow2Feedback = MapFeedback;
@@ -83,6 +86,20 @@ where
     }
 }

+/// A [`NopReducer`] does nothing, and just "reduces" to the second/`new` value.
+#[derive(Clone, Debug)]
+pub struct NopReducer {}
+
+impl<T> Reducer<T> for NopReducer
+where
+    T: Default + Copy + 'static,
+{
+    #[inline]
+    fn reduce(_history: T, new: T) -> T {
+        new
+    }
+}
+
 /// A [`MaxReducer`] reduces int values and returns their maximum.
 #[derive(Clone, Debug)]
 pub struct MaxReducer {}
@@ -208,7 +225,7 @@ where
 pub struct MapIndexesMetadata {
     /// The list of indexes.
     pub list: Vec<usize>,
-    /// A refcount used to know when remove this meta
+    /// A refcount used to know when we can remove this metadata
     pub tcref: isize,
 }
@@ -337,6 +354,8 @@ where
 /// The most common AFL-like feedback type
 #[derive(Clone, Debug)]
 pub struct MapFeedback {
+    /// For tracking, always keep indexes and/or novelties, even if the map isn't considered `interesting`.
+    always_track: bool,
     /// Indexes used in the last observation
     indexes: bool,
     /// New indexes observed in the last observation
@@ -632,6 +651,7 @@ where
             name: MAPFEEDBACK_PREFIX.to_string() + map_observer.name(),
             observer_name: map_observer.name().to_string(),
             stats_name: create_stats_name(map_observer.name()),
+            always_track: false,
             phantom: PhantomData,
         }
     }
@@ -645,6 +665,7 @@ where
             name: MAPFEEDBACK_PREFIX.to_string() + map_observer.name(),
             observer_name: map_observer.name().to_string(),
             stats_name: create_stats_name(map_observer.name()),
+            always_track: false,
             phantom: PhantomData,
         }
     }
@@ -659,9 +680,17 @@ where
             observer_name: observer_name.to_string(),
             stats_name: create_stats_name(name),
             phantom: PhantomData,
+            always_track: false,
         }
     }

+    /// For tracking, enable `always_track` mode, which also adds `novelties` or `indexes`,
+    /// even if the map is not novel for this feedback.
+    /// This is useful in combination with `load_initial_inputs_forced`, or other feedbacks.
+    pub fn set_always_track(&mut self, always_track: bool) {
+        self.always_track = always_track;
+    }
+
     /// Creating a new `MapFeedback` with a specific name. This is useful whenever the same
     /// feedback is needed twice, but with a different history. Using `new()` always results in the
     /// same name and therefore also the same history.
@@ -673,6 +702,7 @@ where
             name: name.to_string(),
             observer_name: map_observer.name().to_string(),
             stats_name: create_stats_name(name),
+            always_track: false,
             phantom: PhantomData,
         }
     }
@@ -691,6 +721,7 @@ where
             observer_name: observer_name.to_string(),
             stats_name: create_stats_name(name),
             name: name.to_string(),
+            always_track: false,
             phantom: PhantomData,
         }
     }
@@ -758,7 +789,7 @@ where
         }
     }

-        if interesting {
+        if interesting || self.always_track {
             let len = history_map.len();
             let filled = history_map.iter().filter(|&&i| i != initial).count();
             // opt: if not tracking optimisations, we technically don't show the *current* history
diff --git a/libafl/src/monitors/disk.rs b/libafl/src/monitors/disk.rs
index b3a1d3c6c4..766f94f13d 100644
--- a/libafl/src/monitors/disk.rs
+++ b/libafl/src/monitors/disk.rs
@@ -11,7 +11,7 @@ use std::{
 use serde_json::json;

 use crate::{
-    bolts::{current_time, format_duration_hms},
+    bolts::{current_time, format_duration_hms, ClientId},
     monitors::{ClientStats, Monitor, NopMonitor},
 };
@@ -45,7 +45,7 @@ where
         self.base.start_time()
     }

-    fn display(&mut self, event_msg: String, sender_id: u32) {
+    fn display(&mut self, event_msg: String, sender_id: ClientId) {
         let cur_time = current_time();

         if (cur_time - self.last_update).as_secs() >= 60 {
@@ -190,7 +190,7 @@ where
         self.base.start_time()
     }

-    fn display(&mut self, event_msg: String, sender_id: u32) {
+    fn display(&mut self, event_msg: String, sender_id: ClientId) {
         if (self.log_record)(&mut self.base) {
             let file = OpenOptions::new()
                 .append(true)
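Usage note for the two new feedback knobs: `AlwaysInterestingMapFeedback` (built on `NopReducer`) treats every execution as interesting, while `set_always_track` keeps normal novelty search but records index/novelty metadata even for non-novel inputs. A sketch, assuming an `edges_observer` map observer is already set up; the constructor names follow the existing `MaxMapFeedback` API:

    // Sketch; `edges_observer` is assumed to exist.
    // Variant 1: trace everything - every execution counts as interesting.
    let trace_all = AlwaysInterestingMapFeedback::new(&edges_observer);

    // Variant 2: regular coverage maximization, but with index metadata
    // recorded for every run, e.g. to combine with `load_initial_inputs_forced`:
    let mut coverage = MaxMapFeedback::new_tracking(&edges_observer, true, false);
    coverage.set_always_track(true);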
diff --git a/libafl/src/monitors/mod.rs b/libafl/src/monitors/mod.rs
index b7f02ab1cd..212c344bcb 100644
--- a/libafl/src/monitors/mod.rs
+++ b/libafl/src/monitors/mod.rs
@@ -23,7 +23,7 @@ pub use disk::{OnDiskJSONMonitor, OnDiskTOMLMonitor};
 use hashbrown::HashMap;
 use serde::{Deserialize, Serialize};

-use crate::bolts::{current_time, format_duration_hms};
+use crate::bolts::{current_time, format_duration_hms, ClientId};

 #[cfg(feature = "afl_exec_sec")]
 const CLIENT_STATS_TIME_WINDOW_SECS: u64 = 5; // 5 seconds
@@ -221,7 +221,7 @@ pub trait Monitor {
     fn start_time(&mut self) -> Duration;

     /// Show the monitor to the user
-    fn display(&mut self, event_msg: String, sender_id: u32);
+    fn display(&mut self, event_msg: String, sender_id: ClientId);

     /// Amount of elements in the corpus (combined for all children)
     fn corpus_size(&self) -> u64 {
@@ -261,15 +261,15 @@ pub trait Monitor {
     }

     /// The client monitor for a specific id, creating new if it doesn't exist
-    fn client_stats_mut_for(&mut self, client_id: u32) -> &mut ClientStats {
+    fn client_stats_mut_for(&mut self, client_id: ClientId) -> &mut ClientStats {
         let client_stat_count = self.client_stats().len();
-        for _ in client_stat_count..(client_id + 1) as usize {
+        for _ in client_stat_count..(client_id.0 + 1) as usize {
             self.client_stats_mut().push(ClientStats {
                 last_window_time: current_time(),
                 ..ClientStats::default()
             });
         }
-        &mut self.client_stats_mut()[client_id as usize]
+        &mut self.client_stats_mut()[client_id.0 as usize]
     }
 }
@@ -297,7 +297,7 @@ impl Monitor for NopMonitor {
         self.start_time
     }

-    fn display(&mut self, _event_msg: String, _sender_id: u32) {}
+    fn display(&mut self, _event_msg: String, _sender_id: ClientId) {}
 }

 impl NopMonitor {
@@ -351,11 +351,11 @@ impl Monitor for SimplePrintingMonitor {
         self.start_time
     }

-    fn display(&mut self, event_msg: String, sender_id: u32) {
+    fn display(&mut self, event_msg: String, sender_id: ClientId) {
         println!(
             "[{} #{}] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
             event_msg,
-            sender_id,
+            sender_id.0,
             format_duration_hms(&(current_time() - self.start_time)),
             self.client_stats().len(),
             self.corpus_size(),
@@ -370,7 +370,7 @@ impl Monitor for SimplePrintingMonitor {
             // Print the client performance monitor.
             println!(
                 "Client {:03}:\n{}",
-                sender_id, self.client_stats[sender_id as usize].introspection_monitor
+                sender_id.0, self.client_stats[sender_id.0 as usize].introspection_monitor
             );
             // Separate the spacing just a bit
             println!();
@@ -421,11 +421,11 @@ where
         self.start_time
     }

-    fn display(&mut self, event_msg: String, sender_id: u32) {
+    fn display(&mut self, event_msg: String, sender_id: ClientId) {
         let mut fmt = format!(
             "[{} #{}] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
             event_msg,
-            sender_id,
+            sender_id.0,
             format_duration_hms(&(current_time() - self.start_time)),
             self.client_stats().len(),
             self.corpus_size(),
@@ -449,7 +449,7 @@ where
             // Print the client performance monitor.
             let fmt = format!(
                 "Client {:03}:\n{}",
-                sender_id, self.client_stats[sender_id as usize].introspection_monitor
+                sender_id.0, self.client_stats[sender_id.0 as usize].introspection_monitor
             );
             (self.print_fn)(fmt);
@@ -956,7 +956,10 @@ pub mod pybind {
     use pyo3::{prelude::*, types::PyUnicode};

     use super::ClientStats;
-    use crate::monitors::{Monitor, SimpleMonitor};
+    use crate::{
+        bolts::ClientId,
+        monitors::{Monitor, SimpleMonitor},
+    };

     // TODO create a PyObjectFnMut to pass, track stabilization of https://github.com/rust-lang/rust/issues/29625
@@ -1078,7 +1081,7 @@ pub mod pybind {
             unwrap_me_mut!(self.wrapper, m, { m.start_time() })
         }

-        fn display(&mut self, event_msg: String, sender_id: u32) {
+        fn display(&mut self, event_msg: String, sender_id: ClientId) {
             unwrap_me_mut!(self.wrapper, m, { m.display(event_msg, sender_id) });
         }
     }
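Note that the id-to-stats mapping stays purely index-based: `client_stats_mut_for` pads the stats vector with defaults until `client_id.0` is a valid index. The growth logic, reduced to a standalone sketch with placeholder types:

    #[derive(Clone, Copy)]
    struct ClientId(u32); // assumed shape, as sketched earlier

    #[derive(Clone, Default)]
    struct ClientStats {
        executions: u64,
    }

    // Mirrors the patched client_stats_mut_for: grow first, then index.
    fn client_stats_mut_for(stats: &mut Vec<ClientStats>, client_id: ClientId) -> &mut ClientStats {
        let needed = client_id.0 as usize + 1;
        if stats.len() < needed {
            stats.resize_with(needed, ClientStats::default);
        }
        &mut stats[client_id.0 as usize]
    }

    fn main() {
        let mut stats = Vec::new();
        client_stats_mut_for(&mut stats, ClientId(3)).executions += 1;
        assert_eq!(stats.len(), 4); // ids 0..=3 now all have (default) stats
    }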
diff --git a/libafl/src/monitors/multi.rs b/libafl/src/monitors/multi.rs
index e377f22f5c..5eb6db8db8 100644
--- a/libafl/src/monitors/multi.rs
+++ b/libafl/src/monitors/multi.rs
@@ -6,7 +6,7 @@ use alloc::{string::String, vec::Vec};
 use core::{fmt::Write, time::Duration};

 use crate::{
-    bolts::{current_time, format_duration_hms},
+    bolts::{current_time, format_duration_hms, ClientId},
     monitors::{ClientStats, Monitor},
 };
@@ -40,8 +40,8 @@ where
         self.start_time
     }

-    fn display(&mut self, event_msg: String, sender_id: u32) {
-        let sender = format!("#{sender_id}");
+    fn display(&mut self, event_msg: String, sender_id: ClientId) {
+        let sender = format!("#{}", sender_id.0);
         let pad = if event_msg.len() + sender.len() < 13 {
             " ".repeat(13 - event_msg.len() - sender.len())
         } else {
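In the Prometheus monitor below, the newtype is unwrapped at the metrics boundary, because label values stay plain integers. The pattern in isolation, with the `Labels` struct shape assumed from its usage in this diff:

    // Assumed from usage below: prometheus-client label sets are plain structs.
    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct Labels {
        client: u32,  // the raw id; `ClientId` is unwrapped with `.0`
        stat: String, // empty for the built-in stats, set for custom ones
    }

    #[derive(Clone, Copy)]
    struct ClientId(u32); // assumed shape, as sketched earlier

    fn label_for(sender_id: ClientId) -> Labels {
        Labels {
            client: sender_id.0,
            stat: String::new(),
        }
    }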
diff --git a/libafl/src/monitors/prometheus.rs b/libafl/src/monitors/prometheus.rs
index 95fea54e94..5007dbe6ea 100644
--- a/libafl/src/monitors/prometheus.rs
+++ b/libafl/src/monitors/prometheus.rs
@@ -41,7 +41,7 @@ use prometheus_client::{
 use tide::Request;

 use crate::{
-    bolts::{current_time, format_duration_hms},
+    bolts::{current_time, format_duration_hms, ClientId},
     monitors::{ClientStats, Monitor, UserStats},
 };
@@ -95,7 +95,7 @@ where
     }

     #[allow(clippy::cast_sign_loss)]
-    fn display(&mut self, event_msg: String, sender_id: u32) {
+    fn display(&mut self, event_msg: String, sender_id: ClientId) {
         // Update the prometheus metrics
         // Label each metric with the sender / client_id
         // The gauges must take signed i64's, with max value of 2^63-1 so it is
@@ -107,42 +107,42 @@ where
         let corpus_size = self.corpus_size();
         self.corpus_count
             .get_or_create(&Labels {
-                client: sender_id,
+                client: sender_id.0,
                 stat: String::new(),
             })
             .set(corpus_size.try_into().unwrap());
         let objective_size = self.objective_size();
         self.objective_count
             .get_or_create(&Labels {
-                client: sender_id,
+                client: sender_id.0,
                 stat: String::new(),
             })
             .set(objective_size.try_into().unwrap());
         let total_execs = self.total_execs();
         self.executions
             .get_or_create(&Labels {
-                client: sender_id,
+                client: sender_id.0,
                 stat: String::new(),
             })
             .set(total_execs.try_into().unwrap());
         let execs_per_sec = self.execs_per_sec();
         self.exec_rate
             .get_or_create(&Labels {
-                client: sender_id,
+                client: sender_id.0,
                 stat: String::new(),
             })
             .set(execs_per_sec);
         let run_time = (current_time() - self.start_time).as_secs();
         self.runtime
             .get_or_create(&Labels {
-                client: sender_id,
+                client: sender_id.0,
                 stat: String::new(),
             })
             .set(run_time.try_into().unwrap()); // run time in seconds, which can be converted to a time format by Grafana or similar
         let total_clients = self.client_stats().len().try_into().unwrap(); // convert usize to u64 (unlikely that # of clients will be > 2^64 - 1...)
         self.clients_count
             .get_or_create(&Labels {
-                client: sender_id,
+                client: sender_id.0,
                 stat: String::new(),
             })
             .set(total_clients);
@@ -151,7 +151,7 @@ where
         let fmt = format!(
             "[Prometheus] [{} #{}] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
             event_msg,
-            sender_id,
+            sender_id.0,
             format_duration_hms(&(current_time() - self.start_time)),
             self.client_stats().len(),
             self.corpus_size(),
@@ -177,7 +177,7 @@ where
         };
         self.custom_stat
             .get_or_create(&Labels {
-                client: sender_id,
+                client: sender_id.0,
                 stat: key.clone(),
             })
             .set(value);
diff --git a/libafl/src/monitors/tui/mod.rs b/libafl/src/monitors/tui/mod.rs
index dc56bb01d8..6bd2aba254 100644
--- a/libafl/src/monitors/tui/mod.rs
+++ b/libafl/src/monitors/tui/mod.rs
@@ -25,7 +25,7 @@ use tui::{backend::CrosstermBackend, Terminal};
 #[cfg(feature = "introspection")]
 use super::{ClientPerfMonitor, PerfFeature};
 use crate::{
-    bolts::{current_time, format_duration_hms},
+    bolts::{current_time, format_duration_hms, ClientId},
     monitors::{ClientStats, Monitor, UserStats},
 };
@@ -258,7 +258,7 @@ impl Monitor for TuiMonitor {
     }

     #[allow(clippy::cast_sign_loss)]
-    fn display(&mut self, event_msg: String, sender_id: u32) {
+    fn display(&mut self, event_msg: String, sender_id: ClientId) {
         let cur_time = current_time();

         {
@@ -279,7 +279,7 @@ impl Monitor for TuiMonitor {
         let client = self.client_stats_mut_for(sender_id);
         let exec_sec = client.execs_per_sec_pretty(cur_time);

-        let sender = format!("#{sender_id}");
+        let sender = format!("#{}", sender_id.0);
         let pad = if event_msg.len() + sender.len() < 13 {
             " ".repeat(13 - event_msg.len() - sender.len())
         } else {
@@ -295,10 +295,10 @@ impl Monitor for TuiMonitor {
         }

         {
-            let client = &self.client_stats()[sender_id as usize];
+            let client = &self.client_stats()[sender_id.0 as usize];
             let mut ctx = self.context.write().unwrap();
             ctx.clients
-                .entry(sender_id as usize)
+                .entry(sender_id.0 as usize)
                 .or_default()
                 .grab_data(client, exec_sec);
             while ctx.client_logs.len() >= DEFAULT_LOGS_NUMBER {