diff --git a/libafl/src/events/centralized.rs b/libafl/src/events/centralized.rs index 255e562278..88a1b1cdd0 100644 --- a/libafl/src/events/centralized.rs +++ b/libafl/src/events/centralized.rs @@ -8,7 +8,9 @@ // 4. The "main broker", the gathers the stats from the fuzzer clients and broadcast the newly found testcases from the main evaluator. use alloc::{boxed::Box, string::String, vec::Vec}; -use core::{marker::PhantomData, num::NonZeroUsize, time::Duration}; +use core::fmt::Debug; +#[cfg(feature = "adaptive_serialization")] +use core::time::Duration; #[cfg(feature = "adaptive_serialization")] use libafl_bolts::tuples::{Handle, Handled}; @@ -18,7 +20,7 @@ use libafl_bolts::{ llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED}, }; use libafl_bolts::{ - llmp::{self, LlmpBroker, LlmpClient, LlmpClientDescription, Tag}, + llmp::{LlmpClient, LlmpClientDescription, Tag}, shmem::{NopShMemProvider, ShMemProvider}, ClientId, }; @@ -33,9 +35,9 @@ use crate::observers::TimeObserver; use crate::state::HasScalabilityMonitor; use crate::{ events::{ - AdaptiveSerializer, BrokerEventResult, CustomBufEventResult, Event, EventConfig, - EventFirer, EventManager, EventManagerId, EventProcessor, EventRestarter, - HasCustomBufHandlers, HasEventManagerId, LogSeverity, ProgressReporter, + AdaptiveSerializer, CustomBufEventResult, Event, EventConfig, EventFirer, EventManager, + EventManagerId, EventProcessor, EventRestarter, HasCustomBufHandlers, HasEventManagerId, + LogSeverity, ProgressReporter, }, executors::{Executor, HasObservers}, fuzzer::{EvaluatorObservers, ExecutionProcessor}, @@ -45,174 +47,7 @@ use crate::{ Error, HasMetadata, }; -const _LLMP_TAG_TO_MAIN: Tag = Tag(0x3453453); - -/// An LLMP-backed event manager for scalable multi-processed fuzzing -pub struct CentralizedLlmpEventBroker -where - I: Input, - SP: ShMemProvider + 'static, - //CE: CustomEvent, -{ - llmp: LlmpBroker, - #[cfg(feature = "llmp_compression")] - compressor: GzipCompressor, - phantom: PhantomData, -} - -impl core::fmt::Debug for CentralizedLlmpEventBroker -where - SP: ShMemProvider + 'static, - I: Input, -{ - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - let mut debug_struct = f.debug_struct("CentralizedLlmpEventBroker"); - let debug = debug_struct.field("llmp", &self.llmp); - //.field("custom_buf_handlers", &self.custom_buf_handlers) - #[cfg(feature = "llmp_compression")] - let debug = debug.field("compressor", &self.compressor); - debug - .field("phantom", &self.phantom) - .finish_non_exhaustive() - } -} - -impl CentralizedLlmpEventBroker -where - I: Input, - SP: ShMemProvider + 'static, -{ - /// Create an event broker from a raw broker. - pub fn new(llmp: LlmpBroker) -> Result { - Ok(Self { - llmp, - #[cfg(feature = "llmp_compression")] - compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), - phantom: PhantomData, - }) - } - - /// Create an LLMP broker on a port. - /// - /// The port must not be bound yet to have a broker. - #[cfg(feature = "std")] - pub fn on_port(shmem_provider: SP, port: u16) -> Result { - Ok(Self { - // TODO switch to false after solving the bug - llmp: LlmpBroker::with_keep_pages_attach_to_tcp(shmem_provider, port, true)?, - #[cfg(feature = "llmp_compression")] - compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), - phantom: PhantomData, - }) - } - - /// Exit the broker process cleanly after at least `n` clients attached and all of them disconnected again - pub fn set_exit_cleanly_after(&mut self, n_clients: NonZeroUsize) { - self.llmp.set_exit_cleanly_after(n_clients); - } - - /// Run forever in the broker - #[cfg(not(feature = "llmp_broker_timeouts"))] - pub fn broker_loop(&mut self) -> Result<(), Error> { - #[cfg(feature = "llmp_compression")] - let compressor = &self.compressor; - self.llmp.loop_forever( - &mut |client_id, tag, _flags, msg| { - if tag == _LLMP_TAG_TO_MAIN { - #[cfg(not(feature = "llmp_compression"))] - let event_bytes = msg; - #[cfg(feature = "llmp_compression")] - let compressed; - #[cfg(feature = "llmp_compression")] - let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { - compressed = compressor.decompress(msg)?; - &compressed - } else { - msg - }; - let event: Event = postcard::from_bytes(event_bytes)?; - match Self::handle_in_broker(client_id, &event)? { - BrokerEventResult::Forward => Ok(llmp::LlmpMsgHookResult::ForwardToClients), - BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled), - } - } else { - Ok(llmp::LlmpMsgHookResult::ForwardToClients) - } - }, - Some(Duration::from_millis(5)), - ); - - #[cfg(all(feature = "std", feature = "llmp_debug"))] - println!("The last client quit. Exiting."); - - Err(Error::shutting_down()) - } - - /// Run in the broker until all clients exit - #[cfg(feature = "llmp_broker_timeouts")] - pub fn broker_loop(&mut self) -> Result<(), Error> { - #[cfg(feature = "llmp_compression")] - let compressor = &self.compressor; - self.llmp.loop_with_timeouts( - &mut |msg_or_timeout| { - if let Some((client_id, tag, _flags, msg)) = msg_or_timeout { - if tag == _LLMP_TAG_TO_MAIN { - #[cfg(not(feature = "llmp_compression"))] - let event_bytes = msg; - #[cfg(feature = "llmp_compression")] - let compressed; - #[cfg(feature = "llmp_compression")] - let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { - compressed = compressor.decompress(msg)?; - &compressed - } else { - msg - }; - let event: Event = postcard::from_bytes(event_bytes)?; - match Self::handle_in_broker(client_id, &event)? { - BrokerEventResult::Forward => { - Ok(llmp::LlmpMsgHookResult::ForwardToClients) - } - BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled), - } - } else { - Ok(llmp::LlmpMsgHookResult::ForwardToClients) - } - } else { - Ok(llmp::LlmpMsgHookResult::Handled) - } - }, - Duration::from_secs(30), - Some(Duration::from_millis(5)), - ); - - #[cfg(feature = "llmp_debug")] - println!("The last client quit. Exiting."); - - Err(Error::shutting_down()) - } - - /// Handle arriving events in the broker - #[allow(clippy::unnecessary_wraps)] - fn handle_in_broker( - _client_id: ClientId, - event: &Event, - ) -> Result { - match &event { - Event::NewTestcase { - input: _, - client_config: _, - exit_kind: _, - corpus_size: _, - observers_buf: _, - time: _, - executions: _, - forward_id: _, - } => Ok(BrokerEventResult::Forward), - _ => Ok(BrokerEventResult::Handled), - } - } -} +pub(crate) const _LLMP_TAG_TO_MAIN: Tag = Tag(0x3453453); /// A wrapper manager to implement a main-secondary architecture with another broker #[derive(Debug)] diff --git a/libafl/src/events/launcher.rs b/libafl/src/events/launcher.rs index 8ca073316f..07b372ca6e 100644 --- a/libafl/src/events/launcher.rs +++ b/libafl/src/events/launcher.rs @@ -28,6 +28,8 @@ use std::process::Stdio; #[cfg(all(unix, feature = "std"))] use std::{fs::File, os::unix::io::AsRawFd}; +#[cfg(all(unix, feature = "std", feature = "fork"))] +use libafl_bolts::llmp::LlmpBroker; #[cfg(all(unix, feature = "std"))] use libafl_bolts::os::dup2; #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] @@ -52,7 +54,7 @@ use super::hooks::EventManagerHooksTuple; use crate::observers::TimeObserver; #[cfg(all(unix, feature = "std", feature = "fork"))] use crate::{ - events::centralized::{CentralizedEventManager, CentralizedLlmpEventBroker}, + events::{centralized::CentralizedEventManager, llmp::centralized::CentralizedLlmpHook}, state::UsesState, }; #[cfg(feature = "std")] @@ -167,8 +169,8 @@ where impl<'a, CF, MT, S, SP> Launcher<'a, CF, (), MT, S, SP> where CF: FnOnce(Option, LlmpRestartingEventManager<(), S, SP>, CoreId) -> Result<(), Error>, - MT: Monitor + Clone, - S: State + HasExecutions, + MT: Monitor + Clone + 'static, + S: State + HasExecutions + 'static, SP: ShMemProvider + 'static, { /// Launch the broker and the clients and fuzz @@ -190,8 +192,8 @@ impl<'a, CF, EMH, MT, S, SP> Launcher<'a, CF, EMH, MT, S, SP> where CF: FnOnce(Option, LlmpRestartingEventManager, CoreId) -> Result<(), Error>, EMH: EventManagerHooksTuple + Clone + Copy, - MT: Monitor + Clone, - S: State + HasExecutions, + MT: Monitor + Clone + 'static, + S: State + HasExecutions + 'static, SP: ShMemProvider + 'static, { /// Launch the broker and the clients and fuzz with a user-supplied hook @@ -562,8 +564,8 @@ where CentralizedEventManager, SP>, CoreId, ) -> Result<(), Error>, - MT: Monitor + Clone, - S: State + HasExecutions, + MT: Monitor + Clone + 'static, + S: State + HasExecutions + 'static, SP: ShMemProvider + 'static, { /// Launch a standard Centralized-based fuzzer @@ -601,8 +603,8 @@ where CentralizedEventManager, // No hooks for centralized EM CoreId, ) -> Result<(), Error>, - MT: Monitor + Clone, - S: State + HasExecutions, + MT: Monitor + Clone + 'static, + S: State + HasExecutions + 'static, SP: ShMemProvider + 'static, { /// Launch a Centralized-based fuzzer. @@ -663,12 +665,22 @@ where log::info!("PID: {:#?} I am centralized broker", std::process::id()); self.shmem_provider.post_fork(true)?; - let mut broker: CentralizedLlmpEventBroker = - CentralizedLlmpEventBroker::on_port( - self.shmem_provider.clone(), - self.centralized_broker_port, - )?; - broker.broker_loop()?; + let llmp_centralized_hook = CentralizedLlmpHook::::new()?; + + // TODO switch to false after solving the bug + let mut broker = LlmpBroker::with_keep_pages_attach_to_tcp( + self.shmem_provider.clone(), + tuple_list!(llmp_centralized_hook), + self.centralized_broker_port, + true, + )?; + + // Run in the broker until all clients exit + broker.loop_with_timeouts(Duration::from_secs(30), Some(Duration::from_millis(5))); + + log::info!("The last client quit. Exiting."); + + return Err(Error::shutting_down()); } } diff --git a/libafl/src/events/llmp/hooks/centralized.rs b/libafl/src/events/llmp/hooks/centralized.rs new file mode 100644 index 0000000000..42d7278f16 --- /dev/null +++ b/libafl/src/events/llmp/hooks/centralized.rs @@ -0,0 +1,110 @@ +use std::{fmt::Debug, marker::PhantomData}; + +#[cfg(feature = "llmp_compression")] +use libafl_bolts::{compress::GzipCompressor, llmp::LLMP_FLAG_COMPRESSED}; +use libafl_bolts::{ + llmp::{Flags, LlmpBrokerState, LlmpHook, LlmpMsgHookResult, Tag}, + shmem::ShMemProvider, + ClientId, Error, +}; + +#[cfg(feature = "llmp_compression")] +use crate::events::COMPRESS_THRESHOLD; +use crate::{ + events::{BrokerEventResult, Event, _LLMP_TAG_TO_MAIN}, + inputs::Input, +}; + +/// An LLMP-backed event manager for scalable multi-processed fuzzing +pub struct CentralizedLlmpHook { + #[cfg(feature = "llmp_compression")] + compressor: GzipCompressor, + phantom: PhantomData<(I, SP)>, +} + +impl LlmpHook for CentralizedLlmpHook +where + I: Input, + SP: ShMemProvider + 'static, +{ + fn on_new_message( + &mut self, + _llmp_broker_state: &mut LlmpBrokerState, + client_id: ClientId, + msg_tag: &mut Tag, + msg_flags: &mut Flags, + msg: &mut [u8], + ) -> Result { + if *msg_tag == _LLMP_TAG_TO_MAIN { + #[cfg(feature = "llmp_compression")] + let compressor = &self.compressor; + #[cfg(not(feature = "llmp_compression"))] + let event_bytes = msg; + #[cfg(feature = "llmp_compression")] + let compressed; + #[cfg(feature = "llmp_compression")] + let event_bytes = if *msg_flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + compressed = compressor.decompress(msg)?; + &compressed + } else { + &*msg + }; + let event: Event = postcard::from_bytes(event_bytes)?; + match Self::handle_in_broker(client_id, &event)? { + BrokerEventResult::Forward => Ok(LlmpMsgHookResult::ForwardToClients), + BrokerEventResult::Handled => Ok(LlmpMsgHookResult::Handled), + } + } else { + Ok(LlmpMsgHookResult::ForwardToClients) + } + } +} + +impl Debug for CentralizedLlmpHook { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut debug_struct = f.debug_struct("CentralizedLlmpEventBroker"); + + #[cfg(feature = "llmp_compression")] + let debug_struct = debug_struct.field("compressor", &self.compressor); + + debug_struct + .field("phantom", &self.phantom) + .finish_non_exhaustive() + } +} + +impl CentralizedLlmpHook +where + I: Input, + SP: ShMemProvider + 'static, +{ + /// Create an event broker from a raw broker. + pub fn new() -> Result { + Ok(Self { + #[cfg(feature = "llmp_compression")] + compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), + phantom: PhantomData, + }) + } + + /// Handle arriving events in the broker + #[allow(clippy::unnecessary_wraps)] + fn handle_in_broker( + _client_id: ClientId, + event: &Event, + ) -> Result { + match &event { + Event::NewTestcase { + input: _, + client_config: _, + exit_kind: _, + corpus_size: _, + observers_buf: _, + time: _, + executions: _, + forward_id: _, + } => Ok(BrokerEventResult::Forward), + _ => Ok(BrokerEventResult::Handled), + } + } +} diff --git a/libafl/src/events/llmp/broker.rs b/libafl/src/events/llmp/hooks/mod.rs similarity index 52% rename from libafl/src/events/llmp/broker.rs rename to libafl/src/events/llmp/hooks/mod.rs index 44bc8eb719..d54d6d159c 100644 --- a/libafl/src/events/llmp/broker.rs +++ b/libafl/src/events/llmp/hooks/mod.rs @@ -1,12 +1,13 @@ -//! LLMP broker - -use core::{marker::PhantomData, num::NonZeroUsize, time::Duration}; -#[cfg(feature = "std")] -use std::net::ToSocketAddrs; +//! Standard LLMP hook +use core::marker::PhantomData; #[cfg(feature = "llmp_compression")] use libafl_bolts::{compress::GzipCompressor, llmp::LLMP_FLAG_COMPRESSED}; -use libafl_bolts::{llmp, shmem::ShMemProvider, ClientId}; +use libafl_bolts::{ + llmp::{Flags, LlmpBrokerState, LlmpHook, LlmpMsgHookResult, Tag}, + shmem::ShMemProvider, + ClientId, +}; #[cfg(feature = "llmp_compression")] use crate::events::llmp::COMPRESS_THRESHOLD; @@ -17,148 +18,85 @@ use crate::{ Error, }; -/// An LLMP-backed event manager for scalable multi-processed fuzzing +/// centralized hook +#[cfg(all(unix, feature = "std"))] +pub mod centralized; + +/// An LLMP-backed event hook for scalable multi-processed fuzzing #[derive(Debug)] -pub struct LlmpEventBroker +pub struct StdLlmpEventHook where SP: ShMemProvider + 'static, { monitor: MT, - llmp: llmp::LlmpBroker, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor, - phantom: PhantomData, + phantom: PhantomData<(I, SP)>, } -impl LlmpEventBroker +impl LlmpHook for StdLlmpEventHook +where + I: Input, + MT: Monitor, + SP: ShMemProvider + 'static, +{ + fn on_new_message( + &mut self, + _llmp_broker_state: &mut LlmpBrokerState, + client_id: ClientId, + msg_tag: &mut Tag, + #[cfg(feature = "llmp_compression")] msg_flags: &mut Flags, + #[cfg(not(feature = "llmp_compression"))] _msg_flags: &mut Flags, + msg: &mut [u8], + ) -> Result { + let monitor = &mut self.monitor; + #[cfg(feature = "llmp_compression")] + let compressor = &self.compressor; + + if *msg_tag == LLMP_TAG_EVENT_TO_BOTH { + #[cfg(not(feature = "llmp_compression"))] + let event_bytes = msg; + #[cfg(feature = "llmp_compression")] + let compressed; + #[cfg(feature = "llmp_compression")] + let event_bytes = if *msg_flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + compressed = compressor.decompress(msg)?; + &compressed + } else { + &*msg + }; + let event: Event = postcard::from_bytes(event_bytes)?; + match Self::handle_in_broker(monitor, client_id, &event)? { + BrokerEventResult::Forward => Ok(LlmpMsgHookResult::ForwardToClients), + BrokerEventResult::Handled => Ok(LlmpMsgHookResult::Handled), + } + } else { + Ok(LlmpMsgHookResult::ForwardToClients) + } + } + + fn on_timeout(&mut self) -> Result<(), Error> { + self.monitor.display("Broker Heartbeat", ClientId(0)); + Ok(()) + } +} + +impl StdLlmpEventHook where I: Input, SP: ShMemProvider + 'static, MT: Monitor, { /// Create an event broker from a raw broker. - pub fn new(llmp: llmp::LlmpBroker, monitor: MT) -> Result { + pub fn new(monitor: MT) -> Result { Ok(Self { monitor, - llmp, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), phantom: PhantomData, }) } - /// Create an LLMP broker on a port. - /// - /// The port must not be bound yet to have a broker. - #[cfg(feature = "std")] - pub fn on_port(shmem_provider: SP, monitor: MT, port: u16) -> Result { - Ok(Self { - monitor, - llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port)?, - #[cfg(feature = "llmp_compression")] - compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD), - phantom: PhantomData, - }) - } - - /// Exit the broker process cleanly after at least `n` clients attached and all of them disconnected again - pub fn set_exit_cleanly_after(&mut self, n_clients: NonZeroUsize) { - self.llmp.set_exit_cleanly_after(n_clients); - } - - /// Connect to an LLMP broker on the given address - #[cfg(feature = "std")] - pub fn connect_b2b(&mut self, addr: A) -> Result<(), Error> - where - A: ToSocketAddrs, - { - self.llmp.connect_b2b(addr) - } - - /// Run forever in the broker - #[cfg(not(feature = "llmp_broker_timeouts"))] - pub fn broker_loop(&mut self) -> Result<(), Error> { - let monitor = &mut self.monitor; - #[cfg(feature = "llmp_compression")] - let compressor = &self.compressor; - self.llmp.loop_forever( - &mut |client_id, tag, _flags, msg| { - if tag == LLMP_TAG_EVENT_TO_BOTH { - #[cfg(not(feature = "llmp_compression"))] - let event_bytes = msg; - #[cfg(feature = "llmp_compression")] - let compressed; - #[cfg(feature = "llmp_compression")] - let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { - compressed = compressor.decompress(msg)?; - &compressed - } else { - msg - }; - let event: Event = postcard::from_bytes(event_bytes)?; - match Self::handle_in_broker(monitor, client_id, &event)? { - BrokerEventResult::Forward => Ok(llmp::LlmpMsgHookResult::ForwardToClients), - BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled), - } - } else { - Ok(llmp::LlmpMsgHookResult::ForwardToClients) - } - }, - Some(Duration::from_millis(5)), - ); - - #[cfg(all(feature = "std", feature = "llmp_debug"))] - println!("The last client quit. Exiting."); - - Err(Error::shutting_down()) - } - - /// Run in the broker until all clients exit - #[cfg(feature = "llmp_broker_timeouts")] - pub fn broker_loop(&mut self) -> Result<(), Error> { - let monitor = &mut self.monitor; - #[cfg(feature = "llmp_compression")] - let compressor = &self.compressor; - self.llmp.loop_with_timeouts( - &mut |msg_or_timeout| { - if let Some((client_id, tag, _flags, msg)) = msg_or_timeout { - if tag == LLMP_TAG_EVENT_TO_BOTH { - #[cfg(not(feature = "llmp_compression"))] - let event_bytes = msg; - #[cfg(feature = "llmp_compression")] - let compressed; - #[cfg(feature = "llmp_compression")] - let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { - compressed = compressor.decompress(msg)?; - &compressed - } else { - msg - }; - let event: Event = postcard::from_bytes(event_bytes)?; - match Self::handle_in_broker(monitor, client_id, &event)? { - BrokerEventResult::Forward => { - Ok(llmp::LlmpMsgHookResult::ForwardToClients) - } - BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled), - } - } else { - Ok(llmp::LlmpMsgHookResult::ForwardToClients) - } - } else { - monitor.display("Broker Heartbeat", ClientId(0)); - Ok(llmp::LlmpMsgHookResult::Handled) - } - }, - Duration::from_secs(30), - Some(Duration::from_millis(5)), - ); - - #[cfg(feature = "llmp_debug")] - println!("The last client quit. Exiting."); - - Err(Error::shutting_down()) - } - /// Handle arriving events in the broker #[allow(clippy::unnecessary_wraps)] fn handle_in_broker( diff --git a/libafl/src/events/llmp/mod.rs b/libafl/src/events/llmp/mod.rs index 318c751e16..18d5e73f18 100644 --- a/libafl/src/events/llmp/mod.rs +++ b/libafl/src/events/llmp/mod.rs @@ -24,14 +24,14 @@ use crate::{ Error, HasMetadata, }; -/// The llmp broker -pub mod broker; -pub use broker::*; - /// The llmp event manager pub mod mgr; pub use mgr::*; +/// The llmp hooks +pub mod hooks; +pub use hooks::*; + /// The llmp restarting manager #[cfg(feature = "std")] pub mod restarting; diff --git a/libafl/src/events/llmp/restarting.rs b/libafl/src/events/llmp/restarting.rs index 52f5cbdb56..18dcd9a2b2 100644 --- a/libafl/src/events/llmp/restarting.rs +++ b/libafl/src/events/llmp/restarting.rs @@ -8,7 +8,7 @@ use alloc::vec::Vec; use core::ptr::addr_of_mut; #[cfg(feature = "std")] use core::sync::atomic::{compiler_fence, Ordering}; -#[cfg(all(feature = "std", feature = "adaptive_serialization"))] +#[cfg(feature = "std")] use core::time::Duration; use core::{marker::PhantomData, num::NonZeroUsize}; #[cfg(feature = "std")] @@ -24,11 +24,11 @@ use libafl_bolts::os::unix_signals::setup_signal_handler; use libafl_bolts::os::{fork, ForkResult}; #[cfg(feature = "adaptive_serialization")] use libafl_bolts::tuples::{Handle, Handled}; +use libafl_bolts::{llmp::LlmpBroker, shmem::ShMemProvider, tuples::tuple_list}; #[cfg(feature = "std")] use libafl_bolts::{ llmp::LlmpConnection, os::CTRL_C_EXIT, shmem::StdShMemProvider, staterestore::StateRestorer, }; -use libafl_bolts::{shmem::ShMemProvider, tuples::tuple_list}; use serde::{Deserialize, Serialize}; #[cfg(feature = "std")] use typed_builder::TypedBuilder; @@ -44,8 +44,8 @@ use crate::observers::TimeObserver; use crate::{ events::{ hooks::EventManagerHooksTuple, Event, EventConfig, EventFirer, EventManager, - EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, LlmpEventBroker, - LlmpEventManager, LlmpShouldSaveState, ProgressReporter, + EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, LlmpEventManager, + LlmpShouldSaveState, ProgressReporter, StdLlmpEventHook, }, executors::{Executor, HasObservers}, fuzzer::{Evaluator, EvaluatorObservers, ExecutionProcessor}, @@ -323,7 +323,7 @@ pub enum ManagerKind { /// The CPU core ID of this client cpu_core: Option, }, - /// A [`crate::events::llmp::broker::LlmpEventBroker`], forwarding the packets of local clients. + /// An [`LlmpBroker`], forwarding the packets of local clients. Broker, } @@ -344,8 +344,8 @@ pub fn setup_restarting_mgr_std( Error, > where - MT: Monitor + Clone, - S: State + HasExecutions, + MT: Monitor + Clone + 'static, + S: State + HasExecutions + 'static, { RestartingMgr::builder() .shmem_provider(StdShMemProvider::new()?) @@ -375,8 +375,8 @@ pub fn setup_restarting_mgr_std( Error, > where - MT: Monitor + Clone, - S: State + HasExecutions, + MT: Monitor + Clone + 'static, + S: State + HasExecutions + 'static, { RestartingMgr::builder() .shmem_provider(StdShMemProvider::new()?) @@ -448,8 +448,8 @@ impl RestartingMgr where EMH: EventManagerHooksTuple + Copy + Clone, SP: ShMemProvider, - S: State + HasExecutions, - MT: Monitor + Clone, + S: State + HasExecutions + 'static, + MT: Monitor + Clone + 'static, { /// Launch the broker and the clients and fuzz pub fn launch(&mut self) -> Result<(Option, LlmpRestartingEventManager), Error> { @@ -457,18 +457,24 @@ where let (staterestorer, new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER) .is_err() { - let broker_things = |mut broker: LlmpEventBroker, - remote_broker_addr| { + let broker_things = |mut broker: LlmpBroker<_, SP>, remote_broker_addr| { if let Some(remote_broker_addr) = remote_broker_addr { log::info!("B2b: Connecting to {:?}", &remote_broker_addr); - broker.connect_b2b(remote_broker_addr)?; + broker.state_mut().connect_b2b(remote_broker_addr)?; }; if let Some(exit_cleanly_after) = self.exit_cleanly_after { - broker.set_exit_cleanly_after(exit_cleanly_after); + broker + .state_mut() + .set_exit_cleanly_after(exit_cleanly_after); } - broker.broker_loop() + broker.loop_with_timeouts(Duration::from_secs(30), Some(Duration::from_millis(5))); + + #[cfg(feature = "llmp_debug")] + log::info!("The last client quit. Exiting."); + + Err(Error::shutting_down()) }; // We get here if we are on Unix, or we are a broker on Windows (or without forks). let (mgr, core_id) = match self.kind { @@ -477,8 +483,7 @@ where LlmpConnection::on_port(self.shmem_provider.clone(), self.broker_port)?; match connection { LlmpConnection::IsBroker { broker } => { - let event_broker = LlmpEventBroker::::new( - broker, + let llmp_hook = StdLlmpEventHook::::new( self.monitor.take().unwrap(), )?; @@ -487,7 +492,10 @@ where "Doing broker things. Run this tool again to start fuzzing in a client." ); - broker_things(event_broker, self.remote_broker_addr)?; + broker_things( + broker.add_hooks(tuple_list!(llmp_hook)), + self.remote_broker_addr, + )?; return Err(Error::shutting_down()); } @@ -511,13 +519,15 @@ where } } ManagerKind::Broker => { - let event_broker = LlmpEventBroker::::on_port( + let llmp_hook = StdLlmpEventHook::new(self.monitor.take().unwrap())?; + + let broker = LlmpBroker::create_attach_to_tcp( self.shmem_provider.clone(), - self.monitor.take().unwrap(), + tuple_list!(llmp_hook), self.broker_port, )?; - broker_things(event_broker, self.remote_broker_addr)?; + broker_things(broker, self.remote_broker_addr)?; unreachable!("The broker may never return normally, only on errors or when shutting down."); } ManagerKind::Client { cpu_core } => { diff --git a/libafl/src/events/tcp.rs b/libafl/src/events/tcp.rs index 2cf5b2cb13..fd7cd74072 100644 --- a/libafl/src/events/tcp.rs +++ b/libafl/src/events/tcp.rs @@ -772,7 +772,7 @@ where Ok(()) => { self.tcp.set_nonblocking(false).expect("set to blocking"); let len = u32::from_le_bytes(len_buf); - let mut buf = vec![0_u8; len as usize + 4_usize]; + let mut buf = vec![0_u8; 4_usize + len as usize]; self.tcp.read_exact(&mut buf)?; let mut client_id_buf = [0_u8; 4]; diff --git a/libafl_bolts/examples/llmp_test/main.rs b/libafl_bolts/examples/llmp_test/main.rs index 0735dac0bb..79110bfaf1 100644 --- a/libafl_bolts/examples/llmp_test/main.rs +++ b/libafl_bolts/examples/llmp_test/main.rs @@ -5,15 +5,18 @@ extern crate alloc; #[cfg(not(target_os = "haiku"))] use core::time::Duration; +use std::marker::PhantomData; #[cfg(all(feature = "std", not(target_os = "haiku")))] use std::{num::NonZeroUsize, thread, time}; +use libafl_bolts::{bolts_prelude::LlmpMsgHookResult, llmp::LlmpBrokerState}; #[cfg(all(feature = "std", not(target_os = "haiku")))] use libafl_bolts::{ - llmp::{self, Tag}, + llmp::{self, Flags, LlmpHook, Tag}, shmem::{ShMemProvider, StdShMemProvider}, ClientId, Error, SimpleStderrLogger, }; +use tuple_list::tuple_list; #[cfg(all(feature = "std", not(target_os = "haiku")))] const _TAG_SIMPLE_U32_V1: Tag = Tag(0x5130_0321); @@ -90,39 +93,68 @@ fn large_msg_loop(port: u16) -> Result<(), Box> { } } -#[allow(clippy::unnecessary_wraps)] +pub struct LlmpExampleHook { + phantom: PhantomData, +} + +impl LlmpExampleHook { + #[must_use] + pub fn new() -> Self { + Self { + phantom: PhantomData, + } + } +} + +impl Default for LlmpExampleHook { + fn default() -> Self { + Self::new() + } +} + #[cfg(all(feature = "std", not(target_os = "haiku")))] -fn broker_message_hook( - msg_or_timeout: Option<(ClientId, llmp::Tag, llmp::Flags, &[u8])>, -) -> Result { - let Some((client_id, tag, _flags, message)) = msg_or_timeout else { +impl LlmpHook for LlmpExampleHook +where + SP: ShMemProvider + 'static, +{ + fn on_new_message( + &mut self, + _llmp_broker_state: &mut LlmpBrokerState, + client_id: ClientId, + msg_tag: &mut Tag, + _msg_flags: &mut Flags, + msg: &mut [u8], + ) -> Result { + match *msg_tag { + _TAG_SIMPLE_U32_V1 => { + println!( + "Client {:?} sent message: {:?}", + client_id, + u32::from_le_bytes(msg.try_into()?) + ); + Ok(LlmpMsgHookResult::ForwardToClients) + } + _TAG_MATH_RESULT_V1 => { + println!( + "Adder Client has this current result: {:?}", + u32::from_le_bytes(msg.try_into()?) + ); + Ok(LlmpMsgHookResult::Handled) + } + _ => { + println!("Unknown message id received: {msg_tag:?}"); + Ok(LlmpMsgHookResult::ForwardToClients) + } + } + } + + fn on_timeout(&mut self) -> Result<(), Error> { println!( "No client did anything for {} seconds..", BROKER_TIMEOUT.as_secs() ); - return Ok(llmp::LlmpMsgHookResult::Handled); - }; - match tag { - _TAG_SIMPLE_U32_V1 => { - println!( - "Client {:?} sent message: {:?}", - client_id, - u32::from_le_bytes(message.try_into()?) - ); - Ok(llmp::LlmpMsgHookResult::ForwardToClients) - } - _TAG_MATH_RESULT_V1 => { - println!( - "Adder Client has this current result: {:?}", - u32::from_le_bytes(message.try_into()?) - ); - Ok(llmp::LlmpMsgHookResult::Handled) - } - _ => { - println!("Unknown message id received: {tag:?}"); - Ok(llmp::LlmpMsgHookResult::ForwardToClients) - } + Ok(()) } } @@ -154,26 +186,26 @@ fn main() -> Result<(), Box> { match mode.as_str() { "broker" => { - let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?; - broker.launch_tcp_listener_on(port)?; + let mut broker = llmp::LlmpBroker::new( + StdShMemProvider::new()?, + tuple_list!(LlmpExampleHook::new()), + )?; + broker.state_mut().launch_tcp_listener_on(port)?; // Exit when we got at least _n_ nodes, and all of them quit. - broker.set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap()); - broker.loop_with_timeouts( - &mut broker_message_hook, - BROKER_TIMEOUT, - Some(SLEEP_BETWEEN_FORWARDS), - ); + broker + .state_mut() + .set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap()); + broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS)); } "b2b" => { - let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?; - broker.launch_tcp_listener_on(b2b_port)?; + let mut broker = llmp::LlmpBroker::new( + StdShMemProvider::new()?, + tuple_list!(LlmpExampleHook::new()), + )?; + broker.state_mut().launch_tcp_listener_on(b2b_port)?; // connect back to the main broker. - broker.connect_b2b(("127.0.0.1", port))?; - broker.loop_with_timeouts( - &mut broker_message_hook, - BROKER_TIMEOUT, - Some(SLEEP_BETWEEN_FORWARDS), - ); + broker.state_mut().connect_b2b(("127.0.0.1", port))?; + broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS)); } "ctr" => { let mut client = diff --git a/libafl_bolts/src/core_affinity.rs b/libafl_bolts/src/core_affinity.rs index b756854c8f..da06b5b160 100644 --- a/libafl_bolts/src/core_affinity.rs +++ b/libafl_bolts/src/core_affinity.rs @@ -235,7 +235,7 @@ fn set_for_current_helper(core_id: CoreId) -> Result<(), Error> { ))] mod linux { use alloc::{string::ToString, vec::Vec}; - use std::mem; + use core::mem::{size_of, zeroed}; #[cfg(not(target_os = "freebsd"))] use libc::cpu_set_t; @@ -276,7 +276,7 @@ mod linux { let result = unsafe { sched_setaffinity( 0, // Defaults to current thread - mem::size_of::(), + size_of::(), &set, ) }; @@ -295,7 +295,7 @@ mod linux { let result = unsafe { sched_getaffinity( 0, // Defaults to current thread - mem::size_of::(), + size_of::(), &mut set, ) }; @@ -310,7 +310,7 @@ mod linux { } fn new_cpu_set() -> cpu_set_t { - unsafe { mem::zeroed::() } + unsafe { zeroed::() } } #[cfg(test)] diff --git a/libafl_bolts/src/llmp.rs b/libafl_bolts/src/llmp.rs index 8ebef1dd03..4ca076bc15 100644 --- a/libafl_bolts/src/llmp.rs +++ b/libafl_bolts/src/llmp.rs @@ -48,7 +48,7 @@ also need to create a new [`ShMem`] each time their bufs are filled up. To use, you will have to create a broker using [`LlmpBroker::new()`]. Then, create some [`LlmpClient`]`s` in other threads and register them -with the main thread using [`LlmpBroker::register_client`]. +with the main thread using [`LlmpBrokerState::register_client`]. Finally, call [`LlmpBroker::loop_forever()`]. For broker2broker communication, all messages are forwarded via network sockets. @@ -90,6 +90,7 @@ use backtrace::Backtrace; #[cfg(not(any(target_os = "solaris", target_os = "illumos")))] use nix::sys::socket::{self, sockopt::ReusePort}; use serde::{Deserialize, Serialize}; +use tuple_list::tuple_list; #[cfg(all(unix, not(miri)))] use crate::os::unix_signals::setup_signal_handler; @@ -639,7 +640,8 @@ pub struct LlmpMsg { /// The message we receive impl LlmpMsg { - /// Gets the buffer from this message as slice, with the corrent length. + /// Gets the buffer from this message as slice, with the correct length. + /// /// # Safety /// This is unsafe if somebody has access to shared mem pages on the system. #[must_use] @@ -647,6 +649,15 @@ impl LlmpMsg { slice::from_raw_parts(self.buf.as_ptr(), self.buf_len as usize) } + /// Gets the buffer from this message as a mutable slice, with the correct length. + /// + /// # Safety + /// This is unsafe if somebody has access to shared mem pages on the system. + #[must_use] + pub unsafe fn as_slice_mut_unsafe(&mut self) -> &mut [u8] { + slice::from_raw_parts_mut(self.buf.as_mut_ptr(), self.buf_len as usize) + } + /// Gets the buffer from this message as slice, with the correct length. #[inline] pub fn try_as_slice(&self, map: &mut LlmpSharedMap) -> Result<&[u8], Error> { @@ -659,6 +670,21 @@ impl LlmpMsg { } } + /// Gets the buffer from this message as mutable slice, with the correct length. + #[inline] + pub fn try_as_slice_mut( + &mut self, + map: &mut LlmpSharedMap, + ) -> Result<&mut [u8], Error> { + unsafe { + if self.in_shmem(map) { + Ok(self.as_slice_mut_unsafe()) + } else { + Err(Error::illegal_state("Current message not in page. The sharedmap get tampered with or we have a BUG.")) + } + } + } + /// Returns `true`, if the pointer is, indeed, in the page of this shared map. #[inline] pub fn in_shmem(&self, map: &mut LlmpSharedMap) -> bool { @@ -675,14 +701,14 @@ impl LlmpMsg { /// An Llmp instance #[derive(Debug)] -pub enum LlmpConnection +pub enum LlmpConnection where SP: ShMemProvider + 'static, { /// A broker and a thread using this tcp background thread IsBroker { /// The [`LlmpBroker`] of this [`LlmpConnection`]. - broker: LlmpBroker, + broker: LlmpBroker, }, /// A client, connected to the port IsClient { @@ -691,7 +717,7 @@ where }, } -impl LlmpConnection +impl LlmpConnection<(), SP> where SP: ShMemProvider, { @@ -705,8 +731,10 @@ where // We got the port. We are the broker! :) log::info!("We're the broker"); - let mut broker = LlmpBroker::new(shmem_provider)?; - let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; + let mut broker = LlmpBroker::new(shmem_provider, tuple_list!())?; + let _listener_thread = broker + .state_mut() + .launch_listener(Listener::Tcp(listener))?; Ok(LlmpConnection::IsBroker { broker }) } Err(Error::OsError(e, ..)) if e.kind() == ErrorKind::AddrInUse => { @@ -727,7 +755,7 @@ where #[cfg(feature = "std")] pub fn broker_on_port(shmem_provider: SP, port: u16) -> Result { Ok(LlmpConnection::IsBroker { - broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port)?, + broker: LlmpBroker::create_attach_to_tcp(shmem_provider, tuple_list!(), port)?, }) } @@ -740,8 +768,14 @@ where let conn = LlmpConnection::IsClient { client }; Ok(conn) } +} - /// Describe this in a reproducable fashion, if it's a client +impl LlmpConnection +where + MT: LlmpHookTuple, + SP: ShMemProvider, +{ + /// Describe this in a reproducible fashion, if it's a client pub fn describe(&self) -> Result { Ok(match self { LlmpConnection::IsClient { client } => client.describe()?, @@ -753,7 +787,7 @@ where pub fn existing_client_from_description( shmem_provider: SP, description: &LlmpClientDescription, - ) -> Result, Error> { + ) -> Result, Error> { Ok(LlmpConnection::IsClient { client: LlmpClient::existing_client_from_description(shmem_provider, description)?, }) @@ -762,7 +796,7 @@ where /// Sends the given buffer over this connection, no matter if client or broker. pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> { match self { - LlmpConnection::IsBroker { broker } => broker.send_buf(tag, buf), + LlmpConnection::IsBroker { broker } => broker.state.send_buf(tag, buf), LlmpConnection::IsClient { client } => client.send_buf(tag, buf), } } @@ -770,7 +804,9 @@ where /// Send the `buf` with given `flags`. pub fn send_buf_with_flags(&mut self, tag: Tag, buf: &[u8], flags: Flags) -> Result<(), Error> { match self { - LlmpConnection::IsBroker { broker } => broker.send_buf_with_flags(tag, flags, buf), + LlmpConnection::IsBroker { broker } => { + broker.state.send_buf_with_flags(tag, flags, buf) + } LlmpConnection::IsClient { client } => client.send_buf_with_flags(tag, flags, buf), } } @@ -793,6 +829,7 @@ pub struct LlmpPage { pub receivers_left_count: AtomicU16, #[cfg(target_pointer_width = "64")] /// The current message ID + /// It also servers to know whether the next message can be read or not. pub current_msg_id: AtomicU64, #[cfg(not(target_pointer_width = "64"))] /// The current message ID @@ -1981,9 +2018,9 @@ where } } -/// The broker (node 0) +/// The inner state of [`LlmpBroker`] #[derive(Debug)] -pub struct LlmpBroker +pub struct LlmpBrokerState where SP: ShMemProvider + 'static, { @@ -2009,6 +2046,18 @@ where shmem_provider: SP, } +/// The broker (node 0) +#[derive(Debug)] +pub struct LlmpBroker +where + SP: ShMemProvider + 'static, +{ + /// The broker + state: LlmpBrokerState, + /// Llmp hooks + hooks: HT, +} + /// A signal handler for the [`LlmpBroker`]. /// On unix, it handles signals /// On Windows - control signals (e.g., CTRL+C) @@ -2048,33 +2097,506 @@ impl CtrlHandler for LlmpShutdownSignalHandler { } } -/// The broker forwards all messages to its own bus-like broadcast map. -/// It may intercept messages passing through. -impl LlmpBroker +/// Llmp hooks +pub trait LlmpHook where SP: ShMemProvider + 'static, { - /// Create and initialize a new [`LlmpBroker`] - pub fn new(shmem_provider: SP) -> Result { - // Broker never cleans up the pages so that new - // clients may join at any time - #[cfg(feature = "std")] - { - Self::with_keep_pages(shmem_provider, true) - } + /// Hook called whenever a new message is received. It receives an llmp message as input, does + /// something with it (read, transform, forward, etc...) and decides to discard it or not. + fn on_new_message( + &mut self, + llmp_broker_state: &mut LlmpBrokerState, + client_id: ClientId, + msg_tag: &mut Tag, + msg_flags: &mut Flags, + msg: &mut [u8], + ) -> Result; - #[cfg(not(feature = "std"))] + /// Hook called whenever there is a timeout. + fn on_timeout(&mut self) -> Result<(), Error> { + Ok(()) + } +} + +/// A tuple of Llmp hooks. They are evaluated sequentially, and returns if one decides to filter out the evaluated message. +pub trait LlmpHookTuple +where + SP: ShMemProvider, +{ + /// Call all hook callbacks on new message. + fn on_new_message_all( + &mut self, + llmp_broker_state: &mut LlmpBrokerState, + client_id: ClientId, + msg_tag: &mut Tag, + msg_flags: &mut Flags, + msg: &mut [u8], + ) -> Result; + + /// Call all hook callbacks on timeout. + fn on_timeout_all(&mut self) -> Result<(), Error>; +} + +impl LlmpHookTuple for () +where + SP: ShMemProvider, +{ + fn on_new_message_all( + &mut self, + _llmp_broker_state: &mut LlmpBrokerState, + _client_id: ClientId, + _msg_tag: &mut Tag, + _msg_flags: &mut Flags, + _msg: &mut [u8], + ) -> Result { + Ok(LlmpMsgHookResult::ForwardToClients) + } + + fn on_timeout_all(&mut self) -> Result<(), Error> { + Ok(()) + } +} + +impl LlmpHookTuple for (Head, Tail) +where + Head: LlmpHook, + Tail: LlmpHookTuple, + SP: ShMemProvider + 'static, +{ + fn on_new_message_all( + &mut self, + llmp_broker_state: &mut LlmpBrokerState, + client_id: ClientId, + msg_tag: &mut Tag, + msg_flags: &mut Flags, + msg: &mut [u8], + ) -> Result { + match self + .0 + .on_new_message(llmp_broker_state, client_id, msg_tag, msg_flags, msg)? { - Self::with_keep_pages(shmem_provider, true) + LlmpMsgHookResult::Handled => { + // message handled, stop early + Ok(LlmpMsgHookResult::Handled) + } + LlmpMsgHookResult::ForwardToClients => { + // message should be forwarded, continue iterating + self.1 + .on_new_message_all(llmp_broker_state, client_id, msg_tag, msg_flags, msg) + } } } + fn on_timeout_all(&mut self) -> Result<(), Error> { + self.0.on_timeout()?; + self.1.on_timeout_all() + } +} + +impl LlmpBroker<(), SP> +where + SP: ShMemProvider + 'static, +{ + /// Add hooks to a hookless [`LlmpBroker`]. + /// We do not support replacing hooks for now. + pub fn add_hooks(self, hooks: HT) -> LlmpBroker + where + HT: LlmpHookTuple, + { + LlmpBroker { + state: self.state, + hooks, + } + } +} + +impl LlmpBroker +where + HT: LlmpHookTuple, + SP: ShMemProvider + 'static, +{ + /// Create and initialize a new [`LlmpBroker`], associated with some hooks. + pub fn new(shmem_provider: SP, hooks: HT) -> Result { + Self::with_keep_pages(shmem_provider, hooks, true) + } + /// Create and initialize a new [`LlmpBroker`] telling if it has to keep pages forever pub fn with_keep_pages( - mut shmem_provider: SP, + shmem_provider: SP, + hooks: HT, keep_pages_forever: bool, ) -> Result { Ok(LlmpBroker { + state: LlmpBrokerState::with_keep_pages(shmem_provider, keep_pages_forever)?, + hooks, + }) + } + + /// Create a new [`LlmpBroker`] attaching to a TCP port + #[cfg(feature = "std")] + pub fn create_attach_to_tcp(shmem_provider: SP, hooks: HT, port: u16) -> Result { + Ok(LlmpBroker { + state: LlmpBrokerState::create_attach_to_tcp(shmem_provider, port)?, + hooks, + }) + } + + /// Create a new [`LlmpBroker`] attaching to a TCP port and telling if it has to keep pages forever + #[cfg(feature = "std")] + pub fn with_keep_pages_attach_to_tcp( + shmem_provider: SP, + hooks: HT, + port: u16, + keep_pages_forever: bool, + ) -> Result { + Ok(LlmpBroker { + state: LlmpBrokerState::with_keep_pages_attach_to_tcp( + shmem_provider, + port, + keep_pages_forever, + )?, + hooks, + }) + } + + /// Get the inner state of the broker + pub fn state(&self) -> &LlmpBrokerState { + &self.state + } + + /// Get the inner mutable state of the broker + pub fn state_mut(&mut self) -> &mut LlmpBrokerState { + &mut self.state + } + + /// Loops unitl the last client quit, + /// forwarding and handling all incoming messages from clients. + /// 5 millis of sleep can't hurt to keep busywait not at 100% + /// On std, if you need to run code even if no update got sent, use `Self::loop_with_timeout` (needs the `std` feature). + pub fn loop_forever(&mut self, sleep_time: Option) { + #[cfg(any(all(unix, not(miri)), all(windows, feature = "std")))] + Self::setup_handlers(); + + while !self.state.is_shutting_down() { + self.broker_once() + .expect("An error occurred when brokering. Exiting."); + + if let Some(exit_after_count) = self.state.exit_cleanly_after { + if !self.state.has_clients() + && (self.state.num_clients_seen - self.state.listeners.len()) + > exit_after_count.into() + { + // No more clients connected, and the amount of clients we were waiting for was previously connected. + // exit cleanly. + break; + } + } + + #[cfg(feature = "std")] + if let Some(time) = sleep_time { + thread::sleep(time); + } + + #[cfg(not(feature = "std"))] + if let Some(time) = sleep_time { + panic!("Cannot sleep on no_std platform (requested {time:?})"); + } + } + self.state + .llmp_out + .send_buf(LLMP_TAG_EXITING, &[]) + .expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg."); + } + + /// Loops until the last client quits, + /// forwarding and handling all incoming messages from clients. + /// Will call `on_timeout` roughly after `timeout` + /// Panics on error. + /// 5 millis of sleep can't hurt to keep busywait not at 100% + #[cfg(feature = "std")] + pub fn loop_with_timeouts(&mut self, timeout: Duration, sleep_time: Option) { + use super::current_milliseconds; + + #[cfg(any(all(unix, not(miri)), all(windows, feature = "std")))] + Self::setup_handlers(); + + let timeout = timeout.as_millis() as u64; + let mut end_time = current_milliseconds() + timeout; + + while !self.state.is_shutting_down() { + if current_milliseconds() > end_time { + self.hooks + .on_timeout_all() + .expect("An error occurred in broker timeout. Exiting."); + end_time = current_milliseconds() + timeout; + } + + if self + .broker_once() + .expect("An error occurred when brokering. Exiting.") + { + end_time = current_milliseconds() + timeout; + } + + if let Some(exit_after_count) = self.state.exit_cleanly_after { + // log::trace!( + // "Clients connected: {} && > {} - {} >= {}", + // self.has_clients(), + // self.num_clients_seen, + // self.listeners.len(), + // exit_after_count + // ); + if !self.state.has_clients() + && (self.state.num_clients_seen - self.state.listeners.len()) + >= exit_after_count.into() + { + // No more clients connected, and the amount of clients we were waiting for was previously connected. + // exit cleanly. + break; + } + } + + #[cfg(feature = "std")] + if let Some(time) = sleep_time { + thread::sleep(time); + } + + #[cfg(not(feature = "std"))] + if let Some(time) = sleep_time { + panic!("Cannot sleep on no_std platform (requested {time:?})"); + } + } + self.state + .llmp_out + .send_buf(LLMP_TAG_EXITING, &[]) + .expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg."); + } + + /// The broker walks all pages and looks for changes, then broadcasts them on + /// its own shared page, once. + #[inline] + pub fn broker_once(&mut self) -> Result { + let mut new_messages = false; + for i in 0..self.state.llmp_clients.len() { + let client_id = self.state.llmp_clients[i].id; + match unsafe { self.handle_new_msgs(client_id) } { + Ok(has_messages) => { + new_messages = has_messages; + } + Err(Error::ShuttingDown) => { + self.state.clients_to_remove.push(client_id); + } + Err(err) => return Err(err), + } + } + + let possible_remove = self.state.clients_to_remove.len(); + if possible_remove > 0 { + self.state.clients_to_remove.sort_unstable(); + self.state.clients_to_remove.dedup(); + log::trace!("Removing {:#?}", self.state.clients_to_remove); + // rev() to make it works + // commit the change to llmp_clients + for idx in (0..self.state.llmp_clients.len()).rev() { + let client_id = self.state.llmp_clients[idx].id; + if self.state.clients_to_remove.contains(&client_id) { + log::info!("Client {:#?} wants to exit. Removing.", client_id); + self.state.llmp_clients.remove(idx); + } + } + // log::trace!("{:#?}", self.llmp_clients); + } + + self.state.clients_to_remove.clear(); + Ok(new_messages) + } + + /// Broker broadcast to its own page for all others to read + /// Returns `true` if new messages were broker-ed + #[inline] + #[allow(clippy::cast_ptr_alignment)] + unsafe fn handle_new_msgs(&mut self, client_id: ClientId) -> Result { + let mut new_messages = false; + + // TODO: We could memcpy a range of pending messages, instead of one by one. + loop { + // log::trace!("{:#?}", self.llmp_clients); + let msg = { + let pos = if (client_id.0 as usize) < self.state.llmp_clients.len() + && self.state.llmp_clients[client_id.0 as usize].id == client_id + { + // Fast path when no client before this one was removed + client_id.0 as usize + } else { + self.state + .llmp_clients + .binary_search_by_key(&client_id, |x| x.id) + .expect("Fatal error, client ID {client_id} not found in llmp_clients.") + }; + let client = &mut self.state.llmp_clients[pos]; + match client.recv()? { + None => { + // We're done handling this client + #[cfg(feature = "std")] + if new_messages { + // set the recv time + // We don't do that in recv() to keep calls to `current_time` to a minimum. + self.state.llmp_clients[pos].last_msg_time = current_time(); + } + return Ok(new_messages); + } + Some(msg) => msg, + } + }; + // We got a new message + new_messages = true; + + match (*msg).tag { + // first, handle the special, llmp-internal messages + LLMP_SLOW_RECEIVER_PANIC => { + return Err(Error::unknown(format!("The broker was too slow to handle messages of client {client_id:?} in time, so it quit. Either the client sent messages too fast, or we (the broker) got stuck!"))); + } + LLMP_TAG_CLIENT_EXIT => { + let msg_buf_len_padded = (*msg).buf_len_padded; + if (*msg).buf_len < size_of::() as u64 { + log::info!("Ignoring broken CLIENT_EXIT msg due to incorrect size. Expected {} but got {}", + msg_buf_len_padded, + size_of::() + ); + #[cfg(not(feature = "std"))] + return Err(Error::unknown(format!("Broken CLIENT_EXIT msg with incorrect size received. Expected {} but got {}", + msg_buf_len_padded, + size_of::() + ))); + } + let exitinfo = (*msg).buf.as_mut_ptr() as *mut LlmpClientExitInfo; + let client_id = ClientId((*exitinfo).client_id); + log::info!("Client exit message received!, we are removing clients whose client_group_id is {:#?}", client_id); + + self.state.clients_to_remove.push(client_id); + } + LLMP_TAG_NEW_SHM_CLIENT => { + /* This client informs us about yet another new client + add it to the list! Also, no need to forward this msg. */ + let msg_buf_len_padded = (*msg).buf_len_padded; + if (*msg).buf_len < size_of::() as u64 { + log::info!("Ignoring broken CLIENT_ADDED msg due to incorrect size. Expected {} but got {}", + msg_buf_len_padded, + size_of::() + ); + #[cfg(not(feature = "std"))] + return Err(Error::unknown(format!("Broken CLIENT_ADDED msg with incorrect size received. Expected {} but got {}", + msg_buf_len_padded, + size_of::() + ))); + } + let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; + match self.state.shmem_provider.shmem_from_id_and_size( + ShMemId::from_array(&(*pageinfo).shm_str), + (*pageinfo).map_size, + ) { + Ok(new_shmem) => { + let mut new_page = LlmpSharedMap::existing(new_shmem); + new_page.mark_safe_to_unmap(); + + let _new_client = self.state.add_client(LlmpReceiver { + id: ClientId(0), // will be auto-filled + current_recv_shmem: new_page, + last_msg_recvd: ptr::null_mut(), + shmem_provider: self.state.shmem_provider.clone(), + highest_msg_id: MessageId(0), + // We don't know the last received time, just assume the current time. + #[cfg(feature = "std")] + last_msg_time: current_time(), + }); + } + Err(e) => { + log::info!("Error adding client! Ignoring: {e:?}"); + #[cfg(not(feature = "std"))] + return Err(Error::unknown(format!( + "Error adding client! PANIC! {e:?}" + ))); + } + }; + } + // handle all other messages + _ => { + // The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary. + let mut should_forward_msg = true; + + let pos = if (client_id.0 as usize) < self.state.llmp_clients.len() + && self.state.llmp_clients[client_id.0 as usize].id == client_id + { + // Fast path when no client before this one was removed + client_id.0 as usize + } else { + self.state + .llmp_clients + .binary_search_by_key(&client_id, |x| x.id) + .expect("Fatal error, client ID {client_id} not found in llmp_clients.") + }; + + let map = &mut self.state.llmp_clients[pos].current_recv_shmem; + let msg_buf = (*msg).try_as_slice_mut(map)?; + if let LlmpMsgHookResult::Handled = self.hooks.on_new_message_all( + &mut self.state, + client_id, + &mut (*msg).tag, + &mut (*msg).flags, + msg_buf, + )? { + should_forward_msg = false; + } + + if should_forward_msg { + self.state_mut().forward_msg(msg)?; + } + } + } + } + } + + #[cfg(any(all(unix, not(miri)), all(windows, feature = "std")))] + fn setup_handlers() { + #[cfg(all(unix, not(miri)))] + if let Err(e) = unsafe { setup_signal_handler(ptr::addr_of_mut!(LLMP_SIGHANDLER_STATE)) } { + // We can live without a proper ctrl+c signal handler - Ignore. + log::info!("Failed to setup signal handlers: {e}"); + } else { + log::info!("Successfully setup signal handlers"); + } + + #[cfg(all(windows, feature = "std"))] + if let Err(e) = unsafe { setup_ctrl_handler(ptr::addr_of_mut!(LLMP_SIGHANDLER_STATE)) } { + // We can live without a proper ctrl+c signal handler - Ignore. + log::info!("Failed to setup control handlers: {e}"); + } else { + log::info!( + "{}: Broker successfully setup control handlers", + std::process::id().to_string() + ); + } + } +} + +/// The broker forwards all messages to its own bus-like broadcast map. +/// It may intercept messages passing through. +impl LlmpBrokerState +where + SP: ShMemProvider + 'static, +{ + /// Create and initialize a new [`LlmpBrokerState`], associated with some hooks. + pub fn new(shmem_provider: SP) -> Result { + Self::with_keep_pages(shmem_provider, true) + } + + /// Create and initialize a new [`LlmpBrokerState`] telling if it has to keep pages forever + pub fn with_keep_pages( + mut shmem_provider: SP, + keep_pages_forever: bool, + ) -> Result { + Ok(LlmpBrokerState { llmp_out: LlmpSender { id: ClientId(0), last_msg_sent: ptr::null_mut(), @@ -2089,17 +2611,17 @@ where }, llmp_clients: vec![], clients_to_remove: Vec::new(), - shmem_provider, listeners: vec![], exit_cleanly_after: None, num_clients_seen: 0, + shmem_provider, }) } /// Gets the [`ClientId`] the next client attaching to this broker will get. - /// In its current implememtation, the inner value of the next [`ClientId`] + /// In its current implementation, the inner value of the next [`ClientId`] /// is equal to `self.num_clients_seen`. - /// Calling `peek_next_client_id` mutliple times (without adding a client) will yield the same value. + /// Calling `peek_next_client_id` multiple times (without adding a client) will yield the same value. #[must_use] #[inline] pub fn peek_next_client_id(&self) -> ClientId { @@ -2110,13 +2632,13 @@ where ) } - /// Create a new [`LlmpBroker`] attaching to a TCP port + /// Create a new [`LlmpBrokerState`] attaching to a TCP port #[cfg(feature = "std")] pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result { Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true) } - /// Create a new [`LlmpBroker`] attaching to a TCP port and telling if it has to keep pages forever + /// Create a new [`LlmpBrokerState`] attaching to a TCP port and telling if it has to keep pages forever #[cfg(feature = "std")] pub fn with_keep_pages_attach_to_tcp( shmem_provider: SP, @@ -2125,7 +2647,8 @@ where ) -> Result { match tcp_bind(port) { Ok(listener) => { - let mut broker = LlmpBroker::with_keep_pages(shmem_provider, keep_pages_forever)?; + let mut broker = + LlmpBrokerState::with_keep_pages(shmem_provider, keep_pages_forever)?; let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; Ok(broker) } @@ -2133,7 +2656,7 @@ where } } - /// Set this broker to exit after at least `count` clients attached and all client exited. + /// Set this broker to exit after at least `n_clients` clients attached and all client exited. /// Will ignore the own listener thread, if `create_attach_to_tcp` /// /// So, if the `n_client` value is `2`, the broker will not exit after client 1 connected and disconnected, @@ -2268,48 +2791,6 @@ where Ok(()) } - /// The broker walks all pages and looks for changes, then broadcasts them on - /// its own shared page, once. - #[inline] - pub fn once(&mut self, on_new_msg: &mut F) -> Result - where - F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result, - { - let mut new_messages = false; - for i in 0..self.llmp_clients.len() { - let client_id = self.llmp_clients[i].id; - match unsafe { self.handle_new_msgs(client_id, on_new_msg) } { - Ok(has_messages) => { - new_messages = has_messages; - } - Err(Error::ShuttingDown) => { - self.clients_to_remove.push(client_id); - } - Err(err) => return Err(err), - } - } - - let possible_remove = self.clients_to_remove.len(); - if possible_remove > 0 { - self.clients_to_remove.sort_unstable(); - self.clients_to_remove.dedup(); - log::trace!("Removing {:#?}", self.clients_to_remove); - // rev() to make it works - // commit the change to llmp_clients - for idx in (0..self.llmp_clients.len()).rev() { - let client_id = self.llmp_clients[idx].id; - if self.clients_to_remove.contains(&client_id) { - log::info!("Client {:#?} wants to exit. Removing.", client_id); - self.llmp_clients.remove(idx); - } - } - // log::trace!("{:#?}", self.llmp_clients); - } - - self.clients_to_remove.clear(); - Ok(new_messages) - } - /// Internal function, returns true when shuttdown is requested by a `SIGINT` signal #[inline] #[cfg(any(unix, all(windows, feature = "std")))] @@ -2318,28 +2799,6 @@ where unsafe { ptr::read_volatile(ptr::addr_of!(LLMP_SIGHANDLER_STATE.shutting_down)) } } - #[cfg(any(all(unix, not(miri)), all(windows, feature = "std")))] - fn setup_handlers() { - #[cfg(all(unix, not(miri)))] - if let Err(e) = unsafe { setup_signal_handler(ptr::addr_of_mut!(LLMP_SIGHANDLER_STATE)) } { - // We can live without a proper ctrl+c signal handler - Ignore. - log::info!("Failed to setup signal handlers: {e}"); - } else { - log::info!("Successfully setup signal handlers"); - } - - #[cfg(all(windows, feature = "std"))] - if let Err(e) = unsafe { setup_ctrl_handler(ptr::addr_of_mut!(LLMP_SIGHANDLER_STATE)) } { - // We can live without a proper ctrl+c signal handler - Ignore. - log::info!("Failed to setup control handlers: {e}"); - } else { - log::info!( - "{}: Broker successfully setup control handlers", - std::process::id().to_string() - ); - } - } - /// Always returns true on platforms, where no shutdown signal handlers are supported #[inline] #[cfg(not(any(unix, all(windows, feature = "std"))))] @@ -2356,115 +2815,6 @@ where self.llmp_clients.len() > self.listeners.len() } - /// Loops until the last client quits, - /// forwarding and handling all incoming messages from clients. - /// Will call `on_timeout` roughly after `timeout` - /// Panics on error. - /// 5 millis of sleep can't hurt to keep busywait not at 100% - #[cfg(feature = "std")] - pub fn loop_with_timeouts( - &mut self, - on_new_msg_or_timeout: &mut F, - timeout: Duration, - sleep_time: Option, - ) where - F: FnMut(Option<(ClientId, Tag, Flags, &[u8])>) -> Result, - { - use super::current_milliseconds; - - #[cfg(any(all(unix, not(miri)), all(windows, feature = "std")))] - Self::setup_handlers(); - - let timeout = timeout.as_millis() as u64; - let mut end_time = current_milliseconds() + timeout; - - while !self.is_shutting_down() { - if current_milliseconds() > end_time { - on_new_msg_or_timeout(None).expect("An error occurred in broker timeout. Exiting."); - end_time = current_milliseconds() + timeout; - } - - if self - .once(&mut |client_id, tag, flags, buf| { - on_new_msg_or_timeout(Some((client_id, tag, flags, buf))) - }) - .expect("An error occurred when brokering. Exiting.") - { - end_time = current_milliseconds() + timeout; - } - - if let Some(exit_after_count) = self.exit_cleanly_after { - // log::trace!( - // "Clients connected: {} && > {} - {} >= {}", - // self.has_clients(), - // self.num_clients_seen, - // self.listeners.len(), - // exit_after_count - // ); - if !self.has_clients() - && (self.num_clients_seen - self.listeners.len()) >= exit_after_count.into() - { - // No more clients connected, and the amount of clients we were waiting for was previously connected. - // exit cleanly. - break; - } - } - - #[cfg(feature = "std")] - if let Some(time) = sleep_time { - thread::sleep(time); - } - - #[cfg(not(feature = "std"))] - if let Some(time) = sleep_time { - panic!("Cannot sleep on no_std platform (requested {time:?})"); - } - } - self.llmp_out - .send_buf(LLMP_TAG_EXITING, &[]) - .expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg."); - } - - /// Loops unitl the last client quit, - /// forwarding and handling all incoming messages from clients. - /// 5 millis of sleep can't hurt to keep busywait not at 100% - /// On std, if you need to run code even if no update got sent, use `Self::loop_with_timeout` (needs the `std` feature). - pub fn loop_forever(&mut self, on_new_msg: &mut F, sleep_time: Option) - where - F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result, - { - #[cfg(any(all(unix, not(miri)), all(windows, feature = "std")))] - Self::setup_handlers(); - - while !self.is_shutting_down() { - self.once(on_new_msg) - .expect("An error occurred when brokering. Exiting."); - - if let Some(exit_after_count) = self.exit_cleanly_after { - if !self.has_clients() - && (self.num_clients_seen - self.listeners.len()) > exit_after_count.into() - { - // No more clients connected, and the amount of clients we were waiting for was previously connected. - // exit cleanly. - break; - } - } - - #[cfg(feature = "std")] - if let Some(time) = sleep_time { - thread::sleep(time); - } - - #[cfg(not(feature = "std"))] - if let Some(time) = sleep_time { - panic!("Cannot sleep on no_std platform (requested {time:?})"); - } - } - self.llmp_out - .send_buf(LLMP_TAG_EXITING, &[]) - .expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg."); - } - /// Broadcasts the given buf to all clients pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> { self.llmp_out.send_buf(tag, buf) @@ -2487,7 +2837,7 @@ where /// Announces a new client on the given shared map. /// Called from a background thread, typically. - /// Upon receiving this message, the broker should map the announced page and start trckang it for new messages. + /// Upon receiving this message, the broker should map the announced page and start tracking it for new messages. #[allow(dead_code)] fn announce_new_client( sender: &mut LlmpSender, @@ -2737,7 +3087,7 @@ where /// Launches a thread using a listener socket, on which new clients may connect to this broker pub fn launch_listener(&mut self, listener: Listener) -> Result, Error> { // Later in the execution, after the initial map filled up, - // the current broacast map will will point to a different map. + // the current broadcast map will point to a different map. // However, the original map is (as of now) never freed, new clients will start // to read from the initial map id. @@ -2793,7 +3143,7 @@ where ); // Send initial information, without anyone asking. - // This makes it a tiny bit easier to map the broker map for new Clients. + // This makes it a tiny bit easier to map the broker map for new Clients. match send_tcp_msg(&mut stream, &broker_hello) { Ok(()) => {} Err(e) => { @@ -2838,151 +3188,6 @@ where Ok(ret) } - - /// Broker broadcast to its own page for all others to read - /// Returns `true` if new messages were broker-ed - #[inline] - #[allow(clippy::cast_ptr_alignment)] - unsafe fn handle_new_msgs( - &mut self, - client_id: ClientId, - on_new_msg: &mut F, - ) -> Result - where - F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result, - { - let mut new_messages = false; - - // TODO: We could memcpy a range of pending messages, instead of one by one. - loop { - // log::trace!("{:#?}", self.llmp_clients); - let msg = { - let pos = if (client_id.0 as usize) < self.llmp_clients.len() - && self.llmp_clients[client_id.0 as usize].id == client_id - { - // Fast path when no client before this one was removed - client_id.0 as usize - } else { - self.llmp_clients - .binary_search_by_key(&client_id, |x| x.id) - .expect("Fatal error, client ID {client_id} not found in llmp_clients.") - }; - let client = &mut self.llmp_clients[pos]; - match client.recv()? { - None => { - // We're done handling this client - #[cfg(feature = "std")] - if new_messages { - // set the recv time - // We don't do that in recv() to keep calls to `current_time` to a minimum. - self.llmp_clients[pos].last_msg_time = current_time(); - } - return Ok(new_messages); - } - Some(msg) => msg, - } - }; - // We got a new message - new_messages = true; - - match (*msg).tag { - // first, handle the special, llmp-internal messages - LLMP_SLOW_RECEIVER_PANIC => { - return Err(Error::unknown(format!("The broker was too slow to handle messages of client {client_id:?} in time, so it quit. Either the client sent messages too fast, or we (the broker) got stuck!"))); - } - LLMP_TAG_CLIENT_EXIT => { - let msg_buf_len_padded = (*msg).buf_len_padded; - if (*msg).buf_len < size_of::() as u64 { - log::info!("Ignoring broken CLIENT_EXIT msg due to incorrect size. Expected {} but got {}", - msg_buf_len_padded, - size_of::() - ); - #[cfg(not(feature = "std"))] - return Err(Error::unknown(format!("Broken CLIENT_EXIT msg with incorrect size received. Expected {} but got {}", - msg_buf_len_padded, - size_of::() - ))); - } - let exitinfo = (*msg).buf.as_mut_ptr() as *mut LlmpClientExitInfo; - let client_id = ClientId((*exitinfo).client_id); - log::info!("Client exit message received!, we are removing clients whose client_group_id is {:#?}", client_id); - - self.clients_to_remove.push(client_id); - } - LLMP_TAG_NEW_SHM_CLIENT => { - /* This client informs us about yet another new client - add it to the list! Also, no need to forward this msg. */ - let msg_buf_len_padded = (*msg).buf_len_padded; - if (*msg).buf_len < size_of::() as u64 { - log::info!("Ignoring broken CLIENT_ADDED msg due to incorrect size. Expected {} but got {}", - msg_buf_len_padded, - size_of::() - ); - #[cfg(not(feature = "std"))] - return Err(Error::unknown(format!("Broken CLIENT_ADDED msg with incorrect size received. Expected {} but got {}", - msg_buf_len_padded, - size_of::() - ))); - } - let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; - match self.shmem_provider.shmem_from_id_and_size( - ShMemId::from_array(&(*pageinfo).shm_str), - (*pageinfo).map_size, - ) { - Ok(new_shmem) => { - let mut new_page = LlmpSharedMap::existing(new_shmem); - new_page.mark_safe_to_unmap(); - - let _new_client = self.add_client(LlmpReceiver { - id: ClientId(0), // will be auto-filled - current_recv_shmem: new_page, - last_msg_recvd: ptr::null_mut(), - shmem_provider: self.shmem_provider.clone(), - highest_msg_id: MessageId(0), - // We don't know the last received time, just assume the current time. - #[cfg(feature = "std")] - last_msg_time: current_time(), - }); - } - Err(e) => { - log::info!("Error adding client! Ignoring: {e:?}"); - #[cfg(not(feature = "std"))] - return Err(Error::unknown(format!( - "Error adding client! PANIC! {e:?}" - ))); - } - }; - } - // handle all other messages - _ => { - // The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary. - let mut should_forward_msg = true; - - let pos = if (client_id.0 as usize) < self.llmp_clients.len() - && self.llmp_clients[client_id.0 as usize].id == client_id - { - // Fast path when no client before this one was removed - client_id.0 as usize - } else { - self.llmp_clients - .binary_search_by_key(&client_id, |x| x.id) - .expect("Fatal error, client ID {client_id} not found in llmp_clients.") - }; - - let map = &mut self.llmp_clients[pos].current_recv_shmem; - let msg_buf = (*msg).try_as_slice(map)?; - if let LlmpMsgHookResult::Handled = - (on_new_msg)(client_id, (*msg).tag, (*msg).flags, msg_buf)? - { - should_forward_msg = false; - } - if should_forward_msg { - self.forward_msg(msg)?; - } - } - } - } - } } /// A restorable client description @@ -3293,26 +3498,28 @@ where // We'll set `sender_id` later let mut ret = Self::new(shmem_provider, map, ClientId(0))?; + // Now sender contains 1 shmem, that must be shared back with the broker. let client_hello_req = TcpRequest::LocalClientHello { shmem_description: ret.sender.out_shmems.first().unwrap().shmem.description(), }; - send_tcp_msg(&mut stream, &client_hello_req)?; - let TcpResponse::LocalClientAccepted { client_id } = - recv_tcp_msg(&mut stream)?.try_into()? + // The broker accepted the client, and sent back an ID. + let TcpResponse::LocalClientAccepted { + client_id: client_sender_id, + } = recv_tcp_msg(&mut stream)?.try_into()? else { return Err(Error::illegal_state( "Unexpected Response from Broker".to_string(), )); }; - // Set our ID to the one the broker sent us.. + // Set our ID to the one the broker sent us. // This is mainly so we can filter out our own msgs later. - ret.sender.id = client_id; + ret.sender.id = client_sender_id; // Also set the sender on our initial llmp map correctly. unsafe { - (*ret.sender.out_shmems.first_mut().unwrap().page_mut()).sender_id = client_id; + (*ret.sender.out_shmems.first_mut().unwrap().page_mut()).sender_id = client_sender_id; } Ok(ret) @@ -3330,7 +3537,6 @@ mod tests { use super::{ LlmpClient, LlmpConnection::{self, IsBroker, IsClient}, - LlmpMsgHookResult::ForwardToClients, Tag, }; use crate::shmem::{ShMemProvider, StdShMemProvider}; @@ -3354,9 +3560,7 @@ mod tests { // Give the (background) tcp thread a few millis to post the message sleep(Duration::from_millis(100)); - broker - .once(&mut |_sender_id, _tag, _flags, _msg| Ok(ForwardToClients)) - .unwrap(); + broker.broker_once().unwrap(); let tag: Tag = Tag(0x1337); let arr: [u8; 1] = [1_u8]; @@ -3377,14 +3581,12 @@ mod tests { client.send_buf(tag, &arr).unwrap(); // Forward stuff to clients - broker - .once(&mut |_sender_id, _tag, _flags, _msg| Ok(ForwardToClients)) - .unwrap(); + broker.broker_once().unwrap(); let (_sender_id, tag2, arr2) = client.recv_buf_blocking().unwrap(); assert_eq!(tag, tag2); assert_eq!(arr[0], arr2[0]); // We want at least the tcp and sender clients. - assert_eq!(broker.llmp_clients.len(), 2); + assert_eq!(broker.state.llmp_clients.len(), 2); } } diff --git a/libafl_bolts/src/os/pipes.rs b/libafl_bolts/src/os/pipes.rs index 5299771f3d..0dea7a5faa 100644 --- a/libafl_bolts/src/os/pipes.rs +++ b/libafl_bolts/src/os/pipes.rs @@ -1,18 +1,20 @@ //! Unix `pipe` wrapper for `LibAFL` -use alloc::rc::Rc; -use core::{borrow::Borrow, cell::RefCell}; #[cfg(feature = "std")] use std::{ + borrow::Borrow, + cell::RefCell, io::{self, ErrorKind, Read, Write}, os::{ fd::{AsFd, AsRawFd, OwnedFd}, unix::io::RawFd, }, + rc::Rc, }; #[cfg(feature = "std")] use nix::unistd::{pipe, read, write}; +#[cfg(feature = "std")] use crate::Error; /// A unix pipe wrapper for `LibAFL` diff --git a/libafl_bolts/src/os/unix_signals.rs b/libafl_bolts/src/os/unix_signals.rs index 58032c0044..12d62f6604 100644 --- a/libafl_bolts/src/os/unix_signals.rs +++ b/libafl_bolts/src/os/unix_signals.rs @@ -1,6 +1,8 @@ //! Signal handling for unix #[cfg(feature = "alloc")] use alloc::vec::Vec; +#[cfg(all(target_vendor = "apple", target_arch = "aarch64"))] +use core::mem::size_of; #[cfg(feature = "alloc")] use core::{ cell::UnsafeCell, @@ -160,7 +162,7 @@ pub struct arm_thread_state64 { //#[repr(align(16))] pub struct arm_neon_state64 { /// opaque - pub opaque: [u8; (32 * 16) + (2 * mem::size_of::())], + pub opaque: [u8; (32 * 16) + (2 * size_of::())], } /// ```c diff --git a/libafl_bolts/src/shmem.rs b/libafl_bolts/src/shmem.rs index 091410a990..907f2b43ec 100644 --- a/libafl_bolts/src/shmem.rs +++ b/libafl_bolts/src/shmem.rs @@ -7,7 +7,7 @@ use alloc::{rc::Rc, string::ToString, vec::Vec}; use core::{cell::RefCell, fmt, fmt::Display, mem::ManuallyDrop}; use core::{ fmt::Debug, - mem, + mem::size_of, ops::{Deref, DerefMut}, }; #[cfg(feature = "std")] @@ -202,7 +202,7 @@ pub trait ShMem: Sized + Debug + Clone + DerefMut { /// Convert to a ptr of a given type, checking the size. /// If the map is too small, returns `None` fn as_ptr_of(&self) -> Option<*const T> { - if self.len() >= mem::size_of::() { + if self.len() >= size_of::() { Some(self.as_ptr() as *const T) } else { None @@ -212,7 +212,7 @@ pub trait ShMem: Sized + Debug + Clone + DerefMut { /// Convert to a mut ptr of a given type, checking the size. /// If the map is too small, returns `None` fn as_mut_ptr_of(&mut self) -> Option<*mut T> { - if self.len() >= mem::size_of::() { + if self.len() >= size_of::() { Some(self.as_mut_ptr() as *mut T) } else { None @@ -267,7 +267,7 @@ pub trait ShMemProvider: Clone + Default + Debug { /// Create a new shared memory mapping to hold an object of the given type, and initializes it with the given value. fn uninit_on_shmem(&mut self) -> Result { - self.new_shmem(mem::size_of::()) + self.new_shmem(size_of::()) } /// Get a mapping given a description diff --git a/libafl_libfuzzer/libafl_libfuzzer_runtime/src/fuzz.rs b/libafl_libfuzzer/libafl_libfuzzer_runtime/src/fuzz.rs index ed4caaf8b6..f671bf120a 100644 --- a/libafl_libfuzzer/libafl_libfuzzer_runtime/src/fuzz.rs +++ b/libafl_libfuzzer/libafl_libfuzzer_runtime/src/fuzz.rs @@ -139,7 +139,7 @@ fn fuzz_many_forking( monitor: M, ) -> Result<(), Error> where - M: Monitor + Clone + Debug, + M: Monitor + Clone + Debug + 'static, { destroy_output_fds(options); let broker_port = std::env::var(PORT_PROVIDER_VAR) diff --git a/libafl_targets/src/cmps/mod.rs b/libafl_targets/src/cmps/mod.rs index 03eef311eb..51ccebce30 100644 --- a/libafl_targets/src/cmps/mod.rs +++ b/libafl_targets/src/cmps/mod.rs @@ -8,7 +8,8 @@ use alloc::{alloc::alloc_zeroed, boxed::Box, vec::Vec}; use core::{ alloc::Layout, fmt::{self, Debug, Formatter}, - mem, ptr, slice, + mem::{size_of, zeroed}, + ptr, slice, }; use libafl::{ @@ -30,11 +31,11 @@ pub const CMPLOG_RTN_LEN: usize = 32; /// The hight of a cmplog routine map pub const CMPLOG_MAP_RTN_H: usize = - (CMPLOG_MAP_H * mem::size_of::()) / mem::size_of::(); + (CMPLOG_MAP_H * size_of::()) / size_of::(); /// The height of extended rountine map pub const CMPLOG_MAP_RTN_EXTENDED_H: usize = - CMPLOG_MAP_H * mem::size_of::() / mem::size_of::(); + CMPLOG_MAP_H * size_of::() / size_of::(); /// `CmpLog` instruction kind pub const CMPLOG_KIND_INS: u8 = 0; @@ -318,7 +319,7 @@ pub struct CmpLogMap { impl Default for CmpLogMap { fn default() -> Self { - unsafe { mem::zeroed() } + unsafe { zeroed() } } } @@ -474,9 +475,8 @@ impl Serialize for AFLppCmpLogMap { where S: Serializer, { - let slice = unsafe { - slice::from_raw_parts(ptr::from_ref(self) as *const u8, mem::size_of::()) - }; + let slice = + unsafe { slice::from_raw_parts(ptr::from_ref(self) as *const u8, size_of::()) }; serializer.serialize_bytes(slice) } } diff --git a/libafl_targets/src/sancov_8bit.rs b/libafl_targets/src/sancov_8bit.rs index 89f7118a2c..4f2d3ed8d2 100644 --- a/libafl_targets/src/sancov_8bit.rs +++ b/libafl_targets/src/sancov_8bit.rs @@ -64,6 +64,7 @@ mod observers { fmt::Debug, hash::{Hash, Hasher}, iter::Flatten, + mem::size_of, ptr::{addr_of, addr_of_mut}, slice::{from_raw_parts, Iter, IterMut}, }; @@ -161,7 +162,7 @@ mod observers { for map in unsafe { &*addr_of!(COUNTERS_MAPS) } { let slice = map.as_slice(); let ptr = slice.as_ptr(); - let map_size = slice.len() / core::mem::size_of::(); + let map_size = slice.len() / size_of::(); unsafe { hasher.write(from_raw_parts(ptr, map_size)); } diff --git a/libafl_targets/src/sancov_pcguard.rs b/libafl_targets/src/sancov_pcguard.rs index 0d09dd084e..ef9c5fe192 100644 --- a/libafl_targets/src/sancov_pcguard.rs +++ b/libafl_targets/src/sancov_pcguard.rs @@ -3,7 +3,7 @@ #[rustversion::nightly] #[cfg(feature = "sancov_ngram4")] use core::simd::num::SimdUint; -use core::{mem, ptr, slice}; +use core::{mem::align_of, ptr, slice}; #[cfg(any(feature = "sancov_ngram4", feature = "sancov_ctx"))] use libafl::executors::{hooks::ExecutorHook, HasObservers}; @@ -346,7 +346,7 @@ pub fn sanitizer_cov_pc_table() -> Option<&'static [PcTableEntry]> { "PC Table size is not evens - start: {PCS_BEG:x?} end: {PCS_END:x?}" ); assert_eq!( - (PCS_BEG as usize) % mem::align_of::(), + (PCS_BEG as usize) % align_of::(), 0, "Unaligned PC Table - start: {PCS_BEG:x?} end: {PCS_END:x?}" );