LLMP Client timeouts, Exit broker when last client exits (#1057)

* Moving type definitions to transparent structs

* function to notify other side of exit

* docs

* Exmaple support windows now

* timeout fix

* Exiting after the last client quit

* inform about quits

* clippy

* clippy

* clean exits

* fix

* more unsafe

* fixes

* Move ClientId

* fix no_std

* Fix prometheus

* introduce Cores.trim()

* add always_track metadata

* docu

* add AlwaysUniqueMapFeedback

* rename to always_interesting

* return CoreId for Launcher

* CoreId as transparent tuple struct

* fix graceful exits for launcher

* Broker exits after launcher

* clippy

* Fix llmp eop race, introduce llmp shmem cache

* initialize cached page, clippy

* fix llmp_debug strings

* add error handling

* nicer error output

* More error handling convenience

* clippy

* fix macos example

* nits

* trying to add a logger

* no_std

* inline logger enabled

* fix windows, non-fork

* macos

* no_std docs

* clippy

* use ? instead of unwraps in example

* more logging

* docs
This commit is contained in:
Dominik Maier 2023-02-24 11:50:42 +01:00 committed by GitHub
parent 92842c8b04
commit 672d25e5ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 658 additions and 259 deletions

View File

@ -3,45 +3,57 @@ This shows how llmp can be used directly, without libafl abstractions
*/
extern crate alloc;
#[cfg(all(any(unix, windows), feature = "std"))]
#[cfg(feature = "std")]
use core::time::Duration;
#[cfg(all(any(unix, windows), feature = "std"))]
use std::{thread, time};
#[cfg(feature = "std")]
use std::{num::NonZeroUsize, thread, time};
use libafl::{bolts::llmp::Tag, prelude::SimpleStdErrLogger};
#[cfg(all(any(unix, windows), feature = "std"))]
#[cfg(feature = "std")]
use libafl::{
bolts::{
llmp,
llmp::{self, Tag},
shmem::{ShMemProvider, StdShMemProvider},
ClientId, SimpleStdErrLogger,
},
Error,
};
#[cfg(feature = "std")]
const _TAG_SIMPLE_U32_V1: Tag = Tag(0x5130_0321);
#[cfg(feature = "std")]
const _TAG_MATH_RESULT_V1: Tag = Tag(0x7747_4331);
#[cfg(feature = "std")]
const _TAG_1MEG_V1: Tag = Tag(0xB111_1161);
/// The time the broker will wait for things to happen before printing a message
#[cfg(feature = "std")]
const BROKER_TIMEOUT: Duration = Duration::from_secs(10);
/// How long the broker may sleep between forwarding a new chunk of sent messages
#[cfg(feature = "std")]
const SLEEP_BETWEEN_FORWARDS: Duration = Duration::from_millis(5);
#[cfg(feature = "std")]
static LOGGER: SimpleStdErrLogger = SimpleStdErrLogger::debug();
const _TAG_SIMPLE_U32_V1: Tag = 0x5130_0321;
const _TAG_MATH_RESULT_V1: Tag = 0x7747_4331;
const _TAG_1MEG_V1: Tag = 0xB111_1161;
#[cfg(all(any(unix, windows), feature = "std"))]
fn adder_loop(port: u16) -> ! {
let shmem_provider = StdShMemProvider::new().unwrap();
let mut client = llmp::LlmpClient::create_attach_to_tcp(shmem_provider, port).unwrap();
#[cfg(feature = "std")]
fn adder_loop(port: u16) -> Result<(), Box<dyn std::error::Error>> {
let shmem_provider = StdShMemProvider::new()?;
let mut client = llmp::LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
let mut last_result: u32 = 0;
let mut current_result: u32 = 0;
loop {
let mut msg_counter = 0;
loop {
let Some((sender, tag, buf)) = client.recv_buf().unwrap() else { break };
let Some((sender, tag, buf)) = client.recv_buf()? else { break };
msg_counter += 1;
match tag {
_TAG_SIMPLE_U32_V1 => {
current_result =
current_result.wrapping_add(u32::from_le_bytes(buf.try_into().unwrap()));
current_result.wrapping_add(u32::from_le_bytes(buf.try_into()?));
}
_ => println!(
"Adder Client ignored unknown message {:#x} from client {} with {} bytes",
"Adder Client ignored unknown message {:?} from client {:?} with {} bytes",
tag,
sender,
buf.len()
@ -52,9 +64,7 @@ fn adder_loop(port: u16) -> ! {
if current_result != last_result {
println!("Adder handled {msg_counter} messages, reporting {current_result} to broker");
client
.send_buf(_TAG_MATH_RESULT_V1, &current_result.to_le_bytes())
.unwrap();
client.send_buf(_TAG_MATH_RESULT_V1, &current_result.to_le_bytes())?;
last_result = current_result;
}
@ -62,10 +72,9 @@ fn adder_loop(port: u16) -> ! {
}
}
#[cfg(all(any(unix, windows), feature = "std"))]
fn large_msg_loop(port: u16) -> ! {
let mut client =
llmp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new().unwrap(), port).unwrap();
#[cfg(feature = "std")]
fn large_msg_loop(port: u16) -> Result<(), Box<dyn std::error::Error>> {
let mut client = llmp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
#[cfg(not(target_vendor = "apple"))]
let meg_buf = vec![1u8; 1 << 20];
@ -73,7 +82,7 @@ fn large_msg_loop(port: u16) -> ! {
let meg_buf = vec![1u8; 1 << 19];
loop {
client.send_buf(_TAG_1MEG_V1, &meg_buf).unwrap();
client.send_buf(_TAG_1MEG_V1, &meg_buf)?;
#[cfg(not(target_vendor = "apple"))]
println!("Sending the next megabyte");
#[cfg(target_vendor = "apple")]
@ -83,31 +92,36 @@ fn large_msg_loop(port: u16) -> ! {
}
#[allow(clippy::unnecessary_wraps)]
#[cfg(all(any(unix, windows), feature = "std"))]
#[cfg(feature = "std")]
fn broker_message_hook(
client_id: u32,
tag: llmp::Tag,
_flags: llmp::Flags,
message: &[u8],
msg_or_timeout: Option<(ClientId, llmp::Tag, llmp::Flags, &[u8])>,
) -> Result<llmp::LlmpMsgHookResult, Error> {
let Some((client_id, tag, _flags, message)) = msg_or_timeout else {
println!(
"No client did anything for {} seconds..",
BROKER_TIMEOUT.as_secs()
);
return Ok(llmp::LlmpMsgHookResult::Handled);
};
match tag {
_TAG_SIMPLE_U32_V1 => {
println!(
"Client {:?} sent message: {:?}",
client_id,
u32::from_le_bytes(message.try_into().unwrap())
u32::from_le_bytes(message.try_into()?)
);
Ok(llmp::LlmpMsgHookResult::ForwardToClients)
}
_TAG_MATH_RESULT_V1 => {
println!(
"Adder Client has this current result: {:?}",
u32::from_le_bytes(message.try_into().unwrap())
u32::from_le_bytes(message.try_into()?)
);
Ok(llmp::LlmpMsgHookResult::Handled)
}
_ => {
println!("Unknwon message id received!");
println!("Unknown message id received: {tag:?}");
Ok(llmp::LlmpMsgHookResult::ForwardToClients)
}
}
@ -115,67 +129,85 @@ fn broker_message_hook(
#[cfg(not(any(unix, windows)))]
fn main() {
todo!("LLMP is not yet supported on this platform.");
eprintln!("LLMP example is currently not supported on no_std. Implement ShMem for no_std.");
}
#[cfg(any(unix, windows))]
fn main() {
fn main() -> Result<(), Box<dyn std::error::Error>> {
/* The main node has a broker, and a few worker threads */
let mode = std::env::args()
.nth(1)
.expect("no mode specified, chose 'broker', 'b2b', 'ctr', 'adder', or 'large'");
.expect("no mode specified, chose 'broker', 'b2b', 'ctr', 'adder', 'large', or 'exiting'");
let port: u16 = std::env::args()
.nth(2)
.unwrap_or_else(|| "1337".into())
.parse::<u16>()
.unwrap();
.parse::<u16>()?;
// in the b2b use-case, this is our "own" port, we connect to the "normal" broker node on startup.
let b2b_port: u16 = std::env::args()
.nth(3)
.unwrap_or_else(|| "4242".into())
.parse::<u16>()
.unwrap();
.parse::<u16>()?;
log::set_logger(&LOGGER).unwrap();
log::set_max_level(log::LevelFilter::Trace);
println!("Launching in mode {mode} on port {port}");
match mode.as_str() {
"broker" => {
let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new().unwrap()).unwrap();
broker.launch_tcp_listener_on(port).unwrap();
broker.loop_forever(&mut broker_message_hook, Some(Duration::from_millis(5)));
let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?;
broker.launch_tcp_listener_on(port)?;
// Exit when we got at least _n_ nodes, and all of them quit.
broker.set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap());
broker.loop_with_timeouts(
&mut broker_message_hook,
BROKER_TIMEOUT,
Some(SLEEP_BETWEEN_FORWARDS),
);
}
"b2b" => {
let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new().unwrap()).unwrap();
broker.launch_tcp_listener_on(b2b_port).unwrap();
let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?;
broker.launch_tcp_listener_on(b2b_port)?;
// connect back to the main broker.
broker.connect_b2b(("127.0.0.1", port)).unwrap();
broker.loop_forever(&mut broker_message_hook, Some(Duration::from_millis(5)));
broker.connect_b2b(("127.0.0.1", port))?;
broker.loop_with_timeouts(
&mut broker_message_hook,
BROKER_TIMEOUT,
Some(SLEEP_BETWEEN_FORWARDS),
);
}
"ctr" => {
let mut client =
llmp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new().unwrap(), port)
.unwrap();
llmp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
let mut counter: u32 = 0;
loop {
counter = counter.wrapping_add(1);
client
.send_buf(_TAG_SIMPLE_U32_V1, &counter.to_le_bytes())
.unwrap();
client.send_buf(_TAG_SIMPLE_U32_V1, &counter.to_le_bytes())?;
println!("CTR Client writing {counter}");
thread::sleep(Duration::from_secs(1));
}
}
"adder" => {
adder_loop(port);
adder_loop(port)?;
}
"large" => {
large_msg_loop(port);
large_msg_loop(port)?;
}
"exiting" => {
let mut client =
llmp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
for i in 0..10_u32 {
client.send_buf(_TAG_SIMPLE_U32_V1, &i.to_le_bytes())?;
println!("Exiting Client writing {i}");
thread::sleep(Duration::from_millis(10));
}
log::info!("Exiting Client exits");
client.sender.send_exiting()?;
}
_ => {
println!("No valid mode supplied");
}
}
Ok(())
}

View File

@ -45,11 +45,12 @@ pub fn get_core_ids() -> Result<Vec<CoreId>, Error> {
}
/// This represents a CPU core.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct CoreId {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
#[repr(transparent)]
pub struct CoreId(
/// The numerical `id` of a core
pub id: usize,
}
pub usize,
);
impl CoreId {
/// Set the affinity of the current process to this [`CoreId`]
@ -73,18 +74,18 @@ impl CoreId {
impl From<usize> for CoreId {
fn from(id: usize) -> Self {
CoreId { id }
CoreId(id)
}
}
impl From<CoreId> for usize {
fn from(core_id: CoreId) -> usize {
core_id.id
core_id.0
}
}
/// A list of [`CoreId`] to use for fuzzing
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub struct Cores {
/// The original commandline used during parsing
pub cmdline: String,
@ -100,6 +101,19 @@ impl Cores {
Self::from_cmdline("all")
}
/// Trims the number of cores to the given value, dropping additional cores
pub fn trim(&mut self, count: usize) -> Result<(), Error> {
if count > self.ids.len() {
return Err(Error::illegal_argument(format!(
"Core trim value {count} is larger than number of chosen cores of {}",
self.ids.len()
)));
}
self.ids.resize(count, CoreId(0));
Ok(())
}
/// Parses core binding args from user input.
/// Returns a Vec of CPU IDs.
/// * `./fuzzer --cores 1,2-4,6`: clients run in cores `1,2,3,4,6`
@ -148,6 +162,18 @@ impl Cores {
let core_id = CoreId::from(core_id);
self.ids.contains(&core_id)
}
/// Returns the index/position of the given [`CoreId`] in this cores.ids list.
/// Will return `None`, if [`CoreId`] wasn't found.
#[must_use]
pub fn position(&self, core_id: CoreId) -> Option<usize> {
// Since cores a low number, iterating is const-size,
// and should be faster than hashmap lookups.
// Prove me wrong.
self.ids
.iter()
.position(|&cur_core_id| cur_core_id == core_id)
}
}
impl From<&[usize]> for Cores {
@ -183,11 +209,7 @@ impl TryFrom<&str> for Cores {
#[cfg(feature = "std")]
#[deprecated(since = "0.8.1", note = "Use Cores::from_cmdline instead")]
pub fn parse_core_bind_arg(args: &str) -> Result<Vec<usize>, Error> {
Ok(Cores::from_cmdline(args)?
.ids
.iter()
.map(|x| x.id)
.collect())
Ok(Cores::from_cmdline(args)?.ids.iter().map(|x| x.0).collect())
}
// Linux Section
@ -226,7 +248,7 @@ mod linux {
for i in 0..CPU_SETSIZE as usize {
if unsafe { CPU_ISSET(i, &full_set) } {
core_ids.push(CoreId { id: i });
core_ids.push(CoreId(i));
}
}
@ -238,7 +260,7 @@ mod linux {
// one core active.
let mut set = new_cpu_set();
unsafe { CPU_SET(core_id.id, &mut set) };
unsafe { CPU_SET(core_id.0, &mut set) };
// Set the current thread's core affinity.
let result = unsafe {
@ -301,7 +323,7 @@ mod linux {
// Ensure that the system pinned the current thread
// to the specified core.
let mut core_mask = new_cpu_set();
unsafe { CPU_SET(ids[0].id, &mut core_mask) };
unsafe { CPU_SET(ids[0].0, &mut core_mask) };
let new_mask = get_affinity_mask().unwrap();
@ -351,7 +373,7 @@ mod windows {
match get_num_logical_cpus_ex_windows() {
Some(total_cores) => {
for i in 0..total_cores {
core_ids.push(CoreId { id: i });
core_ids.push(CoreId(i));
}
Ok(core_ids)
}
@ -563,14 +585,14 @@ mod apple {
#[allow(clippy::unnecessary_wraps)]
pub fn get_core_ids() -> Result<Vec<CoreId>, Error> {
Ok((0..(usize::from(available_parallelism()?)))
.map(|n| CoreId { id: n })
.map(CoreId)
.collect::<Vec<_>>())
}
#[cfg(target_arch = "x86_64")]
pub fn set_for_current(core_id: CoreId) -> Result<(), Error> {
let mut info = thread_affinity_policy_data_t {
affinity_tag: core_id.id.try_into().unwrap(),
affinity_tag: core_id.0.try_into().unwrap(),
};
unsafe {
@ -640,7 +662,7 @@ mod freebsd {
pub fn get_core_ids() -> Result<Vec<CoreId>, Error> {
Ok((0..(usize::from(available_parallelism()?)))
.into_iter()
.map(|n| CoreId { id: n })
.map(|n| CoreId(n))
.collect::<Vec<_>>())
}
@ -648,7 +670,7 @@ mod freebsd {
// Turn `core_id` into a `libc::cpuset_t` with only
let mut set = new_cpuset();
unsafe { CPU_SET(core_id.id, &mut set) };
unsafe { CPU_SET(core_id.0, &mut set) };
// Set the current thread's core affinity.
let result = unsafe {
@ -769,14 +791,14 @@ mod netbsd {
pub fn get_core_ids() -> Result<Vec<CoreId>, Error> {
Ok((0..(usize::from(available_parallelism()?)))
.into_iter()
.map(|n| CoreId { id: n })
.map(|n| CoreId(n))
.collect::<Vec<_>>())
}
pub fn set_for_current(core_id: CoreId) -> Result<(), Error> {
let set = new_cpuset();
unsafe { _cpuset_set(core_id.id as u64, set) };
unsafe { _cpuset_set(core_id.0 as u64, set) };
// Set the current thread's core affinity.
let result = unsafe {
pthread_setaffinity_np(
@ -824,7 +846,7 @@ mod openbsd {
pub fn get_core_ids() -> Result<Vec<CoreId>, Error> {
Ok((0..(usize::from(available_parallelism()?)))
.into_iter()
.map(|n| CoreId { id: n })
.map(|n| CoreId(n))
.collect::<Vec<_>>())
}
}
@ -853,7 +875,7 @@ mod solaris {
pub fn get_core_ids() -> Result<Vec<CoreId>, Error> {
Ok((0..(usize::from(available_parallelism()?)))
.into_iter()
.map(|n| CoreId { id: n })
.map(|n| CoreId(n))
.collect::<Vec<_>>())
}
@ -862,7 +884,7 @@ mod solaris {
libc::processor_bind(
libc::P_PID,
libc::PS_MYID,
core_id.id as i32,
core_id.0 as i32,
std::ptr::null_mut(),
)
};

View File

@ -12,9 +12,12 @@
#[cfg(all(feature = "std"))]
use alloc::string::ToString;
use core::fmt::{self, Debug, Formatter};
#[cfg(feature = "std")]
use core::marker::PhantomData;
use core::{
fmt::{self, Debug, Formatter},
num::NonZeroUsize,
};
#[cfg(feature = "std")]
use std::net::SocketAddr;
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
@ -27,12 +30,14 @@ use serde::de::DeserializeOwned;
#[cfg(feature = "std")]
use typed_builder::TypedBuilder;
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
use crate::bolts::core_affinity::CoreId;
use super::core_affinity::CoreId;
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
use crate::bolts::os::startable_self;
#[cfg(all(unix, feature = "std", feature = "fork"))]
use crate::bolts::os::{dup2, fork, ForkResult};
use crate::bolts::{
core_affinity::get_core_ids,
os::{dup2, fork, ForkResult},
};
use crate::inputs::UsesInput;
#[cfg(feature = "std")]
use crate::{
@ -51,7 +56,7 @@ const _AFL_LAUNCHER_CLIENT: &str = "AFL_LAUNCHER_CLIENT";
#[allow(clippy::type_complexity, missing_debug_implementations)]
pub struct Launcher<'a, CF, MT, S, SP>
where
CF: FnOnce(Option<S>, LlmpRestartingEventManager<S, SP>, usize) -> Result<(), Error>,
CF: FnOnce(Option<S>, LlmpRestartingEventManager<S, SP>, CoreId) -> Result<(), Error>,
S::Input: 'a,
MT: Monitor,
SP: ShMemProvider + 'static,
@ -90,7 +95,7 @@ where
impl<CF, MT, S, SP> Debug for Launcher<'_, CF, MT, S, SP>
where
CF: FnOnce(Option<S>, LlmpRestartingEventManager<S, SP>, usize) -> Result<(), Error>,
CF: FnOnce(Option<S>, LlmpRestartingEventManager<S, SP>, CoreId) -> Result<(), Error>,
MT: Monitor + Clone,
SP: ShMemProvider + 'static,
S: DeserializeOwned + UsesInput,
@ -110,7 +115,7 @@ where
#[cfg(feature = "std")]
impl<'a, CF, MT, S, SP> Launcher<'a, CF, MT, S, SP>
where
CF: FnOnce(Option<S>, LlmpRestartingEventManager<S, SP>, usize) -> Result<(), Error>,
CF: FnOnce(Option<S>, LlmpRestartingEventManager<S, SP>, CoreId) -> Result<(), Error>,
MT: Monitor + Clone,
S: DeserializeOwned + UsesInput + HasExecutions + HasClientPerfMonitor,
SP: ShMemProvider + 'static,
@ -119,7 +124,11 @@ where
#[cfg(all(unix, feature = "std", feature = "fork"))]
#[allow(clippy::similar_names)]
pub fn launch(&mut self) -> Result<(), Error> {
use crate::bolts::core_affinity::get_core_ids;
if self.cores.ids.is_empty() {
return Err(Error::illegal_argument(
"No cores to spawn on given, cannot launch anything.",
));
}
if self.run_client.is_none() {
return Err(Error::illegal_argument(
@ -159,7 +168,7 @@ where
self.shmem_provider.post_fork(true)?;
#[cfg(feature = "std")]
std::thread::sleep(std::time::Duration::from_millis(index * 100));
std::thread::sleep(std::time::Duration::from_millis(index * 10));
#[cfg(feature = "std")]
if !debug_output {
@ -180,7 +189,7 @@ where
.build()
.launch()?;
return (self.run_client.take().unwrap())(state, mgr, bind_to.id);
return (self.run_client.take().unwrap())(state, mgr, *bind_to);
}
};
}
@ -197,6 +206,7 @@ where
.broker_port(self.broker_port)
.kind(ManagerKind::Broker)
.remote_broker_addr(self.remote_broker_addr)
.exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap()))
.configuration(self.configuration)
.build()
.launch()?;
@ -242,13 +252,13 @@ where
.shmem_provider(self.shmem_provider.clone())
.broker_port(self.broker_port)
.kind(ManagerKind::Client {
cpu_core: Some(CoreId { id: core_id }),
cpu_core: Some(CoreId(core_id)),
})
.configuration(self.configuration)
.build()
.launch()?;
return (self.run_client.take().unwrap())(state, mgr, core_id);
return (self.run_client.take().unwrap())(state, mgr, CoreId(core_id));
}
Err(std::env::VarError::NotPresent) => {
// I am a broker
@ -285,6 +295,14 @@ where
Err(_) => panic!("Env variables are broken, received non-unicode!"),
};
// It's fine to check this after the client spawn loop - since we won't have spawned any clients...
// Doing it later means one less check in each spawned process.
if self.cores.ids.is_empty() {
return Err(Error::illegal_argument(
"No cores to spawn on given, cannot launch anything.",
));
}
if self.spawn_broker {
#[cfg(feature = "std")]
log::info!("I am broker!!.");
@ -295,6 +313,7 @@ where
.broker_port(self.broker_port)
.kind(ManagerKind::Broker)
.remote_broker_addr(self.remote_broker_addr)
.exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap()))
.configuration(self.configuration)
.build()
.launch()?;

View File

@ -69,6 +69,8 @@ use core::{
fmt::Debug,
hint,
mem::size_of,
num::NonZeroUsize,
ops::{BitAnd, BitOr, Not},
ptr, slice,
sync::atomic::{fence, AtomicU16, Ordering},
time::Duration,
@ -92,15 +94,24 @@ use backtrace::Backtrace;
use nix::sys::socket::{self, sockopt::ReusePort};
use serde::{Deserialize, Serialize};
#[cfg(feature = "std")]
use crate::bolts::current_time;
#[cfg(unix)]
use crate::bolts::os::unix_signals::{
setup_signal_handler, siginfo_t, ucontext_t, Handler, Signal,
};
use crate::{
bolts::shmem::{ShMem, ShMemDescription, ShMemId, ShMemProvider},
bolts::{
shmem::{ShMem, ShMemDescription, ShMemId, ShMemProvider},
ClientId,
},
Error,
};
/// The timeout after which a client will be considered stale, and removed.
#[cfg(feature = "std")]
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
/// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`]
/// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages.
/// Instead of increasing this value, you may consider sending new messages at a lower rate, else your Sender will eventually `OOM`.
@ -116,25 +127,25 @@ const LLMP_CFG_INITIAL_MAP_SIZE: usize = 1 << 20;
const LLMP_CFG_ALIGNNMENT: usize = 64;
/// A msg fresh from the press: No tag got sent by the user yet
const LLMP_TAG_UNSET: Tag = 0xDEADAF;
const LLMP_TAG_UNSET: Tag = Tag(0xDEADAF);
/// This message should not exist yet. Some bug in unsafe code!
const LLMP_TAG_UNINITIALIZED: Tag = 0xA143AF11;
const LLMP_TAG_UNINITIALIZED: Tag = Tag(0xA143AF11);
/// The end of page message
/// When receiving this, a new sharedmap needs to be allocated.
const LLMP_TAG_END_OF_PAGE: Tag = 0xAF1E0F1;
const LLMP_TAG_END_OF_PAGE: Tag = Tag(0xAF1E0F1);
/// A new client for this broker got added.
const LLMP_TAG_NEW_SHM_CLIENT: Tag = 0xC11E471;
const LLMP_TAG_NEW_SHM_CLIENT: Tag = Tag(0xC11E471);
/// The sender on this map is exiting (if broker exits, clients should exit gracefully);
const LLMP_TAG_EXITING: Tag = 0x13C5171;
const LLMP_TAG_EXITING: Tag = Tag(0x13C5171);
/// Client gave up as the receiver/broker was too slow
const LLMP_SLOW_RECEIVER_PANIC: Tag = 0x70051041;
const LLMP_SLOW_RECEIVER_PANIC: Tag = Tag(0x70051041);
/// Unused...
pub const LLMP_FLAG_INITIALIZED: Flags = 0x0;
pub const LLMP_FLAG_INITIALIZED: Flags = Flags(0x0);
/// This message was compressed in transit
pub const LLMP_FLAG_COMPRESSED: Flags = 0x1;
pub const LLMP_FLAG_COMPRESSED: Flags = Flags(0x1);
/// From another broker.
pub const LLMP_FLAG_FROM_B2B: Flags = 0x2;
pub const LLMP_FLAG_FROM_B2B: Flags = Flags(0x2);
/// Timt the broker 2 broker connection waits for incoming data,
/// before checking for own data to forward again.
@ -170,19 +181,75 @@ static mut LLMP_SIGHANDLER_STATE: LlmpShutdownSignalHandler = LlmpShutdownSignal
};
/// TAGs used throughout llmp
pub type Tag = u32;
/// The client ID == the sender id.
pub type ClientId = u32;
#[repr(transparent)]
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Tag(pub u32);
impl Debug for Tag {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.write_fmt(format_args!("Tag({:X})", self.0))
}
}
/// The broker ID, for broker 2 broker communication.
pub type BrokerId = u32;
#[repr(transparent)]
#[derive(
Debug, Default, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
)]
pub struct BrokerId(pub u32);
/// The flags, indicating, for example, enabled compression.
pub type Flags = u32;
#[repr(transparent)]
#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Flags(u32);
impl Debug for Flags {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.write_fmt(format_args!("Flags{:x}( ", self.0))?;
// Initialized is the default value, no need to print.
if *self & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
f.write_str("COMPRESSED")?;
}
if *self & LLMP_FLAG_FROM_B2B == LLMP_FLAG_FROM_B2B {
f.write_str("FROM_B2B")?;
}
f.write_str(" )")
}
}
impl BitAnd for Flags {
type Output = Self;
fn bitand(self, rhs: Self) -> Self::Output {
Self(self.0 & rhs.0)
}
}
impl BitOr for Flags {
type Output = Self;
fn bitor(self, rhs: Self) -> Self::Output {
Self(self.0 | rhs.0)
}
}
impl Not for Flags {
type Output = Self;
fn not(self) -> Self::Output {
Self(!self.0)
}
}
/// The message ID, an ever-increasing number, unique only to a sharedmap/page.
#[cfg(target_pointer_width = "64")]
pub type MessageId = u64;
#[repr(transparent)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct MessageId(u64);
/// The message ID, an ever-increasing number, unique only to a sharedmap/page.
#[cfg(not(target_pointer_width = "64"))]
pub type MessageId = u32;
#[repr(transparent)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct MessageId(u32);
/// This is for the server the broker will spawn.
/// If an llmp connection is local - use sharedmaps
@ -477,7 +544,7 @@ unsafe fn llmp_page_init<SHM: ShMem>(shmem: &mut SHM, sender_id: ClientId, allow
// Don't forget to subtract our own header size
(*page).size_total = map_size - LLMP_PAGE_HEADER_LEN;
(*page).size_used = 0;
(*(*page).messages.as_mut_ptr()).message_id = 0;
(*(*page).messages.as_mut_ptr()).message_id = MessageId(0);
(*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET;
(*page).safe_to_unmap.store(0, Ordering::Relaxed);
(*page).sender_dead.store(0, Ordering::Relaxed);
@ -827,14 +894,14 @@ where
Ok(if client_id_str == _NULL_ENV_STR {
None
} else {
Some(client_id_str.parse()?)
Some(ClientId(client_id_str.parse()?))
})
}
/// Writes the `id` to an env var
#[cfg(feature = "std")]
fn client_id_to_env(env_name: &str, id: ClientId) {
env::set_var(format!("{env_name}_CLIENT_ID"), format!("{id}"));
env::set_var(format!("{env_name}_CLIENT_ID"), format!("{}", id.0));
}
/// Reattach to a vacant `out_shmem`, to with a previous sender stored the information in an env before.
@ -986,9 +1053,9 @@ where
// We don't need to pad the EOP message: it'll always be the last in this page.
(*ret).buf_len_padded = (*ret).buf_len;
(*ret).message_id = if last_msg.is_null() {
1
MessageId(1)
} else {
(*last_msg).message_id + 1
MessageId((*last_msg).message_id.0 + 1)
};
(*ret).tag = LLMP_TAG_END_OF_PAGE;
(*page).size_used += EOP_MSG_SIZE;
@ -1050,12 +1117,12 @@ where
/* We need to start with 1 for ids, as current message id is initialized
* with 0... */
(*ret).message_id = if last_msg.is_null() {
1
} else if (*page).current_msg_id.load(Ordering::Relaxed) == (*last_msg).message_id {
(*last_msg).message_id + 1
MessageId(1)
} else if (*page).current_msg_id.load(Ordering::Relaxed) == (*last_msg).message_id.0 {
MessageId((*last_msg).message_id.0 + 1)
} else {
/* Oops, wrong usage! */
panic!("BUG: The current message never got committed using send! (page->current_msg_id {:?}, last_msg->message_id: {})", ptr::addr_of!((*page).current_msg_id), (*last_msg).message_id);
panic!("BUG: The current message never got committed using send! (page->current_msg_id {:?}, last_msg->message_id: {:?})", ptr::addr_of!((*page).current_msg_id), (*last_msg).message_id);
};
(*ret).buf_len = buf_len as u64;
@ -1087,7 +1154,7 @@ where
assert!(self.last_msg_sent != msg, "Message sent twice!");
assert!(
(*msg).tag != LLMP_TAG_UNSET,
"No tag set on message with id {}",
"No tag set on message with id {:?}",
(*msg).message_id
);
// A client gets the sender id assigned to by the broker during the initial handshake.
@ -1101,12 +1168,12 @@ where
)));
}
(*msg).message_id = (*page).current_msg_id.load(Ordering::Relaxed) + 1;
(*msg).message_id.0 = (*page).current_msg_id.load(Ordering::Relaxed) + 1;
// Make sure all things have been written to the page, and commit the message to the page
(*page)
.current_msg_id
.store((*msg).message_id, Ordering::Release);
.store((*msg).message_id.0, Ordering::Release);
self.last_msg_sent = msg;
self.has_unsent_message = false;
@ -1169,7 +1236,7 @@ where
// If we want to get red if old pages, (client to broker), do that now
if !self.keep_pages_forever {
#[cfg(feature = "llmp_debug")]
log::info!("pruning");
log::debug!("LLMP DEBUG: pruning old pages");
self.prune_old_pages();
}
@ -1231,7 +1298,7 @@ where
}
#[cfg(feature = "llmp_debug")]
log::info!("Handled out eop");
log::debug!("Handled out eop");
match unsafe { self.alloc_next_if_space(buf_len) } {
Some(msg) => Ok(msg),
@ -1304,7 +1371,7 @@ where
|| tag == LLMP_TAG_UNSET
{
return Err(Error::unknown(format!(
"Reserved tag supplied to send_buf ({tag:#X})"
"Reserved tag supplied to send_buf ({tag:?})"
)));
}
@ -1327,7 +1394,7 @@ where
|| tag == LLMP_TAG_UNSET
{
return Err(Error::unknown(format!(
"Reserved tag supplied to send_buf ({tag:#X})"
"Reserved tag supplied to send_buf ({tag:?})"
)));
}
@ -1367,6 +1434,13 @@ where
description.last_message_offset,
)
}
/// Send information that this client is exiting.
/// The other side may free up all allocated memory.
/// We are no longer allowed to send anything afterwards.
pub fn send_exiting(&mut self) -> Result<(), Error> {
self.send_buf(LLMP_TAG_EXITING, &[])
}
}
/// Receiving end on a (unidirectional) sharedmap channel
@ -1375,10 +1449,13 @@ pub struct LlmpReceiver<SP>
where
SP: ShMemProvider,
{
/// Id of this provider
pub id: u32,
/// Client Id of this receiver
pub id: ClientId,
/// Pointer to the last message received
pub last_msg_recvd: *const LlmpMsg,
/// Time we received the last message from this receiver
#[cfg(feature = "std")]
last_msg_time: Duration,
/// The shmem provider
pub shmem_provider: SP,
/// current page. After EOP, this gets replaced with the new one
@ -1426,16 +1503,22 @@ where
};
Ok(Self {
id: 0,
id: ClientId(0),
current_recv_shmem,
last_msg_recvd,
shmem_provider,
highest_msg_id: 0,
highest_msg_id: MessageId(0),
// We don't know the last received time, just assume the current time.
#[cfg(feature = "std")]
last_msg_time: current_time(),
})
}
// Never inline, to not get some strange effects
/// Read next message.
/// Returns a pointer to the [`LlmpMsg`], `None` of no message exists, or an [`Error`].
///
/// Will *not* update `self.last_msg_time`.
#[inline(never)]
unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> {
/* DBG("recv %p %p\n", page, last_msg); */
@ -1449,12 +1532,12 @@ where
} else {
// read the msg_id from shared map
let current_msg_id = (*page).current_msg_id.load(Ordering::Relaxed);
self.highest_msg_id = current_msg_id;
(current_msg_id, true)
self.highest_msg_id = MessageId(current_msg_id);
(MessageId(current_msg_id), true)
};
// Read the message from the page
let ret = if current_msg_id == 0 {
let ret = if current_msg_id.0 == 0 {
/* No messages yet */
None
} else if last_msg.is_null() {
@ -1485,7 +1568,7 @@ where
// Handle special, LLMP internal, messages.
match (*msg).tag {
LLMP_TAG_UNSET => panic!(
"BUG: Read unallocated msg (tag was {:x} - msg header: {:?}",
"BUG: Read unallocated msg (tag was {:?} - msg header: {:?}",
LLMP_TAG_UNSET,
&(*msg)
),
@ -1495,7 +1578,7 @@ where
return Err(Error::shutting_down());
}
LLMP_TAG_END_OF_PAGE => {
log::info!("Received end of page, allocating next");
log::debug!("Received end of page, allocating next");
// Handle end of page
assert!(
(*msg).buf_len >= size_of::<LlmpPayloadSharedMapInfo>() as u64,
@ -1514,7 +1597,7 @@ where
// Set last msg we received to null (as the map may no longer exist)
self.last_msg_recvd = ptr::null();
self.highest_msg_id = 0;
self.highest_msg_id = MessageId(0);
// Mark the old page save to unmap, in case we didn't do so earlier.
(*page).safe_to_unmap.store(1, Ordering::Relaxed);
@ -1552,7 +1635,7 @@ where
/// # Safety
/// Returns a raw ptr, on the recv map. Should be safe in general
pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, Error> {
let mut current_msg_id = 0;
let mut current_msg_id = MessageId(0);
let page = self.current_recv_shmem.page_mut();
let last_msg = self.last_msg_recvd;
if !last_msg.is_null() {
@ -1564,7 +1647,7 @@ where
current_msg_id = (*last_msg).message_id;
}
loop {
if (*page).current_msg_id.load(Ordering::Relaxed) != current_msg_id {
if (*page).current_msg_id.load(Ordering::Relaxed) != current_msg_id.0 {
return match self.recv()? {
Some(msg) => Ok(msg),
None => panic!("BUG: blocking llmp message should never be NULL"),
@ -1805,6 +1888,16 @@ where
/// This allows us to intercept messages right in the broker.
/// This keeps the out map clean.
pub llmp_clients: Vec<LlmpReceiver<SP>>,
/// The own listeners we spawned via `launch_listener` or `crate_attach_to_tcp`.
/// Listeners will be ignored for `exit_cleanly_after` and they are never considered to have timed out.
listeners: Vec<ClientId>,
/// The total amount of clients we had, historically, including those that disconnected, and our listeners.
num_clients_total: usize,
/// The amount of total clients that should have connected and (and disconnected)
/// after which the broker loop should quit gracefully.
pub exit_cleanly_after: Option<NonZeroUsize>,
/// Clients that should be removed soon, (offset into llmp_clients)
clients_to_remove: Vec<u32>,
/// The ShMemProvider to use
shmem_provider: SP,
}
@ -1839,10 +1932,10 @@ where
pub fn new(mut shmem_provider: SP) -> Result<Self, Error> {
Ok(LlmpBroker {
llmp_out: LlmpSender {
id: 0,
id: ClientId(0),
last_msg_sent: ptr::null_mut(),
out_shmems: vec![LlmpSharedMap::new(
0,
ClientId(0),
shmem_provider.new_shmem(next_shmem_size(0))?,
)],
// Broker never cleans up the pages so that new
@ -1853,7 +1946,11 @@ where
unused_shmem_cache: vec![],
},
llmp_clients: vec![],
clients_to_remove: vec![],
shmem_provider,
listeners: vec![],
exit_cleanly_after: None,
num_clients_total: 0,
})
}
@ -1870,6 +1967,15 @@ where
}
}
/// Set this broker to exit after at least `count` clients attached and all client exited.
/// Will ignore the own listener thread, if `create_attach_to_tcp`
///
/// So, if the `n_client` value is `2`, the broker will not exit after client 1 connected and disconnected,
/// but it will quit after client 2 connected and disconnected.
pub fn set_exit_cleanly_after(&mut self, n_clients: NonZeroUsize) {
self.exit_cleanly_after = Some(n_clients);
}
/// Allocate the next message on the outgoing map
unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error> {
self.llmp_out.alloc_next(buf_len)
@ -1877,18 +1983,25 @@ where
/// Registers a new client for the given sharedmap str and size.
/// Returns the id of the new client in [`broker.client_shmem`]
pub fn register_client(&mut self, mut client_page: LlmpSharedMap<SP::ShMem>) {
// Tell the client it may unmap this page now.
pub fn register_client(&mut self, mut client_page: LlmpSharedMap<SP::ShMem>) -> ClientId {
// Tell the client it may unmap its initial allocated shmem page now.
// Since we now have a handle to it, it won't be umapped too early (only after we also unmap it)
client_page.mark_safe_to_unmap();
let id = self.llmp_clients.len() as u32;
let id = ClientId(self.num_clients_total.try_into().unwrap());
self.llmp_clients.push(LlmpReceiver {
id,
current_recv_shmem: client_page,
last_msg_recvd: ptr::null_mut(),
shmem_provider: self.shmem_provider.clone(),
highest_msg_id: 0,
highest_msg_id: MessageId(0),
// We don't know the last received time, just assume the current time.
#[cfg(feature = "std")]
last_msg_time: current_time(),
});
self.num_clients_total += 1;
id
}
/// Connects to a broker running on another machine.
@ -1923,7 +2036,7 @@ where
let broker_id = match recv_tcp_msg(&mut stream)?.try_into()? {
TcpResponse::RemoteBrokerAccepted { broker_id } => {
log::info!("B2B: Got Connection Ack, broker_id {broker_id}");
log::info!("B2B: Got Connection Ack, broker_id {broker_id:?}");
broker_id
}
_ => {
@ -1934,12 +2047,12 @@ where
};
// TODO: use broker ids!
log::info!("B2B: We are broker {broker_id}");
log::info!("B2B: We are broker {broker_id:?}");
// TODO: handle broker_ids properly/at all.
let map_description = Self::b2b_thread_on(
stream,
self.llmp_clients.len() as ClientId,
ClientId(self.llmp_clients.len() as u32),
&self
.llmp_out
.out_shmems
@ -1987,12 +2100,38 @@ where
where
F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{
#[cfg(feature = "std")]
let current_time = current_time();
let mut new_messages = false;
for i in 0..self.llmp_clients.len() {
unsafe {
new_messages |= self.handle_new_msgs(i as u32, on_new_msg)?;
let client_id = self.llmp_clients[i].id;
match unsafe { self.handle_new_msgs(client_id, on_new_msg) } {
Ok(has_messages) => {
// See if we need to remove this client, in case no new messages got brokered, and it's not a listener
#[cfg(feature = "std")]
if !has_messages && !self.listeners.iter().any(|&x| x == client_id) {
let last_msg_time = self.llmp_clients[i].last_msg_time;
if last_msg_time < current_time
&& current_time - last_msg_time > CLIENT_TIMEOUT
{
self.clients_to_remove.push(i as u32);
#[cfg(feature = "llmp_debug")]
println!("Client {i} timed out. Removing.");
}
}
new_messages = has_messages;
}
Err(Error::ShuttingDown) => self.clients_to_remove.push(i as u32),
Err(err) => return Err(err),
}
}
// After brokering, remove all clients we don't want to keep.
for client_id in self.clients_to_remove.iter().rev() {
log::debug!("Client {client_id} disconnected.");
self.llmp_clients.remove((*client_id) as usize);
}
self.clients_to_remove.clear();
Ok(new_messages)
}
@ -2012,8 +2151,16 @@ where
false
}
/// Loops infinitely, forwarding and handling all incoming messages from clients.
/// Never returns.
/// Returns if any clients are currently connected.
/// Ignores listener threads that belong to the broker,
/// talking to other brokers via TCP, and accepting new clients over this port.
#[inline]
fn has_clients(&self) -> bool {
self.llmp_clients.len() > self.listeners.len()
}
/// Loops until the last client quits,
/// forwarding and handling all incoming messages from clients.
/// Will call `on_timeout` roughly after `timeout`
/// Panics on error.
/// 5 millis of sleep can't hurt to keep busywait not at 100%
@ -2052,6 +2199,23 @@ where
end_time = current_milliseconds() + timeout;
}
if let Some(exit_after_count) = self.exit_cleanly_after {
log::trace!(
"Clients connected: {} && > {} - {} >= {}",
self.has_clients(),
self.num_clients_total,
self.listeners.len(),
exit_after_count
);
if !self.has_clients()
&& (self.num_clients_total - self.listeners.len()) >= exit_after_count.into()
{
// No more clients connected, and the amount of clients we were waiting for was previously connected.
// exit cleanly.
break;
}
}
#[cfg(feature = "std")]
if let Some(time) = sleep_time {
thread::sleep(time);
@ -2059,7 +2223,7 @@ where
#[cfg(not(feature = "std"))]
if let Some(time) = sleep_time {
panic!("Cannot sleep on no_std platform (requested {:?})", time);
panic!("Cannot sleep on no_std platform (requested {time:?})");
}
}
self.llmp_out
@ -2067,8 +2231,8 @@ where
.expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.");
}
/// Loops infinitely, forwarding and handling all incoming messages from clients.
/// Never returns. Panics on error.
/// Loops unitl the last client quit,
/// forwarding and handling all incoming messages from clients.
/// 5 millis of sleep can't hurt to keep busywait not at 100%
/// On std, if you need to run code even if no update got sent, use `Self::loop_with_timeout` (needs the `std` feature).
pub fn loop_forever<F>(&mut self, on_new_msg: &mut F, sleep_time: Option<Duration>)
@ -2085,6 +2249,16 @@ where
self.once(on_new_msg)
.expect("An error occurred when brokering. Exiting.");
if let Some(exit_after_count) = self.exit_cleanly_after {
if !self.has_clients()
&& (self.num_clients_total - self.listeners.len()) > exit_after_count.into()
{
// No more clients connected, and the amount of clients we were waiting for was previously connected.
// exit cleanly.
break;
}
}
#[cfg(feature = "std")]
if let Some(time) = sleep_time {
thread::sleep(time);
@ -2092,7 +2266,7 @@ where
#[cfg(not(feature = "std"))]
if let Some(time) = sleep_time {
panic!("Cannot sleep on no_std platform (requested {:?})", time);
panic!("Cannot sleep on no_std platform (requested {time:?})");
}
}
self.llmp_out
@ -2205,7 +2379,7 @@ where
Ok(Some((client_id, tag, flags, payload))) => {
if client_id == b2b_client_id {
log::info!(
"Ignored message we probably sent earlier (same id), TAG: {tag:x}"
"Ignored message we probably sent earlier (same id), TAG: {tag:?}"
);
continue;
}
@ -2297,7 +2471,7 @@ where
fn handle_tcp_request(
mut stream: TcpStream,
request: &TcpRequest,
current_client_id: &mut u32,
current_client_id: &mut ClientId,
sender: &mut LlmpSender<SP>,
broker_shmem_description: &ShMemDescription,
) {
@ -2316,7 +2490,7 @@ where
) {
log::info!("An error occurred sending via tcp {e}");
};
*current_client_id += 1;
current_client_id.0 += 1;
}
TcpRequest::RemoteBrokerHello { hostname } => {
log::info!("B2B new client: {hostname}");
@ -2325,7 +2499,7 @@ where
if send_tcp_msg(
&mut stream,
&TcpResponse::RemoteBrokerAccepted {
broker_id: *current_client_id,
broker_id: BrokerId(current_client_id.0),
},
)
.is_err()
@ -2340,7 +2514,7 @@ where
if Self::announce_new_client(sender, &shmem_description).is_err() {
log::info!("B2B: Error announcing client {shmem_description:?}");
};
*current_client_id += 1;
current_client_id.0 += 1;
}
}
};
@ -2365,7 +2539,7 @@ where
hostname,
};
let llmp_tcp_id = self.llmp_clients.len() as ClientId;
let llmp_tcp_id = ClientId(self.llmp_clients.len() as u32);
// Tcp out map sends messages from background thread tcp server to foreground client
let tcp_out_shmem = LlmpSharedMap::new(
@ -2373,13 +2547,13 @@ where
self.shmem_provider.new_shmem(LLMP_CFG_INITIAL_MAP_SIZE)?,
);
let tcp_out_shmem_description = tcp_out_shmem.shmem.description();
self.register_client(tcp_out_shmem);
let listener_id = self.register_client(tcp_out_shmem);
let ret = thread::spawn(move || {
// Create a new ShMemProvider for this background thread.
let mut shmem_provider_bg = SP::new().unwrap();
let mut current_client_id = llmp_tcp_id + 1;
let mut current_client_id = ClientId(llmp_tcp_id.0 + 1);
let mut tcp_incoming_sender = LlmpSender {
id: llmp_tcp_id,
@ -2445,6 +2619,8 @@ where
}
});
self.listeners.push(listener_id);
Ok(ret)
}
@ -2454,34 +2630,40 @@ where
#[allow(clippy::cast_ptr_alignment)]
unsafe fn handle_new_msgs<F>(
&mut self,
client_id: u32,
client_id: ClientId,
on_new_msg: &mut F,
) -> Result<bool, Error>
where
F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{
let mut new_mesages = false;
let mut new_messages = false;
let mut next_id = self.llmp_clients.len() as u32;
// TODO: We could memcpy a range of pending messages, instead of one by one.
loop {
let msg = {
let client = &mut self.llmp_clients[client_id as usize];
let client = &mut self.llmp_clients[client_id.0 as usize];
match client.recv()? {
None => {
// We're done handling this client
return Ok(new_mesages);
#[cfg(feature = "std")]
if new_messages {
// set the recv time
// We don't do that in recv() to keep calls to `current_time` to a minimum.
self.llmp_clients[client_id.0 as usize].last_msg_time = current_time();
}
return Ok(new_messages);
}
Some(msg) => msg,
}
};
// We got a new message
new_mesages = true;
new_messages = true;
match (*msg).tag {
// first, handle the special, llmp-internal messages
LLMP_SLOW_RECEIVER_PANIC => {
return Err(Error::unknown(format!("The broker was too slow to handle messages of client {client_id} in time, so it quit. Either the client sent messages too fast, or we (the broker) got stuck!")));
return Err(Error::unknown(format!("The broker was too slow to handle messages of client {client_id:?} in time, so it quit. Either the client sent messages too fast, or we (the broker) got stuck!")));
}
LLMP_TAG_NEW_SHM_CLIENT => {
/* This client informs us about yet another new client
@ -2510,12 +2692,16 @@ where
next_id += 1;
new_page.mark_safe_to_unmap();
self.llmp_clients.push(LlmpReceiver {
id,
id: ClientId(id),
current_recv_shmem: new_page,
last_msg_recvd: ptr::null_mut(),
shmem_provider: self.shmem_provider.clone(),
highest_msg_id: 0,
highest_msg_id: MessageId(0),
// We don't know the last received time, just assume the current time.
#[cfg(feature = "std")]
last_msg_time: current_time(),
});
self.num_clients_total += 1;
}
Err(e) => {
log::info!("Error adding client! Ignoring: {e:?}");
@ -2531,7 +2717,7 @@ where
// The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary.
let mut should_forward_msg = true;
let map = &mut self.llmp_clients[client_id as usize].current_recv_shmem;
let map = &mut self.llmp_clients[client_id.0 as usize].current_recv_shmem;
let msg_buf = (*msg).try_as_slice(map)?;
if let LlmpMsgHookResult::Handled =
(on_new_msg)(client_id, (*msg).tag, (*msg).flags, msg_buf)?
@ -2689,11 +2875,14 @@ where
},
receiver: LlmpReceiver {
id: 0,
id: ClientId(0),
current_recv_shmem: initial_broker_shmem,
last_msg_recvd: ptr::null_mut(),
shmem_provider,
highest_msg_id: 0,
highest_msg_id: MessageId(0),
// We don't know the last received time, just assume the current time.
#[cfg(feature = "std")]
last_msg_time: current_time(),
},
})
}
@ -2827,7 +3016,7 @@ where
);
// We'll set `sender_id` later
let mut ret = Self::new(shmem_provider, map, 0)?;
let mut ret = Self::new(shmem_provider, map, ClientId(0))?;
let client_hello_req = TcpRequest::LocalClientHello {
shmem_description: ret.sender.out_shmems.first().unwrap().shmem.description(),
@ -2891,7 +3080,7 @@ mod tests {
.once(&mut |_sender_id, _tag, _flags, _msg| Ok(ForwardToClients))
.unwrap();
let tag: Tag = 0x1337;
let tag: Tag = Tag(0x1337);
let arr: [u8; 1] = [1_u8];
// Send stuff
client.send_buf(tag, &arr).unwrap();

View File

@ -34,6 +34,15 @@ use core::{iter::Iterator, time};
#[cfg(feature = "std")]
use std::time::{SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Serialize};
/// The client ID == the sender id.
#[repr(transparent)]
#[derive(
Debug, Default, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
)]
pub struct ClientId(pub u32);
#[cfg(feature = "std")]
use log::{Level, Metadata, Record};

View File

@ -26,6 +26,9 @@ use crate::{
Error,
};
/// If the saved page content equals exactly this buf, the restarted child wants to exit cleanly.
const EXITING_MAGIC: &[u8; 16] = b"LIBAFL_EXIT_NOW\0";
/// The struct stored on the shared map, containing either the data, or the filename to read contents from.
#[repr(C)]
struct StateShMemContent {
@ -187,6 +190,35 @@ where
content_mut.buf_len = 0;
}
/// When called from a child, informs the restarter/parent process
/// that it should no longer respawn the child.
pub fn send_exiting(&mut self) {
self.reset();
let len = EXITING_MAGIC.len();
assert!(size_of::<StateShMemContent>() + len <= self.shmem.len());
let shmem_content = self.content_mut();
unsafe {
ptr::copy_nonoverlapping(
EXITING_MAGIC as *const u8,
shmem_content.buf.as_mut_ptr(),
len,
);
}
shmem_content.buf_len = EXITING_MAGIC.len();
}
/// Returns true, if [`Self::send_exiting`] was called on this [`StateRestorer`] last.
/// This should be checked in the parent before deciding to restore the client.
pub fn wants_to_exit(&self) -> bool {
let len = EXITING_MAGIC.len();
assert!(size_of::<StateShMemContent>() + len <= self.shmem.len());
let bytes = unsafe { slice::from_raw_parts(self.content().buf.as_ptr(), len) };
bytes == EXITING_MAGIC
}
fn content_mut(&mut self) -> &mut StateShMemContent {
let ptr = self.shmem.as_slice().as_ptr();
#[allow(clippy::cast_ptr_alignment)] // Beginning of the page will always be aligned
@ -207,7 +239,7 @@ where
self.content().buf_len > 0
}
/// Restores the contents saved in this [`StateRestorer`], if any are availiable.
/// Restores the contents saved in this [`StateRestorer`], if any are available.
/// Can only be read once.
pub fn restore<S>(&self) -> Result<Option<S>, Error>
where
@ -223,6 +255,13 @@ where
state_shmem_content.buf_len_checked(self.mapsize())?,
)
};
if bytes == EXITING_MAGIC {
return Err(Error::illegal_state(
"Trying to restore a state after send_exiting was called.",
));
}
let mut state = bytes;
let mut file_content;
if state_shmem_content.buf_len == 0 {

View File

@ -7,7 +7,7 @@ use alloc::{
};
#[cfg(feature = "std")]
use core::sync::atomic::{compiler_fence, Ordering};
use core::{marker::PhantomData, time::Duration};
use core::{marker::PhantomData, num::NonZeroUsize, time::Duration};
#[cfg(feature = "std")]
use std::net::{SocketAddr, ToSocketAddrs};
@ -40,6 +40,7 @@ use crate::{
bolts::{
llmp::{self, LlmpClient, LlmpClientDescription, Tag},
shmem::ShMemProvider,
ClientId,
},
events::{
BrokerEventResult, Event, EventConfig, EventFirer, EventManager, EventManagerId,
@ -54,14 +55,14 @@ use crate::{
};
/// Forward this to the client
const _LLMP_TAG_EVENT_TO_CLIENT: Tag = 0x2C11E471;
const _LLMP_TAG_EVENT_TO_CLIENT: Tag = Tag(0x2C11E471);
/// Only handle this in the broker
const _LLMP_TAG_EVENT_TO_BROKER: Tag = 0x2B80438;
const _LLMP_TAG_EVENT_TO_BROKER: Tag = Tag(0x2B80438);
/// Handle in both
///
const LLMP_TAG_EVENT_TO_BOTH: Tag = 0x2B0741;
const _LLMP_TAG_RESTART: Tag = 0x8357A87;
const _LLMP_TAG_NO_RESTART: Tag = 0x57A7EE71;
const LLMP_TAG_EVENT_TO_BOTH: Tag = Tag(0x2B0741);
const _LLMP_TAG_RESTART: Tag = Tag(0x8357A87);
const _LLMP_TAG_NO_RESTART: Tag = Tag(0x57A7EE71);
/// The minimum buffer size at which to compress LLMP IPC messages.
#[cfg(feature = "llmp_compression")]
@ -113,6 +114,11 @@ where
})
}
/// Exit the broker process cleanly after at least `n` clients attached and all of them disconnected again
pub fn set_exit_cleanly_after(&mut self, n_clients: NonZeroUsize) {
self.llmp.set_exit_cleanly_after(n_clients);
}
/// Connect to an llmp broker on the givien address
#[cfg(feature = "std")]
pub fn connect_b2b<A>(&mut self, addr: A) -> Result<(), Error>
@ -154,10 +160,13 @@ where
Some(Duration::from_millis(5)),
);
Ok(())
#[cfg(all(feature = "std", feature = "llmp_debug"))]
println!("The last client quit. Exiting.");
Err(Error::shutting_down())
}
/// Run forever in the broker
/// Run in the broker until all clients exit
#[cfg(feature = "llmp_broker_timeouts")]
pub fn broker_loop(&mut self) -> Result<(), Error> {
let monitor = &mut self.monitor;
@ -189,7 +198,7 @@ where
Ok(llmp::LlmpMsgHookResult::ForwardToClients)
}
} else {
monitor.display("Broker".into(), 0);
monitor.display("Broker".into(), ClientId(0));
Ok(llmp::LlmpMsgHookResult::Handled)
}
},
@ -197,14 +206,17 @@ where
Some(Duration::from_millis(5)),
);
Ok(())
#[cfg(feature = "llmp_debug")]
println!("The last client quit. Exiting.");
Err(Error::shutting_down())
}
/// Handle arriving events in the broker
#[allow(clippy::unnecessary_wraps)]
fn handle_in_broker(
monitor: &mut MT,
client_id: u32,
client_id: ClientId,
event: &Event<I>,
) -> Result<BrokerEventResult, Error> {
match &event {
@ -422,7 +434,7 @@ where
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
_client_id: u32,
client_id: ClientId,
event: Event<S::Input>,
) -> Result<(), Error>
where
@ -440,7 +452,7 @@ where
time: _,
executions: _,
} => {
log::info!("Received new Testcase from {_client_id} ({client_config:?})");
log::info!("Received new Testcase from {client_id:?} ({client_config:?})");
let _res = if client_config.match_with(&self.configuration)
&& observers_buf.is_some()
@ -474,6 +486,15 @@ where
}
}
impl<S: UsesInput, SP: ShMemProvider> LlmpEventManager<S, SP> {
/// Send information that this client is exiting.
/// The other side may free up all allocated memory.
/// We are no longer allowed to send anything afterwards.
pub fn send_exiting(&mut self) -> Result<(), Error> {
self.llmp.sender.send_exiting()
}
}
impl<S, SP> UsesState for LlmpEventManager<S, SP>
where
S: UsesInput,
@ -622,9 +643,7 @@ where
{
/// Gets the id assigned to this staterestorer.
fn mgr_id(&self) -> EventManagerId {
EventManagerId {
id: self.llmp.sender.id as usize,
}
EventManagerId(self.llmp.sender.id.0 as usize)
}
}
@ -702,6 +721,13 @@ where
self.staterestorer
.save(&(state, &self.llmp_mgr.describe()?))
}
fn send_exiting(&mut self) -> Result<(), Error> {
self.staterestorer.send_exiting();
// Also inform the broker that we are about to exit.
// This way, the broker can clean up the pages, and eventually exit.
self.llmp_mgr.send_exiting()
}
}
#[cfg(feature = "std")]
@ -840,6 +866,14 @@ where
/// The type of manager to build
#[builder(default = ManagerKind::Any)]
kind: ManagerKind,
/// The amount of external clients that should have connected (not counting our own tcp client)
/// before this broker quits _after the last client exited_.
/// If `None`, the broker will never quit when the last client exits, but run forever.
///
/// So, if this value is `Some(2)`, the broker will not exit after client 1 connected and disconnected,
/// but it will quit after client 2 connected and disconnected.
#[builder(default = None)]
exit_cleanly_after: Option<NonZeroUsize>,
#[builder(setter(skip), default = PhantomData)]
phantom_data: PhantomData<S>,
}
@ -865,6 +899,10 @@ where
broker.connect_b2b(remote_broker_addr)?;
};
if let Some(exit_cleanly_after) = self.exit_cleanly_after {
broker.set_exit_cleanly_after(exit_cleanly_after);
}
broker.broker_loop()
};
@ -903,8 +941,7 @@ where
)?;
broker_things(event_broker, self.remote_broker_addr)?;
return Err(Error::shutting_down());
unreachable!("The broker may never return normally, only on Errors or when shutting down.");
}
ManagerKind::Client { cpu_core } => {
// We are a client
@ -999,6 +1036,10 @@ where
panic!("Fuzzer-respawner: Storing state in crashed fuzzer instance did not work, no point to spawn the next client! This can happen if the child calls `exit()`, in that case make sure it uses `abort()`, if it got killed unrecoverable (OOM), or if there is a bug in the fuzzer itself. (Child exited with: {child_status})");
}
if staterestorer.wants_to_exit() {
return Err(Error::shutting_down());
}
ctr = ctr.wrapping_add(1);
}
} else {
@ -1188,7 +1229,7 @@ where
executor: &mut E,
state: &mut S,
manager: &mut EM,
_client_id: u32,
_client_id: ClientId,
event: Event<DI>,
) -> Result<(), Error>
where
@ -1207,7 +1248,7 @@ where
time: _,
executions: _,
} => {
log::info!("Received new Testcase to convert from {_client_id}");
log::info!("Received new Testcase to convert from {_client_id:?}");
let Some(converter) = self.converter_back.as_mut() else {
return Ok(());
@ -1412,6 +1453,7 @@ mod tests {
shmem::{ShMemProvider, StdShMemProvider},
staterestore::StateRestorer,
tuples::tuple_list,
ClientId,
},
corpus::{Corpus, InMemoryCorpus, Testcase},
events::{llmp::_ENV_FUZZER_SENDER, LlmpEventManager},
@ -1447,8 +1489,8 @@ mod tests {
let mut llmp_client = LlmpClient::new(
shmem_provider.clone(),
LlmpSharedMap::new(0, shmem_provider.new_shmem(1024).unwrap()),
0,
LlmpSharedMap::new(ClientId(0), shmem_provider.new_shmem(1024).unwrap()),
ClientId(0),
)
.unwrap();

View File

@ -109,11 +109,12 @@ impl Handler for ShutdownSignalData {
/// A per-fuzzer unique `ID`, usually starting with `0` and increasing
/// by `1` in multiprocessed `EventManager`s, such as [`self::llmp::LlmpEventManager`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct EventManagerId {
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct EventManagerId(
/// The id
pub id: usize,
}
pub usize,
);
#[cfg(feature = "introspection")]
use crate::monitors::ClientPerfMonitor;
@ -504,6 +505,12 @@ pub trait EventRestarter: UsesState {
Ok(())
}
/// Send information that this client is exiting.
/// No need to restart us any longer, and no need to print an error, either.
fn send_exiting(&mut self) -> Result<(), Error> {
Ok(())
}
/// Block until we are safe to exit.
#[inline]
fn await_restart_safe(&mut self) {}
@ -625,7 +632,7 @@ impl<S> ProgressReporter for NopEventManager<S> where
impl<S> HasEventManagerId for NopEventManager<S> {
fn mgr_id(&self) -> EventManagerId {
EventManagerId { id: 0 }
EventManagerId(0)
}
}

View File

@ -26,14 +26,8 @@ use crate::{
bolts::os::unix_signals::setup_signal_handler,
events::{shutdown_handler, SHUTDOWN_SIGHANDLER_DATA},
};
#[cfg(feature = "std")]
use crate::{
bolts::{shmem::ShMemProvider, staterestore::StateRestorer},
corpus::Corpus,
monitors::SimplePrintingMonitor,
state::{HasCorpus, HasSolutions},
};
use crate::{
bolts::ClientId,
events::{
BrokerEventResult, Event, EventFirer, EventManager, EventManagerId, EventProcessor,
EventRestarter, HasEventManagerId,
@ -43,6 +37,13 @@ use crate::{
state::{HasClientPerfMonitor, HasExecutions, HasMetadata, UsesState},
Error,
};
#[cfg(feature = "std")]
use crate::{
bolts::{shmem::ShMemProvider, staterestore::StateRestorer},
corpus::Corpus,
monitors::SimplePrintingMonitor,
state::{HasCorpus, HasSolutions},
};
/// The llmp connection from the actual fuzzer to the process supervising it
const _ENV_FUZZER_SENDER: &str = "_AFL_ENV_FUZZER_SENDER";
@ -166,7 +167,7 @@ where
S: UsesInput,
{
fn mgr_id(&self) -> EventManagerId {
EventManagerId { id: 0 }
EventManagerId(0)
}
}
@ -214,12 +215,12 @@ where
executions,
} => {
monitor
.client_stats_mut_for(0)
.client_stats_mut_for(ClientId(0))
.update_corpus_size(*corpus_size as u64);
monitor
.client_stats_mut_for(0)
.client_stats_mut_for(ClientId(0))
.update_executions(*executions as u64, *time);
monitor.display(event.name().to_string(), 0);
monitor.display(event.name().to_string(), ClientId(0));
Ok(BrokerEventResult::Handled)
}
Event::UpdateExecStats {
@ -228,11 +229,11 @@ where
phantom: _,
} => {
// TODO: The monitor buffer should be added on client add.
let client = monitor.client_stats_mut_for(0);
let client = monitor.client_stats_mut_for(ClientId(0));
client.update_executions(*executions as u64, *time);
monitor.display(event.name().to_string(), 0);
monitor.display(event.name().to_string(), ClientId(0));
Ok(BrokerEventResult::Handled)
}
Event::UpdateUserStats {
@ -241,9 +242,9 @@ where
phantom: _,
} => {
monitor
.client_stats_mut_for(0)
.client_stats_mut_for(ClientId(0))
.update_user_stats(name.clone(), value.clone());
monitor.display(event.name().to_string(), 0);
monitor.display(event.name().to_string(), ClientId(0));
Ok(BrokerEventResult::Handled)
}
#[cfg(feature = "introspection")]
@ -254,17 +255,17 @@ where
phantom: _,
} => {
// TODO: The monitor buffer should be added on client add.
let client = monitor.client_stats_mut_for(0);
let client = monitor.client_stats_mut_for(ClientId(0));
client.update_executions(*executions as u64, *time);
client.update_introspection_monitor((**introspection_monitor).clone());
monitor.display(event.name().to_string(), 0);
monitor.display(event.name().to_string(), ClientId(0));
Ok(BrokerEventResult::Handled)
}
Event::Objective { objective_size } => {
monitor
.client_stats_mut_for(0)
.client_stats_mut_for(ClientId(0))
.update_objective_size(*objective_size as u64);
monitor.display(event.name().to_string(), 0);
monitor.display(event.name().to_string(), ClientId(0));
Ok(BrokerEventResult::Handled)
}
Event::Log {
@ -351,6 +352,11 @@ where
self.staterestorer.reset();
self.staterestorer.save(state)
}
fn send_exiting(&mut self) -> Result<(), Error> {
self.staterestorer.send_exiting();
Ok(())
}
}
#[cfg(feature = "std")]
@ -540,7 +546,7 @@ where
staterestorer.reset();
// load the corpus size into monitor to still display the correct numbers after restart.
let client_stats = monitor.client_stats_mut_for(0);
let client_stats = monitor.client_stats_mut_for(ClientId(0));
client_stats.update_corpus_size(state.corpus().count().try_into()?);
client_stats.update_objective_size(state.solutions().count().try_into()?);

View File

@ -39,6 +39,9 @@ pub type MaxMapFeedback<O, S, T> = MapFeedback<DifferentIsNovel, O, MaxReducer,
/// A [`MapFeedback`] that strives to minimize the map contents.
pub type MinMapFeedback<O, S, T> = MapFeedback<DifferentIsNovel, O, MinReducer, S, T>;
/// A [`MapFeedback`] that always returns `true` for `is_interesting`. Useful for tracing all executions.
pub type AlwaysInterestingMapFeedback<O, S, T> = MapFeedback<AllIsNovel, O, NopReducer, S, T>;
/// A [`MapFeedback`] that strives to maximize the map contents,
/// but only, if a value is larger than `pow2` of the previous.
pub type MaxMapPow2Feedback<O, S, T> = MapFeedback<NextPow2IsNovel, O, MaxReducer, S, T>;
@ -83,6 +86,20 @@ where
}
}
/// A [`NopReducer`] does nothing, and just "reduces" to the second/`new` value.
#[derive(Clone, Debug)]
pub struct NopReducer {}
impl<T> Reducer<T> for NopReducer
where
T: Default + Copy + 'static,
{
#[inline]
fn reduce(_history: T, new: T) -> T {
new
}
}
/// A [`MaxReducer`] reduces int values and returns their maximum.
#[derive(Clone, Debug)]
pub struct MaxReducer {}
@ -208,7 +225,7 @@ where
pub struct MapIndexesMetadata {
/// The list of indexes.
pub list: Vec<usize>,
/// A refcount used to know when remove this meta
/// A refcount used to know when we can remove this metadata
pub tcref: isize,
}
@ -337,6 +354,8 @@ where
/// The most common AFL-like feedback type
#[derive(Clone, Debug)]
pub struct MapFeedback<N, O, R, S, T> {
/// For tracking, always keep indexes and/or novelties, even if the map isn't considered `interesting`.
always_track: bool,
/// Indexes used in the last observation
indexes: bool,
/// New indexes observed in the last observation
@ -632,6 +651,7 @@ where
name: MAPFEEDBACK_PREFIX.to_string() + map_observer.name(),
observer_name: map_observer.name().to_string(),
stats_name: create_stats_name(map_observer.name()),
always_track: false,
phantom: PhantomData,
}
}
@ -645,6 +665,7 @@ where
name: MAPFEEDBACK_PREFIX.to_string() + map_observer.name(),
observer_name: map_observer.name().to_string(),
stats_name: create_stats_name(map_observer.name()),
always_track: false,
phantom: PhantomData,
}
}
@ -659,9 +680,17 @@ where
observer_name: observer_name.to_string(),
stats_name: create_stats_name(name),
phantom: PhantomData,
always_track: false,
}
}
/// For tracking, enable `always_track` mode, that also adds `novelties` or `indexes`,
/// even if the map is not novel for this feedback.
/// This is useful in combination with `load_initial_inputs_forced`, or other feedbacks.
pub fn set_always_track(&mut self, always_track: bool) {
self.always_track = always_track;
}
/// Creating a new `MapFeedback` with a specific name. This is usefully whenever the same
/// feedback is needed twice, but with a different history. Using `new()` always results in the
/// same name and therefore also the same history.
@ -673,6 +702,7 @@ where
name: name.to_string(),
observer_name: map_observer.name().to_string(),
stats_name: create_stats_name(name),
always_track: false,
phantom: PhantomData,
}
}
@ -691,6 +721,7 @@ where
observer_name: observer_name.to_string(),
stats_name: create_stats_name(name),
name: name.to_string(),
always_track: false,
phantom: PhantomData,
}
}
@ -758,7 +789,7 @@ where
}
}
if interesting {
if interesting || self.always_track {
let len = history_map.len();
let filled = history_map.iter().filter(|&&i| i != initial).count();
// opt: if not tracking optimisations, we technically don't show the *current* history

View File

@ -11,7 +11,7 @@ use std::{
use serde_json::json;
use crate::{
bolts::{current_time, format_duration_hms},
bolts::{current_time, format_duration_hms, ClientId},
monitors::{ClientStats, Monitor, NopMonitor},
};
@ -45,7 +45,7 @@ where
self.base.start_time()
}
fn display(&mut self, event_msg: String, sender_id: u32) {
fn display(&mut self, event_msg: String, sender_id: ClientId) {
let cur_time = current_time();
if (cur_time - self.last_update).as_secs() >= 60 {
@ -190,7 +190,7 @@ where
self.base.start_time()
}
fn display(&mut self, event_msg: String, sender_id: u32) {
fn display(&mut self, event_msg: String, sender_id: ClientId) {
if (self.log_record)(&mut self.base) {
let file = OpenOptions::new()
.append(true)

View File

@ -23,7 +23,7 @@ pub use disk::{OnDiskJSONMonitor, OnDiskTOMLMonitor};
use hashbrown::HashMap;
use serde::{Deserialize, Serialize};
use crate::bolts::{current_time, format_duration_hms};
use crate::bolts::{current_time, format_duration_hms, ClientId};
#[cfg(feature = "afl_exec_sec")]
const CLIENT_STATS_TIME_WINDOW_SECS: u64 = 5; // 5 seconds
@ -221,7 +221,7 @@ pub trait Monitor {
fn start_time(&mut self) -> Duration;
/// Show the monitor to the user
fn display(&mut self, event_msg: String, sender_id: u32);
fn display(&mut self, event_msg: String, sender_id: ClientId);
/// Amount of elements in the corpus (combined for all children)
fn corpus_size(&self) -> u64 {
@ -261,15 +261,15 @@ pub trait Monitor {
}
/// The client monitor for a specific id, creating new if it doesn't exist
fn client_stats_mut_for(&mut self, client_id: u32) -> &mut ClientStats {
fn client_stats_mut_for(&mut self, client_id: ClientId) -> &mut ClientStats {
let client_stat_count = self.client_stats().len();
for _ in client_stat_count..(client_id + 1) as usize {
for _ in client_stat_count..(client_id.0 + 1) as usize {
self.client_stats_mut().push(ClientStats {
last_window_time: current_time(),
..ClientStats::default()
});
}
&mut self.client_stats_mut()[client_id as usize]
&mut self.client_stats_mut()[client_id.0 as usize]
}
}
@ -297,7 +297,7 @@ impl Monitor for NopMonitor {
self.start_time
}
fn display(&mut self, _event_msg: String, _sender_id: u32) {}
fn display(&mut self, _event_msg: String, _sender_id: ClientId) {}
}
impl NopMonitor {
@ -351,11 +351,11 @@ impl Monitor for SimplePrintingMonitor {
self.start_time
}
fn display(&mut self, event_msg: String, sender_id: u32) {
fn display(&mut self, event_msg: String, sender_id: ClientId) {
println!(
"[{} #{}] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
event_msg,
sender_id,
sender_id.0,
format_duration_hms(&(current_time() - self.start_time)),
self.client_stats().len(),
self.corpus_size(),
@ -370,7 +370,7 @@ impl Monitor for SimplePrintingMonitor {
// Print the client performance monitor.
println!(
"Client {:03}:\n{}",
sender_id, self.client_stats[sender_id as usize].introspection_monitor
sender_id.0, self.client_stats[sender_id.0 as usize].introspection_monitor
);
// Separate the spacing just a bit
println!();
@ -421,11 +421,11 @@ where
self.start_time
}
fn display(&mut self, event_msg: String, sender_id: u32) {
fn display(&mut self, event_msg: String, sender_id: ClientId) {
let mut fmt = format!(
"[{} #{}] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
event_msg,
sender_id,
sender_id.0,
format_duration_hms(&(current_time() - self.start_time)),
self.client_stats().len(),
self.corpus_size(),
@ -449,7 +449,7 @@ where
// Print the client performance monitor.
let fmt = format!(
"Client {:03}:\n{}",
sender_id, self.client_stats[sender_id as usize].introspection_monitor
sender_id.0, self.client_stats[sender_id.0 as usize].introspection_monitor
);
(self.print_fn)(fmt);
@ -956,7 +956,10 @@ pub mod pybind {
use pyo3::{prelude::*, types::PyUnicode};
use super::ClientStats;
use crate::monitors::{Monitor, SimpleMonitor};
use crate::{
bolts::ClientId,
monitors::{Monitor, SimpleMonitor},
};
// TODO create a PyObjectFnMut to pass, track stabilization of https://github.com/rust-lang/rust/issues/29625
@ -1078,7 +1081,7 @@ pub mod pybind {
unwrap_me_mut!(self.wrapper, m, { m.start_time() })
}
fn display(&mut self, event_msg: String, sender_id: u32) {
fn display(&mut self, event_msg: String, sender_id: ClientId) {
unwrap_me_mut!(self.wrapper, m, { m.display(event_msg, sender_id) });
}
}

View File

@ -6,7 +6,7 @@ use alloc::{string::String, vec::Vec};
use core::{fmt::Write, time::Duration};
use crate::{
bolts::{current_time, format_duration_hms},
bolts::{current_time, format_duration_hms, ClientId},
monitors::{ClientStats, Monitor},
};
@ -40,8 +40,8 @@ where
self.start_time
}
fn display(&mut self, event_msg: String, sender_id: u32) {
let sender = format!("#{sender_id}");
fn display(&mut self, event_msg: String, sender_id: ClientId) {
let sender = format!("#{}", sender_id.0);
let pad = if event_msg.len() + sender.len() < 13 {
" ".repeat(13 - event_msg.len() - sender.len())
} else {

View File

@ -41,7 +41,7 @@ use prometheus_client::{
use tide::Request;
use crate::{
bolts::{current_time, format_duration_hms},
bolts::{current_time, format_duration_hms, ClientId},
monitors::{ClientStats, Monitor, UserStats},
};
@ -95,7 +95,7 @@ where
}
#[allow(clippy::cast_sign_loss)]
fn display(&mut self, event_msg: String, sender_id: u32) {
fn display(&mut self, event_msg: String, sender_id: ClientId) {
// Update the prometheus metrics
// Label each metric with the sender / client_id
// The gauges must take signed i64's, with max value of 2^63-1 so it is
@ -107,42 +107,42 @@ where
let corpus_size = self.corpus_size();
self.corpus_count
.get_or_create(&Labels {
client: sender_id,
client: sender_id.0,
stat: String::new(),
})
.set(corpus_size.try_into().unwrap());
let objective_size = self.objective_size();
self.objective_count
.get_or_create(&Labels {
client: sender_id,
client: sender_id.0,
stat: String::new(),
})
.set(objective_size.try_into().unwrap());
let total_execs = self.total_execs();
self.executions
.get_or_create(&Labels {
client: sender_id,
client: sender_id.0,
stat: String::new(),
})
.set(total_execs.try_into().unwrap());
let execs_per_sec = self.execs_per_sec();
self.exec_rate
.get_or_create(&Labels {
client: sender_id,
client: sender_id.0,
stat: String::new(),
})
.set(execs_per_sec);
let run_time = (current_time() - self.start_time).as_secs();
self.runtime
.get_or_create(&Labels {
client: sender_id,
client: sender_id.0,
stat: String::new(),
})
.set(run_time.try_into().unwrap()); // run time in seconds, which can be converted to a time format by Grafana or similar
let total_clients = self.client_stats().len().try_into().unwrap(); // convert usize to u64 (unlikely that # of clients will be > 2^64 -1...)
self.clients_count
.get_or_create(&Labels {
client: sender_id,
client: sender_id.0,
stat: String::new(),
})
.set(total_clients);
@ -151,7 +151,7 @@ where
let fmt = format!(
"[Prometheus] [{} #{}] run time: {}, clients: {}, corpus: {}, objectives: {}, executions: {}, exec/sec: {}",
event_msg,
sender_id,
sender_id.0,
format_duration_hms(&(current_time() - self.start_time)),
self.client_stats().len(),
self.corpus_size(),
@ -177,7 +177,7 @@ where
};
self.custom_stat
.get_or_create(&Labels {
client: sender_id,
client: sender_id.0,
stat: key.clone(),
})
.set(value);

View File

@ -25,7 +25,7 @@ use tui::{backend::CrosstermBackend, Terminal};
#[cfg(feature = "introspection")]
use super::{ClientPerfMonitor, PerfFeature};
use crate::{
bolts::{current_time, format_duration_hms},
bolts::{current_time, format_duration_hms, ClientId},
monitors::{ClientStats, Monitor, UserStats},
};
@ -258,7 +258,7 @@ impl Monitor for TuiMonitor {
}
#[allow(clippy::cast_sign_loss)]
fn display(&mut self, event_msg: String, sender_id: u32) {
fn display(&mut self, event_msg: String, sender_id: ClientId) {
let cur_time = current_time();
{
@ -279,7 +279,7 @@ impl Monitor for TuiMonitor {
let client = self.client_stats_mut_for(sender_id);
let exec_sec = client.execs_per_sec_pretty(cur_time);
let sender = format!("#{sender_id}");
let sender = format!("#{}", sender_id.0);
let pad = if event_msg.len() + sender.len() < 13 {
" ".repeat(13 - event_msg.len() - sender.len())
} else {
@ -295,10 +295,10 @@ impl Monitor for TuiMonitor {
}
{
let client = &self.client_stats()[sender_id as usize];
let client = &self.client_stats()[sender_id.0 as usize];
let mut ctx = self.context.write().unwrap();
ctx.clients
.entry(sender_id as usize)
.entry(sender_id.0 as usize)
.or_default()
.grab_data(client, exec_sec);
while ctx.client_logs.len() >= DEFAULT_LOGS_NUMBER {