From 98d3dfe821abe99bbdc080b421e0257bcc808b06 Mon Sep 17 00:00:00 2001 From: "Dongjia \"toka\" Zhang" Date: Thu, 4 Apr 2024 21:01:11 +0200 Subject: [PATCH] Replace LLMP client timeout with client deregistration (#1982) * wip * done * fix * fix * some fixes * stuff * fmt * stuff * use * not 1337 * comment * move functions around * fix * fix * doc * mistake * aa * fixer * wipe out restarter id * denig * fix * fix * include * fix * fix * fix * clp * fix * fix * error log --- fuzzers/nautilus_sync/src/lib.rs | 5 +- libafl/src/events/centralized.rs | 12 +- libafl/src/events/launcher.rs | 17 +- libafl/src/events/llmp.rs | 80 +++++--- libafl_bolts/examples/llmp_test/main.rs | 15 +- libafl_bolts/src/lib.rs | 6 +- libafl_bolts/src/llmp.rs | 244 ++++++++++++------------ 7 files changed, 205 insertions(+), 174 deletions(-) diff --git a/fuzzers/nautilus_sync/src/lib.rs b/fuzzers/nautilus_sync/src/lib.rs index e4f8c95c3c..e368cc1d9b 100644 --- a/fuzzers/nautilus_sync/src/lib.rs +++ b/fuzzers/nautilus_sync/src/lib.rs @@ -6,7 +6,7 @@ static GLOBAL: MiMalloc = MiMalloc; use std::ptr::write_volatile; use std::{env, net::SocketAddr, path::PathBuf, time::Duration}; -use clap::{self, Parser}; +use clap::Parser; use libafl::{ corpus::{InMemoryCorpus, OnDiskCorpus}, events::{launcher::Launcher, llmp::LlmpEventConverter, EventConfig}, @@ -130,6 +130,9 @@ pub extern "C" fn libafl_main() { .unwrap() }); + // to disconnect the event coverter from the broker later + // call detach_from_broker( port) + let mut run_client = |state: Option<_>, mut mgr, _core_id| { let mut bytes = vec![]; diff --git a/libafl/src/events/centralized.rs b/libafl/src/events/centralized.rs index 86c42e3de7..22208064e3 100644 --- a/libafl/src/events/centralized.rs +++ b/libafl/src/events/centralized.rs @@ -93,15 +93,10 @@ where /// /// The port must not be bound yet to have a broker. 
#[cfg(feature = "std")] - pub fn on_port(shmem_provider: SP, port: u16, client_timeout: Duration) -> Result { + pub fn on_port(shmem_provider: SP, port: u16) -> Result { Ok(Self { // TODO switch to false after solving the bug - llmp: LlmpBroker::with_keep_pages_attach_to_tcp( - shmem_provider, - port, - true, - client_timeout, - )?, + llmp: LlmpBroker::with_keep_pages_attach_to_tcp(shmem_provider, port, true)?, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::new(COMPRESS_THRESHOLD), phantom: PhantomData, @@ -509,9 +504,10 @@ where /// will act as a client. #[cfg(feature = "std")] pub fn on_port(inner: EM, shmem_provider: SP, port: u16, is_main: bool) -> Result { + let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?; Ok(Self { inner, - client: LlmpClient::create_attach_to_tcp(shmem_provider, port)?, + client, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::new(COMPRESS_THRESHOLD), is_main, diff --git a/libafl/src/events/launcher.rs b/libafl/src/events/launcher.rs index 82da6c1806..bdc74e074e 100644 --- a/libafl/src/events/launcher.rs +++ b/libafl/src/events/launcher.rs @@ -15,10 +15,11 @@ use alloc::string::ToString; #[cfg(feature = "std")] use core::marker::PhantomData; +#[cfg(all(unix, feature = "std", feature = "fork"))] +use core::time::Duration; use core::{ fmt::{self, Debug, Formatter}, num::NonZeroUsize, - time::Duration, }; #[cfg(feature = "std")] use std::net::SocketAddr; @@ -36,7 +37,6 @@ use libafl_bolts::{ }; use libafl_bolts::{ core_affinity::{CoreId, Cores}, - llmp::DEFAULT_CLIENT_TIMEOUT_SECS, shmem::ShMemProvider, tuples::tuple_list, }; @@ -122,9 +122,6 @@ where /// Then, clients launched by this [`Launcher`] can connect to the original `broker`. 
#[builder(default = true)] spawn_broker: bool, - /// The timeout duration used for llmp client timeout - #[builder(default = DEFAULT_CLIENT_TIMEOUT_SECS)] - client_timeout: Duration, /// Tell the manager to serialize or not the state on restart #[builder(default = LlmpShouldSaveState::OnRestart)] serialize_state: LlmpShouldSaveState, @@ -261,7 +258,6 @@ where }) .configuration(self.configuration) .serialize_state(self.serialize_state) - .client_timeout(self.client_timeout) .hooks(hooks) .build() .launch()?; @@ -286,7 +282,6 @@ where .exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap())) .configuration(self.configuration) .serialize_state(self.serialize_state) - .client_timeout(self.client_timeout) .hooks(hooks) .build() .launch()?; @@ -339,7 +334,6 @@ where }) .configuration(self.configuration) .serialize_state(self.serialize_state) - .client_timeout(self.client_timeout) .hooks(hooks) .build() .launch()?; @@ -410,7 +404,6 @@ where .exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap())) .configuration(self.configuration) .serialize_state(self.serialize_state) - .client_timeout(self.client_timeout) .hooks(hooks) .build() .launch()?; @@ -495,9 +488,6 @@ where /// Tell the manager to serialize or not the state on restart #[builder(default = LlmpShouldSaveState::OnRestart)] serialize_state: LlmpShouldSaveState, - /// The duration for the llmp client timeout - #[builder(default = DEFAULT_CLIENT_TIMEOUT_SECS)] - client_timeout: Duration, #[builder(setter(skip), default = PhantomData)] phantom_data: PhantomData<(&'a S, &'a SP)>, } @@ -589,7 +579,6 @@ where CentralizedLlmpEventBroker::on_port( self.shmem_provider.clone(), self.centralized_broker_port, - self.client_timeout, )?; broker.broker_loop()?; } @@ -636,7 +625,6 @@ where }) .configuration(self.configuration) .serialize_state(self.serialize_state) - .client_timeout(self.client_timeout) .hooks(tuple_list!()) .build() .launch()?; @@ -667,7 +655,6 @@ where 
.exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap())) .configuration(self.configuration) .serialize_state(self.serialize_state) - .client_timeout(self.client_timeout) .hooks(tuple_list!()) .build() .launch()?; diff --git a/libafl/src/events/llmp.rs b/libafl/src/events/llmp.rs index 329dd809de..de82425144 100644 --- a/libafl/src/events/llmp.rs +++ b/libafl/src/events/llmp.rs @@ -1,5 +1,7 @@ //! LLMP-backed event manager for scalable multi-processed fuzzing +#[cfg(feature = "std")] +use alloc::string::ToString; use alloc::{boxed::Box, vec::Vec}; #[cfg(all(unix, not(miri), feature = "std"))] use core::ptr::addr_of_mut; @@ -7,14 +9,12 @@ use core::ptr::addr_of_mut; use core::sync::atomic::{compiler_fence, Ordering}; use core::{marker::PhantomData, num::NonZeroUsize, time::Duration}; #[cfg(feature = "std")] +use std::net::TcpStream; +#[cfg(feature = "std")] use std::net::{SocketAddr, ToSocketAddrs}; -#[cfg(feature = "std")] -use libafl_bolts::core_affinity::CoreId; #[cfg(feature = "adaptive_serialization")] use libafl_bolts::current_time; -#[cfg(feature = "std")] -use libafl_bolts::llmp::DEFAULT_CLIENT_TIMEOUT_SECS; #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] use libafl_bolts::os::startable_self; #[cfg(all(unix, feature = "std", not(miri)))] @@ -27,6 +27,12 @@ use libafl_bolts::{ llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED}, }; #[cfg(feature = "std")] +use libafl_bolts::{ + core_affinity::CoreId, + llmp::{recv_tcp_msg, send_tcp_msg, TcpRequest, TcpResponse}, + IP_LOCALHOST, +}; +#[cfg(feature = "std")] use libafl_bolts::{llmp::LlmpConnection, shmem::StdShMemProvider, staterestore::StateRestorer}; use libafl_bolts::{ llmp::{self, LlmpClient, LlmpClientDescription, Tag}, @@ -106,15 +112,10 @@ where /// /// The port must not be bound yet to have a broker. 
#[cfg(feature = "std")] - pub fn on_port( - shmem_provider: SP, - monitor: MT, - port: u16, - client_timeout: Duration, - ) -> Result { + pub fn on_port(shmem_provider: SP, monitor: MT, port: u16) -> Result { Ok(Self { monitor, - llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port, client_timeout)?, + llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port)?, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::new(COMPRESS_THRESHOLD), phantom: PhantomData, @@ -517,6 +518,38 @@ where S: State, SP: ShMemProvider + 'static, { + /// Calling this function will tell the llmp broker that this client is exiting + /// This should be called from the restarter not from the actual fuzzer client + /// This function serves the same role as the `LlmpClient.send_exiting()` + /// However, from the event restarter process it is forbidden to call `send_exiting()` + /// (You can call it and it compiles but you should never do so) + /// `send_exiting()` is exclusive to the fuzzer client. + #[cfg(feature = "std")] + pub fn detach_from_broker(&self, broker_port: u16) -> Result<(), Error> { + let client_id = self.llmp.sender().id(); + let Ok(mut stream) = TcpStream::connect((IP_LOCALHOST, broker_port)) else { + log::error!("Connection refused."); + return Ok(()); + }; + // The broker tells us hello we don't care we just tell it our client died + let TcpResponse::BrokerConnectHello { + broker_shmem_description: _, + hostname: _, + } = recv_tcp_msg(&mut stream)?.try_into()? + else { + return Err(Error::illegal_state( + "Received unexpected Broker Hello".to_string(), + )); + }; + let msg = TcpRequest::ClientQuit { client_id }; + // Send this message off and we are leaving. 
+ match send_tcp_msg(&mut stream, &msg) { + Ok(_) => (), + Err(e) => log::error!("Failed to send tcp message {:#?}", e), + } + log::info!("Asking he broker to be disconnected"); + Ok(()) + } /// Create a manager from a raw LLMP client with hooks pub fn with_hooks( llmp: LlmpClient, @@ -546,6 +579,7 @@ where /// /// If the port is not yet bound, it will act as a broker; otherwise, it /// will act as a client. + /// This will make a new connection to the broker so will return its new [`ClientId`], too #[cfg(feature = "std")] pub fn on_port_with_hooks( shmem_provider: SP, @@ -1258,9 +1292,6 @@ where /// Tell the manager to serialize or not the state on restart #[builder(default = LlmpShouldSaveState::OnRestart)] serialize_state: LlmpShouldSaveState, - /// The timeout duration used for llmp client timeout - #[builder(default = DEFAULT_CLIENT_TIMEOUT_SECS)] - client_timeout: Duration, /// The hooks passed to event manager: hooks: EMH, #[builder(setter(skip), default = PhantomData)] @@ -1308,15 +1339,11 @@ where broker.broker_loop() }; - // We get here if we are on Unix, or we are a broker on Windows (or without forks). 
let (mgr, core_id) = match self.kind { ManagerKind::Any => { - let connection = LlmpConnection::on_port( - self.shmem_provider.clone(), - self.broker_port, - self.client_timeout, - )?; + let connection = + LlmpConnection::on_port(self.shmem_provider.clone(), self.broker_port)?; match connection { LlmpConnection::IsBroker { broker } => { let event_broker = LlmpEventBroker::::new( @@ -1348,7 +1375,6 @@ where self.shmem_provider.clone(), self.monitor.take().unwrap(), self.broker_port, - self.client_timeout, )?; broker_things(event_broker, self.remote_broker_addr)?; @@ -1435,21 +1461,26 @@ where #[allow(clippy::manual_assert)] if !staterestorer.has_content() && !self.serialize_state.oom_safe() { + if let Err(err) = mgr.detach_from_broker(self.broker_port) { + log::error!("Failed to detach from broker: {err}"); + } #[cfg(unix)] if child_status == 137 { // Out of Memory, see https://tldp.org/LDP/abs/html/exitcodes.html // and https://github.com/AFLplusplus/LibAFL/issues/32 for discussion. panic!("Fuzzer-respawner: The fuzzed target crashed with an out of memory error! Fix your harness, or switch to another executor (for example, a forkserver)."); } - // Storing state in the last round did not work panic!("Fuzzer-respawner: Storing state in crashed fuzzer instance did not work, no point to spawn the next client! This can happen if the child calls `exit()`, in that case make sure it uses `abort()`, if it got killed unrecoverable (OOM), or if there is a bug in the fuzzer itself. 
(Child exited with: {child_status})"); } if staterestorer.wants_to_exit() || Self::is_shutting_down() { + // if ctrl-c is pressed, we end up in this branch + if let Err(err) = mgr.detach_from_broker(self.broker_port) { + log::error!("Failed to detach from broker: {err}"); + } return Err(Error::shutting_down()); } - ctr = ctr.wrapping_add(1); } } else { @@ -1594,8 +1625,9 @@ where converter: Option, converter_back: Option, ) -> Result { + let llmp = LlmpClient::create_attach_to_tcp(shmem_provider, port)?; Ok(Self { - llmp: LlmpClient::create_attach_to_tcp(shmem_provider, port)?, + llmp, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::new(COMPRESS_THRESHOLD), converter, diff --git a/libafl_bolts/examples/llmp_test/main.rs b/libafl_bolts/examples/llmp_test/main.rs index 5a41088f32..0735dac0bb 100644 --- a/libafl_bolts/examples/llmp_test/main.rs +++ b/libafl_bolts/examples/llmp_test/main.rs @@ -150,13 +150,11 @@ fn main() -> Result<(), Box> { log::set_logger(&LOGGER).unwrap(); log::set_max_level(log::LevelFilter::Trace); - println!("Launching in mode {mode} on port {port}"); match mode.as_str() { "broker" => { - let mut broker = - llmp::LlmpBroker::new(StdShMemProvider::new()?, Duration::from_secs(5))?; + let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?; broker.launch_tcp_listener_on(port)?; // Exit when we got at least _n_ nodes, and all of them quit. broker.set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap()); @@ -167,8 +165,7 @@ fn main() -> Result<(), Box> { ); } "b2b" => { - let mut broker = - llmp::LlmpBroker::new(StdShMemProvider::new()?, Duration::from_secs(5))?; + let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?; broker.launch_tcp_listener_on(b2b_port)?; // connect back to the main broker. 
broker.connect_b2b(("127.0.0.1", port))?; @@ -205,6 +202,14 @@ fn main() -> Result<(), Box> { } log::info!("Exiting Client exits"); client.sender_mut().send_exiting()?; + + // there is another way to tell that this client wants to exit. + // one is to call client.sender_mut().send_exiting()?; + // you can disconnect the client in this way as long as this client is in an unrecoverable state (like in a crash handler) + // another way to do this is through the detach_from_broker() call + // you can call detach_from_broker(port); to notify the broker that this client wants to exit + // This one is usually for the event restarter to cut off the connection when the client has crashed. + // In that case we don't have access to the llmp client of the client anymore, but we can use detach_from_broker instead } _ => { println!("No valid mode supplied"); diff --git a/libafl_bolts/src/lib.rs b/libafl_bolts/src/lib.rs index d1d40e536c..26271a8af3 100644 --- a/libafl_bolts/src/lib.rs +++ b/libafl_bolts/src/lib.rs @@ -29,7 +29,8 @@ clippy::missing_docs_in_private_items, clippy::module_name_repetitions, clippy::ptr_cast_constness, - clippy::negative_feature_names + clippy::negative_feature_names, + clippy::too_many_lines )] #![cfg_attr(not(test), warn( missing_debug_implementations, @@ -226,6 +227,9 @@ use { core::str::Utf8Error, }; +/// Localhost addr, this is used, for example, for LLMP Client, which connects to this address +pub const IP_LOCALHOST: &str = "127.0.0.1"; + /// We need fixed names for many parts of this lib. pub trait Named { /// Provide the name of this element. 
diff --git a/libafl_bolts/src/llmp.rs b/libafl_bolts/src/llmp.rs index 8d0276e30f..96545552e7 100644 --- a/libafl_bolts/src/llmp.rs +++ b/libafl_bolts/src/llmp.rs @@ -91,22 +91,19 @@ use backtrace::Backtrace; use nix::sys::socket::{self, sockopt::ReusePort}; use serde::{Deserialize, Serialize}; -#[cfg(feature = "std")] -use crate::current_time; #[cfg(all(unix, not(miri)))] use crate::os::unix_signals::setup_signal_handler; #[cfg(unix)] use crate::os::unix_signals::{siginfo_t, ucontext_t, Handler, Signal}; #[cfg(all(windows, feature = "std"))] use crate::os::windows_exceptions::{setup_ctrl_handler, CtrlHandler}; +#[cfg(feature = "std")] +use crate::{current_time, IP_LOCALHOST}; use crate::{ shmem::{ShMem, ShMemDescription, ShMemId, ShMemProvider}, ClientId, Error, }; -/// The default timeout in seconds after which a client will be considered stale, and removed. -pub const DEFAULT_CLIENT_TIMEOUT_SECS: Duration = Duration::from_secs(7200); - /// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`] /// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages. /// Instead of increasing this value, you may consider sending new messages at a lower rate, else your Sender will eventually `OOM`. @@ -130,6 +127,8 @@ const LLMP_TAG_UNINITIALIZED: Tag = Tag(0xA143AF11); const LLMP_TAG_END_OF_PAGE: Tag = Tag(0xAF1E0F1); /// A new client for this broker got added. 
const LLMP_TAG_NEW_SHM_CLIENT: Tag = Tag(0xC11E471); +/// A client wants to disconnect from this broker +const LLMP_TAG_CLIENT_EXIT: Tag = Tag(0xC11E472); /// The sender on this map is exiting (if broker exits, clients should exit gracefully); const LLMP_TAG_EXITING: Tag = Tag(0x13C5171); /// Client gave up as the receiver/broker was too slow @@ -154,9 +153,6 @@ const _LLMP_BIND_ADDR: &str = "0.0.0.0"; #[cfg(not(feature = "llmp_bind_public"))] const _LLMP_BIND_ADDR: &str = "127.0.0.1"; -/// LLMP Client connects to this address -const _LLMP_CONNECT_ADDR: &str = "127.0.0.1"; - /// An env var of this value indicates that the set value was a NULL PTR const _NULL_ENV_STR: &str = "_NULL"; @@ -264,6 +260,12 @@ pub enum TcpRequest { /// The hostname of our broker, trying to connect. hostname: String, }, + /// Notify the broker that the other side is dying, so remove this client + /// `client_id` is the pid of the very initial client + ClientQuit { + /// Tell the broker to remove the client with this `client_id`. `client_id` is equal to the one of the event restarter + client_id: ClientId, + }, } impl TryFrom<&Vec> for TcpRequest { @@ -455,7 +457,7 @@ fn tcp_bind(port: u16) -> Result { /// Send one message as `u32` len and `[u8;len]` bytes #[cfg(feature = "std")] -fn send_tcp_msg(stream: &mut TcpStream, msg: &T) -> Result<(), Error> +pub fn send_tcp_msg(stream: &mut TcpStream, msg: &T) -> Result<(), Error> where T: Serialize, { @@ -482,7 +484,7 @@ where /// Receive one message of `u32` len and `[u8; len]` bytes #[cfg(feature = "std")] -fn recv_tcp_msg(stream: &mut TcpStream) -> Result, Error> { +pub fn recv_tcp_msg(stream: &mut TcpStream) -> Result, Error> { // Always receive one be u32 of size, then the command. #[cfg(feature = "llmp_debug")] @@ -695,22 +697,24 @@ where { #[cfg(feature = "std")] /// Creates either a broker, if the tcp port is not bound, or a client, connected to this port. 
- pub fn on_port(shmem_provider: SP, port: u16, client_timeout: Duration) -> Result { + /// This will make a new connection to the broker if it ends up a client + /// In that case this function will return its new [`ClientId`], too. + pub fn on_port(shmem_provider: SP, port: u16) -> Result { match tcp_bind(port) { Ok(listener) => { // We got the port. We are the broker! :) log::info!("We're the broker"); - let mut broker = LlmpBroker::new(shmem_provider, client_timeout)?; + let mut broker = LlmpBroker::new(shmem_provider)?; let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; Ok(LlmpConnection::IsBroker { broker }) } Err(Error::OsError(e, ..)) if e.kind() == ErrorKind::AddrInUse => { // We are the client :) log::info!("We're the client (internal port already bound by broker, {e:#?})"); - Ok(LlmpConnection::IsClient { - client: LlmpClient::create_attach_to_tcp(shmem_provider, port)?, - }) + let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?; + let conn = LlmpConnection::IsClient { client }; + Ok(conn) } Err(e) => { log::error!("{e:?}"); @@ -721,22 +725,20 @@ where /// Creates a new broker on the given port #[cfg(feature = "std")] - pub fn broker_on_port( - shmem_provider: SP, - port: u16, - client_timeout: Duration, - ) -> Result { + pub fn broker_on_port(shmem_provider: SP, port: u16) -> Result { Ok(LlmpConnection::IsBroker { - broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port, client_timeout)?, + broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port)?, }) } /// Creates a new client on the given port + /// This will make a new connection to the broker if it ends up a client + /// In that case this function will return its new [`ClientId`], too. 
#[cfg(feature = "std")] pub fn client_on_port(shmem_provider: SP, port: u16) -> Result { - Ok(LlmpConnection::IsClient { - client: LlmpClient::create_attach_to_tcp(shmem_provider, port)?, - }) + let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?; + let conn = LlmpConnection::IsClient { client }; + Ok(conn) } /// Describe this in a reproducable fashion, if it's a client @@ -834,6 +836,16 @@ struct LlmpPayloadSharedMapInfo { pub shm_str: [u8; 20], } +/// Message payload when a client got removed +/// This is an internal message! +/// [`LLMP_TAG_END_OF_PAGE_V1`] +#[derive(Copy, Clone, Debug)] +#[repr(C, align(8))] +struct LlmpClientExitInfo { + /// The restarter process id of the client + pub client_id: u32, +} + /// Sending end on a (unidirectional) sharedmap channel #[derive(Debug)] pub struct LlmpSender @@ -1966,23 +1978,20 @@ where /// This allows us to intercept messages right in the broker. /// This keeps the out map clean. /// The backing values of `llmp_clients` [`ClientId`]s will always be sorted (but not gapless) - /// Make sure to always increase `num_clients_total` when pushing a new [`LlmpReceiver`] to `llmp_clients`! + /// Make sure to always increase `num_clients_seen` when pushing a new [`LlmpReceiver`] to `llmp_clients`! llmp_clients: Vec>, /// The own listeners we spawned via `launch_listener` or `crate_attach_to_tcp`. /// Listeners will be ignored for `exit_cleanly_after` and they are never considered to have timed out. listeners: Vec, /// The total amount of clients we had, historically, including those that disconnected, and our listeners. - num_clients_total: usize, + num_clients_seen: usize, /// The amount of total clients that should have connected and (and disconnected) /// after which the broker loop should quit gracefully. 
pub exit_cleanly_after: Option, - /// Clients that should be removed soon, (offset into `llmp_clients`) - clients_to_remove: Vec, + /// Clients that should be removed soon + clients_to_remove: Vec, /// The `ShMemProvider` to use shmem_provider: SP, - #[cfg(feature = "std")] - /// The timeout after which a client will be considered stale, and removed. - client_timeout: Duration, } /// A signal handler for the [`LlmpBroker`]. @@ -2031,15 +2040,12 @@ where SP: ShMemProvider + 'static, { /// Create and initialize a new [`LlmpBroker`] - pub fn new( - shmem_provider: SP, - #[cfg(feature = "std")] client_timeout: Duration, - ) -> Result { + pub fn new(shmem_provider: SP) -> Result { // Broker never cleans up the pages so that new // clients may join at any time #[cfg(feature = "std")] { - Self::with_keep_pages(shmem_provider, true, client_timeout) + Self::with_keep_pages(shmem_provider, true) } #[cfg(not(feature = "std"))] @@ -2052,7 +2058,6 @@ where pub fn with_keep_pages( mut shmem_provider: SP, keep_pages_forever: bool, - #[cfg(feature = "std")] client_timeout: Duration, ) -> Result { Ok(LlmpBroker { llmp_out: LlmpSender { @@ -2068,25 +2073,23 @@ where unused_shmem_cache: vec![], }, llmp_clients: vec![], - clients_to_remove: vec![], + clients_to_remove: Vec::new(), shmem_provider, listeners: vec![], exit_cleanly_after: None, - num_clients_total: 0, - #[cfg(feature = "std")] - client_timeout, + num_clients_seen: 0, }) } /// Gets the [`ClientId`] the next client attaching to this broker will get. /// In its current implememtation, the inner value of the next [`ClientId`] - /// is equal to `self.num_clients_total`. + /// is equal to `self.num_clients_seen`. /// Calling `peek_next_client_id` mutliple times (without adding a client) will yield the same value. 
#[must_use] #[inline] pub fn peek_next_client_id(&self) -> ClientId { ClientId( - self.num_clients_total + self.num_clients_seen .try_into() .expect("More than u32::MAX clients!"), ) @@ -2094,12 +2097,8 @@ where /// Create a new [`LlmpBroker`] attaching to a TCP port #[cfg(feature = "std")] - pub fn create_attach_to_tcp( - shmem_provider: SP, - port: u16, - client_timeout: Duration, - ) -> Result { - Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true, client_timeout) + pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result { + Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true) } /// Create a new [`LlmpBroker`] attaching to a TCP port and telling if it has to keep pages forever @@ -2108,15 +2107,10 @@ where shmem_provider: SP, port: u16, keep_pages_forever: bool, - client_timeout: Duration, ) -> Result { match tcp_bind(port) { Ok(listener) => { - let mut broker = LlmpBroker::with_keep_pages( - shmem_provider, - keep_pages_forever, - client_timeout, - )?; + let mut broker = LlmpBroker::with_keep_pages(shmem_provider, keep_pages_forever)?; let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; Ok(broker) } @@ -2135,14 +2129,14 @@ where /// Add a client to this broker. /// Will set an appropriate [`ClientId`] before pushing the client to the internal vec. - /// Will increase `num_clients_total`. + /// Will increase `num_clients_seen`. /// The backing values of `llmp_clients` [`ClientId`]s will always be sorted (but not gapless) /// returns the [`ClientId`] of the new client. 
pub fn add_client(&mut self, mut client_receiver: LlmpReceiver) -> ClientId { let id = self.peek_next_client_id(); client_receiver.id = id; self.llmp_clients.push(client_receiver); - self.num_clients_total += 1; + self.num_clients_seen += 1; id } @@ -2266,38 +2260,37 @@ where where F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result, { - #[cfg(feature = "std")] - let current_time = current_time(); let mut new_messages = false; for i in 0..self.llmp_clients.len() { let client_id = self.llmp_clients[i].id; match unsafe { self.handle_new_msgs(client_id, on_new_msg) } { Ok(has_messages) => { - // See if we need to remove this client, in case no new messages got brokered, and it's not a listener - #[cfg(feature = "std")] - if !has_messages && !self.listeners.iter().any(|&x| x == client_id) { - let last_msg_time = self.llmp_clients[i].last_msg_time; - if last_msg_time < current_time - && current_time - last_msg_time > self.client_timeout - { - self.clients_to_remove.push(i); - #[cfg(feature = "llmp_debug")] - log::info!("Client #{:#?} timed out. Removing.", client_id); - } - } new_messages = has_messages; } - Err(Error::ShuttingDown) => self.clients_to_remove.push(i), + Err(Error::ShuttingDown) => { + self.clients_to_remove.push(client_id); + } Err(err) => return Err(err), } } - // After brokering, remove all clients we don't want to keep. 
- for i in self.clients_to_remove.iter().rev() { - let client_id = self.llmp_clients[*i].id; - log::info!("Client #{:#?} disconnected.", client_id); - self.llmp_clients.remove(*i); + let possible_remove = self.clients_to_remove.len(); + if possible_remove > 0 { + self.clients_to_remove.sort_unstable(); + self.clients_to_remove.dedup(); + log::trace!("Removing {:#?}", self.clients_to_remove); + // rev() to make it works + // commit the change to llmp_clients + for idx in (0..self.llmp_clients.len()).rev() { + let client_id = self.llmp_clients[idx].id; + if self.clients_to_remove.contains(&client_id) { + log::info!("Client {:#?} wants to exit. Removing.", client_id); + self.llmp_clients.remove(idx); + } + } + // log::trace!("{:#?}", self.llmp_clients); } + self.clients_to_remove.clear(); Ok(new_messages) } @@ -2389,12 +2382,12 @@ where // log::trace!( // "Clients connected: {} && > {} - {} >= {}", // self.has_clients(), - // self.num_clients_total, + // self.num_clients_seen, // self.listeners.len(), // exit_after_count // ); if !self.has_clients() - && (self.num_clients_total - self.listeners.len()) >= exit_after_count.into() + && (self.num_clients_seen - self.listeners.len()) >= exit_after_count.into() { // No more clients connected, and the amount of clients we were waiting for was previously connected. // exit cleanly. @@ -2434,7 +2427,7 @@ where if let Some(exit_after_count) = self.exit_cleanly_after { if !self.has_clients() - && (self.num_clients_total - self.listeners.len()) > exit_after_count.into() + && (self.num_clients_seen - self.listeners.len()) > exit_after_count.into() { // No more clients connected, and the amount of clients we were waiting for was previously connected. // exit cleanly. @@ -2498,6 +2491,20 @@ where } } + /// Tell the broker to disconnect this client from it. 
+ fn announce_client_exit(sender: &mut LlmpSender, client_id: u32) -> Result<(), Error> { + unsafe { + let msg = sender + .alloc_next(size_of::()) + .expect("Could not allocate a new message in shared map."); + (*msg).tag = LLMP_TAG_CLIENT_EXIT; + #[allow(clippy::cast_ptr_alignment)] + let exitinfo = (*msg).buf.as_mut_ptr() as *mut LlmpClientExitInfo; + (*exitinfo).client_id = client_id; + sender.send(msg, true) + } + } + /// For broker to broker connections: /// Launches a proxy thread. /// It will read outgoing messages from the given broker map (and handle EOP by mapping a new page). @@ -2659,6 +2666,13 @@ where broker_shmem_description: &ShMemDescription, ) { match request { + TcpRequest::ClientQuit { client_id } => { + // todo search the ancestor_id and remove it. + match Self::announce_client_exit(sender, client_id.0) { + Ok(()) => (), + Err(e) => log::info!("Error announcing client exit: {e:?}"), + } + } TcpRequest::LocalClientHello { shmem_description } => { match Self::announce_new_client(sender, shmem_description) { Ok(()) => (), @@ -2779,6 +2793,8 @@ where continue; } }; + + // log::info!("{:#?}", buf); let req = match buf.try_into() { Ok(req) => req, Err(e) => { @@ -2823,6 +2839,7 @@ where // TODO: We could memcpy a range of pending messages, instead of one by one. loop { + // log::trace!("{:#?}", self.llmp_clients); let msg = { let pos = if (client_id.0 as usize) < self.llmp_clients.len() && self.llmp_clients[client_id.0 as usize].id == client_id @@ -2857,6 +2874,25 @@ where LLMP_SLOW_RECEIVER_PANIC => { return Err(Error::unknown(format!("The broker was too slow to handle messages of client {client_id:?} in time, so it quit. Either the client sent messages too fast, or we (the broker) got stuck!"))); } + LLMP_TAG_CLIENT_EXIT => { + let msg_buf_len_padded = (*msg).buf_len_padded; + if (*msg).buf_len < size_of::() as u64 { + log::info!("Ignoring broken CLIENT_EXIT msg due to incorrect size. 
Expected {} but got {}", + msg_buf_len_padded, + size_of::() + ); + #[cfg(not(feature = "std"))] + return Err(Error::unknown(format!("Broken CLIENT_EXIT msg with incorrect size received. Expected {} but got {}", + msg_buf_len_padded, + size_of::() + ))); + } + let exitinfo = (*msg).buf.as_mut_ptr() as *mut LlmpClientExitInfo; + let client_id = ClientId((*exitinfo).client_id); + log::info!("Client exit message received!, we are removing clients whose client_group_id is {:#?}", client_id); + + self.clients_to_remove.push(client_id); + } LLMP_TAG_NEW_SHM_CLIENT => { /* This client informs us about yet another new client add it to the list! Also, no need to forward this msg. */ @@ -2873,7 +2909,6 @@ where ))); } let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; - match self.shmem_provider.shmem_from_id_and_size( ShMemId::from_array(&(*pageinfo).shm_str), (*pageinfo).map_size, @@ -2882,7 +2917,7 @@ where let mut new_page = LlmpSharedMap::existing(new_shmem); new_page.mark_safe_to_unmap(); - self.add_client(LlmpReceiver { + let _new_client = self.add_client(LlmpReceiver { id: ClientId(0), // will be auto-filled current_recv_shmem: new_page, last_msg_recvd: ptr::null_mut(), @@ -3140,26 +3175,6 @@ where self.sender.send_buf_with_flags(tag, flags, buf) } - /// Informs the broker about a new client in town, with the given map id - pub fn send_client_added_msg( - &mut self, - shm_str: &[u8; 20], - shm_id: usize, - ) -> Result<(), Error> { - // We write this by hand to get around checks in send_buf - unsafe { - let msg = self - .alloc_next(size_of::()) - .expect("Could not allocate a new message in shared map."); - (*msg).tag = LLMP_TAG_NEW_SHM_CLIENT; - #[allow(clippy::cast_ptr_alignment)] - let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; - (*pageinfo).shm_str = *shm_str; - (*pageinfo).map_size = shm_id; - self.send(msg) - } - } - /// A client receives a broadcast message. 
/// Returns null if no message is availiable /// # Safety @@ -3215,16 +3230,17 @@ where } #[cfg(feature = "std")] - /// Create a [`LlmpClient`], getting the ID from a given port + /// Create a [`LlmpClient`], getting the ID from a given port, then also tell the restarter's ID so we ask to be removed later + /// This is called when, for the first time, the restarter attaches to this process. pub fn create_attach_to_tcp(mut shmem_provider: SP, port: u16) -> Result { - let mut stream = match TcpStream::connect((_LLMP_CONNECT_ADDR, port)) { + let mut stream = match TcpStream::connect((IP_LOCALHOST, port)) { Ok(stream) => stream, Err(e) => { match e.kind() { ErrorKind::ConnectionRefused => { //connection refused. loop till the broker is up loop { - match TcpStream::connect((_LLMP_CONNECT_ADDR, port)) { + match TcpStream::connect((IP_LOCALHOST, port)) { Ok(stream) => break stream, Err(_) => { log::info!("Connection Refused.. Retrying"); @@ -3293,7 +3309,7 @@ mod tests { LlmpClient, LlmpConnection::{self, IsBroker, IsClient}, LlmpMsgHookResult::ForwardToClients, - Tag, DEFAULT_CLIENT_TIMEOUT_SECS, + Tag, }; use crate::shmem::{ShMemProvider, StdShMemProvider}; @@ -3303,25 +3319,13 @@ mod tests { pub fn test_llmp_connection() { #[allow(unused_variables)] let shmem_provider = StdShMemProvider::new().unwrap(); - let mut broker = match LlmpConnection::on_port( - shmem_provider.clone(), - 1337, - DEFAULT_CLIENT_TIMEOUT_SECS, - ) - .unwrap() - { + let mut broker = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() { IsClient { client: _ } => panic!("Could not bind to port as broker"), IsBroker { broker } => broker, }; // Add the first client (2nd, actually, because of the tcp listener client) - let mut client = match LlmpConnection::on_port( - shmem_provider.clone(), - 1337, - DEFAULT_CLIENT_TIMEOUT_SECS, - ) - .unwrap() - { + let mut client = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() { IsBroker { broker: _ } => panic!("Second connect 
should be a client!"), IsClient { client } => client, };