Replace LLMP client timeout with client deregistration (#1982)

* wip

* done

* fix

* fix

* some fixes

* stuff

* fmt

* stuff

* use

* not 1337

* comment

* move functions around

* fix

* fix

* doc

* mistake

* aa

* fixer

* wipe out restarter id

* denig

* fix

* fix

* include

* fix

* fix

* fix

* clp

* fix

* fix

* error log
This commit is contained in:
Dongjia "toka" Zhang 2024-04-04 21:01:11 +02:00 committed by GitHub
parent e64233e203
commit 98d3dfe821
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 205 additions and 174 deletions

View File

@ -6,7 +6,7 @@ static GLOBAL: MiMalloc = MiMalloc;
use std::ptr::write_volatile; use std::ptr::write_volatile;
use std::{env, net::SocketAddr, path::PathBuf, time::Duration}; use std::{env, net::SocketAddr, path::PathBuf, time::Duration};
use clap::{self, Parser}; use clap::Parser;
use libafl::{ use libafl::{
corpus::{InMemoryCorpus, OnDiskCorpus}, corpus::{InMemoryCorpus, OnDiskCorpus},
events::{launcher::Launcher, llmp::LlmpEventConverter, EventConfig}, events::{launcher::Launcher, llmp::LlmpEventConverter, EventConfig},
@ -130,6 +130,9 @@ pub extern "C" fn libafl_main() {
.unwrap() .unwrap()
}); });
// to disconnect the event coverter from the broker later
// call detach_from_broker( port)
let mut run_client = |state: Option<_>, mut mgr, _core_id| { let mut run_client = |state: Option<_>, mut mgr, _core_id| {
let mut bytes = vec![]; let mut bytes = vec![];

View File

@ -93,15 +93,10 @@ where
/// ///
/// The port must not be bound yet to have a broker. /// The port must not be bound yet to have a broker.
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn on_port(shmem_provider: SP, port: u16, client_timeout: Duration) -> Result<Self, Error> { pub fn on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
Ok(Self { Ok(Self {
// TODO switch to false after solving the bug // TODO switch to false after solving the bug
llmp: LlmpBroker::with_keep_pages_attach_to_tcp( llmp: LlmpBroker::with_keep_pages_attach_to_tcp(shmem_provider, port, true)?,
shmem_provider,
port,
true,
client_timeout,
)?,
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD), compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData, phantom: PhantomData,
@ -509,9 +504,10 @@ where
/// will act as a client. /// will act as a client.
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn on_port(inner: EM, shmem_provider: SP, port: u16, is_main: bool) -> Result<Self, Error> { pub fn on_port(inner: EM, shmem_provider: SP, port: u16, is_main: bool) -> Result<Self, Error> {
let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
Ok(Self { Ok(Self {
inner, inner,
client: LlmpClient::create_attach_to_tcp(shmem_provider, port)?, client,
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD), compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
is_main, is_main,

View File

@ -15,10 +15,11 @@
use alloc::string::ToString; use alloc::string::ToString;
#[cfg(feature = "std")] #[cfg(feature = "std")]
use core::marker::PhantomData; use core::marker::PhantomData;
#[cfg(all(unix, feature = "std", feature = "fork"))]
use core::time::Duration;
use core::{ use core::{
fmt::{self, Debug, Formatter}, fmt::{self, Debug, Formatter},
num::NonZeroUsize, num::NonZeroUsize,
time::Duration,
}; };
#[cfg(feature = "std")] #[cfg(feature = "std")]
use std::net::SocketAddr; use std::net::SocketAddr;
@ -36,7 +37,6 @@ use libafl_bolts::{
}; };
use libafl_bolts::{ use libafl_bolts::{
core_affinity::{CoreId, Cores}, core_affinity::{CoreId, Cores},
llmp::DEFAULT_CLIENT_TIMEOUT_SECS,
shmem::ShMemProvider, shmem::ShMemProvider,
tuples::tuple_list, tuples::tuple_list,
}; };
@ -122,9 +122,6 @@ where
/// Then, clients launched by this [`Launcher`] can connect to the original `broker`. /// Then, clients launched by this [`Launcher`] can connect to the original `broker`.
#[builder(default = true)] #[builder(default = true)]
spawn_broker: bool, spawn_broker: bool,
/// The timeout duration used for llmp client timeout
#[builder(default = DEFAULT_CLIENT_TIMEOUT_SECS)]
client_timeout: Duration,
/// Tell the manager to serialize or not the state on restart /// Tell the manager to serialize or not the state on restart
#[builder(default = LlmpShouldSaveState::OnRestart)] #[builder(default = LlmpShouldSaveState::OnRestart)]
serialize_state: LlmpShouldSaveState, serialize_state: LlmpShouldSaveState,
@ -261,7 +258,6 @@ where
}) })
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout)
.hooks(hooks) .hooks(hooks)
.build() .build()
.launch()?; .launch()?;
@ -286,7 +282,6 @@ where
.exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap())) .exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap()))
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout)
.hooks(hooks) .hooks(hooks)
.build() .build()
.launch()?; .launch()?;
@ -339,7 +334,6 @@ where
}) })
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout)
.hooks(hooks) .hooks(hooks)
.build() .build()
.launch()?; .launch()?;
@ -410,7 +404,6 @@ where
.exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap())) .exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap()))
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout)
.hooks(hooks) .hooks(hooks)
.build() .build()
.launch()?; .launch()?;
@ -495,9 +488,6 @@ where
/// Tell the manager to serialize or not the state on restart /// Tell the manager to serialize or not the state on restart
#[builder(default = LlmpShouldSaveState::OnRestart)] #[builder(default = LlmpShouldSaveState::OnRestart)]
serialize_state: LlmpShouldSaveState, serialize_state: LlmpShouldSaveState,
/// The duration for the llmp client timeout
#[builder(default = DEFAULT_CLIENT_TIMEOUT_SECS)]
client_timeout: Duration,
#[builder(setter(skip), default = PhantomData)] #[builder(setter(skip), default = PhantomData)]
phantom_data: PhantomData<(&'a S, &'a SP)>, phantom_data: PhantomData<(&'a S, &'a SP)>,
} }
@ -589,7 +579,6 @@ where
CentralizedLlmpEventBroker::on_port( CentralizedLlmpEventBroker::on_port(
self.shmem_provider.clone(), self.shmem_provider.clone(),
self.centralized_broker_port, self.centralized_broker_port,
self.client_timeout,
)?; )?;
broker.broker_loop()?; broker.broker_loop()?;
} }
@ -636,7 +625,6 @@ where
}) })
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout)
.hooks(tuple_list!()) .hooks(tuple_list!())
.build() .build()
.launch()?; .launch()?;
@ -667,7 +655,6 @@ where
.exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap())) .exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap()))
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout)
.hooks(tuple_list!()) .hooks(tuple_list!())
.build() .build()
.launch()?; .launch()?;

View File

@ -1,5 +1,7 @@
//! LLMP-backed event manager for scalable multi-processed fuzzing //! LLMP-backed event manager for scalable multi-processed fuzzing
#[cfg(feature = "std")]
use alloc::string::ToString;
use alloc::{boxed::Box, vec::Vec}; use alloc::{boxed::Box, vec::Vec};
#[cfg(all(unix, not(miri), feature = "std"))] #[cfg(all(unix, not(miri), feature = "std"))]
use core::ptr::addr_of_mut; use core::ptr::addr_of_mut;
@ -7,14 +9,12 @@ use core::ptr::addr_of_mut;
use core::sync::atomic::{compiler_fence, Ordering}; use core::sync::atomic::{compiler_fence, Ordering};
use core::{marker::PhantomData, num::NonZeroUsize, time::Duration}; use core::{marker::PhantomData, num::NonZeroUsize, time::Duration};
#[cfg(feature = "std")] #[cfg(feature = "std")]
use std::net::TcpStream;
#[cfg(feature = "std")]
use std::net::{SocketAddr, ToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs};
#[cfg(feature = "std")]
use libafl_bolts::core_affinity::CoreId;
#[cfg(feature = "adaptive_serialization")] #[cfg(feature = "adaptive_serialization")]
use libafl_bolts::current_time; use libafl_bolts::current_time;
#[cfg(feature = "std")]
use libafl_bolts::llmp::DEFAULT_CLIENT_TIMEOUT_SECS;
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
use libafl_bolts::os::startable_self; use libafl_bolts::os::startable_self;
#[cfg(all(unix, feature = "std", not(miri)))] #[cfg(all(unix, feature = "std", not(miri)))]
@ -27,6 +27,12 @@ use libafl_bolts::{
llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED}, llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED},
}; };
#[cfg(feature = "std")] #[cfg(feature = "std")]
use libafl_bolts::{
core_affinity::CoreId,
llmp::{recv_tcp_msg, send_tcp_msg, TcpRequest, TcpResponse},
IP_LOCALHOST,
};
#[cfg(feature = "std")]
use libafl_bolts::{llmp::LlmpConnection, shmem::StdShMemProvider, staterestore::StateRestorer}; use libafl_bolts::{llmp::LlmpConnection, shmem::StdShMemProvider, staterestore::StateRestorer};
use libafl_bolts::{ use libafl_bolts::{
llmp::{self, LlmpClient, LlmpClientDescription, Tag}, llmp::{self, LlmpClient, LlmpClientDescription, Tag},
@ -106,15 +112,10 @@ where
/// ///
/// The port must not be bound yet to have a broker. /// The port must not be bound yet to have a broker.
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn on_port( pub fn on_port(shmem_provider: SP, monitor: MT, port: u16) -> Result<Self, Error> {
shmem_provider: SP,
monitor: MT,
port: u16,
client_timeout: Duration,
) -> Result<Self, Error> {
Ok(Self { Ok(Self {
monitor, monitor,
llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port, client_timeout)?, llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port)?,
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD), compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData, phantom: PhantomData,
@ -517,6 +518,38 @@ where
S: State, S: State,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
{ {
/// Calling this function will tell the llmp broker that this client is exiting
/// This should be called from the restarter not from the actual fuzzer client
/// This function serves the same roll as the `LlmpClient.send_exiting()`
/// However, from the the event restarter process it is forbidden to call `send_exiting()`
/// (You can call it and it compiles but you should never do so)
/// `send_exiting()` is exclusive to the fuzzer client.
#[cfg(feature = "std")]
pub fn detach_from_broker(&self, broker_port: u16) -> Result<(), Error> {
let client_id = self.llmp.sender().id();
let Ok(mut stream) = TcpStream::connect((IP_LOCALHOST, broker_port)) else {
log::error!("Connection refused.");
return Ok(());
};
// The broker tells us hello we don't care we just tell it our client died
let TcpResponse::BrokerConnectHello {
broker_shmem_description: _,
hostname: _,
} = recv_tcp_msg(&mut stream)?.try_into()?
else {
return Err(Error::illegal_state(
"Received unexpected Broker Hello".to_string(),
));
};
let msg = TcpRequest::ClientQuit { client_id };
// Send this mesasge off and we are leaving.
match send_tcp_msg(&mut stream, &msg) {
Ok(_) => (),
Err(e) => log::error!("Failed to send tcp message {:#?}", e),
}
log::info!("Asking he broker to be disconnected");
Ok(())
}
/// Create a manager from a raw LLMP client with hooks /// Create a manager from a raw LLMP client with hooks
pub fn with_hooks( pub fn with_hooks(
llmp: LlmpClient<SP>, llmp: LlmpClient<SP>,
@ -546,6 +579,7 @@ where
/// ///
/// If the port is not yet bound, it will act as a broker; otherwise, it /// If the port is not yet bound, it will act as a broker; otherwise, it
/// will act as a client. /// will act as a client.
/// This will make a new connection to the broker so will return its new [`ClientId`], too
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn on_port_with_hooks( pub fn on_port_with_hooks(
shmem_provider: SP, shmem_provider: SP,
@ -1258,9 +1292,6 @@ where
/// Tell the manager to serialize or not the state on restart /// Tell the manager to serialize or not the state on restart
#[builder(default = LlmpShouldSaveState::OnRestart)] #[builder(default = LlmpShouldSaveState::OnRestart)]
serialize_state: LlmpShouldSaveState, serialize_state: LlmpShouldSaveState,
/// The timeout duration used for llmp client timeout
#[builder(default = DEFAULT_CLIENT_TIMEOUT_SECS)]
client_timeout: Duration,
/// The hooks passed to event manager: /// The hooks passed to event manager:
hooks: EMH, hooks: EMH,
#[builder(setter(skip), default = PhantomData)] #[builder(setter(skip), default = PhantomData)]
@ -1308,15 +1339,11 @@ where
broker.broker_loop() broker.broker_loop()
}; };
// We get here if we are on Unix, or we are a broker on Windows (or without forks). // We get here if we are on Unix, or we are a broker on Windows (or without forks).
let (mgr, core_id) = match self.kind { let (mgr, core_id) = match self.kind {
ManagerKind::Any => { ManagerKind::Any => {
let connection = LlmpConnection::on_port( let connection =
self.shmem_provider.clone(), LlmpConnection::on_port(self.shmem_provider.clone(), self.broker_port)?;
self.broker_port,
self.client_timeout,
)?;
match connection { match connection {
LlmpConnection::IsBroker { broker } => { LlmpConnection::IsBroker { broker } => {
let event_broker = LlmpEventBroker::<S::Input, MT, SP>::new( let event_broker = LlmpEventBroker::<S::Input, MT, SP>::new(
@ -1348,7 +1375,6 @@ where
self.shmem_provider.clone(), self.shmem_provider.clone(),
self.monitor.take().unwrap(), self.monitor.take().unwrap(),
self.broker_port, self.broker_port,
self.client_timeout,
)?; )?;
broker_things(event_broker, self.remote_broker_addr)?; broker_things(event_broker, self.remote_broker_addr)?;
@ -1435,21 +1461,26 @@ where
#[allow(clippy::manual_assert)] #[allow(clippy::manual_assert)]
if !staterestorer.has_content() && !self.serialize_state.oom_safe() { if !staterestorer.has_content() && !self.serialize_state.oom_safe() {
if let Err(err) = mgr.detach_from_broker(self.broker_port) {
log::error!("Failed to detach from broker: {err}");
}
#[cfg(unix)] #[cfg(unix)]
if child_status == 137 { if child_status == 137 {
// Out of Memory, see https://tldp.org/LDP/abs/html/exitcodes.html // Out of Memory, see https://tldp.org/LDP/abs/html/exitcodes.html
// and https://github.com/AFLplusplus/LibAFL/issues/32 for discussion. // and https://github.com/AFLplusplus/LibAFL/issues/32 for discussion.
panic!("Fuzzer-respawner: The fuzzed target crashed with an out of memory error! Fix your harness, or switch to another executor (for example, a forkserver)."); panic!("Fuzzer-respawner: The fuzzed target crashed with an out of memory error! Fix your harness, or switch to another executor (for example, a forkserver).");
} }
// Storing state in the last round did not work // Storing state in the last round did not work
panic!("Fuzzer-respawner: Storing state in crashed fuzzer instance did not work, no point to spawn the next client! This can happen if the child calls `exit()`, in that case make sure it uses `abort()`, if it got killed unrecoverable (OOM), or if there is a bug in the fuzzer itself. (Child exited with: {child_status})"); panic!("Fuzzer-respawner: Storing state in crashed fuzzer instance did not work, no point to spawn the next client! This can happen if the child calls `exit()`, in that case make sure it uses `abort()`, if it got killed unrecoverable (OOM), or if there is a bug in the fuzzer itself. (Child exited with: {child_status})");
} }
if staterestorer.wants_to_exit() || Self::is_shutting_down() { if staterestorer.wants_to_exit() || Self::is_shutting_down() {
// if ctrl-c is pressed, we end up in this branch
if let Err(err) = mgr.detach_from_broker(self.broker_port) {
log::error!("Failed to detach from broker: {err}");
}
return Err(Error::shutting_down()); return Err(Error::shutting_down());
} }
ctr = ctr.wrapping_add(1); ctr = ctr.wrapping_add(1);
} }
} else { } else {
@ -1594,8 +1625,9 @@ where
converter: Option<IC>, converter: Option<IC>,
converter_back: Option<ICB>, converter_back: Option<ICB>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let llmp = LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
Ok(Self { Ok(Self {
llmp: LlmpClient::create_attach_to_tcp(shmem_provider, port)?, llmp,
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD), compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
converter, converter,

View File

@ -150,13 +150,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
log::set_logger(&LOGGER).unwrap(); log::set_logger(&LOGGER).unwrap();
log::set_max_level(log::LevelFilter::Trace); log::set_max_level(log::LevelFilter::Trace);
println!("Launching in mode {mode} on port {port}"); println!("Launching in mode {mode} on port {port}");
match mode.as_str() { match mode.as_str() {
"broker" => { "broker" => {
let mut broker = let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?;
llmp::LlmpBroker::new(StdShMemProvider::new()?, Duration::from_secs(5))?;
broker.launch_tcp_listener_on(port)?; broker.launch_tcp_listener_on(port)?;
// Exit when we got at least _n_ nodes, and all of them quit. // Exit when we got at least _n_ nodes, and all of them quit.
broker.set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap()); broker.set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap());
@ -167,8 +165,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
); );
} }
"b2b" => { "b2b" => {
let mut broker = let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?;
llmp::LlmpBroker::new(StdShMemProvider::new()?, Duration::from_secs(5))?;
broker.launch_tcp_listener_on(b2b_port)?; broker.launch_tcp_listener_on(b2b_port)?;
// connect back to the main broker. // connect back to the main broker.
broker.connect_b2b(("127.0.0.1", port))?; broker.connect_b2b(("127.0.0.1", port))?;
@ -205,6 +202,14 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
log::info!("Exiting Client exits"); log::info!("Exiting Client exits");
client.sender_mut().send_exiting()?; client.sender_mut().send_exiting()?;
// there is another way to tell that this client wants to exit.
// one is to call client.sender_mut().send_exiting()?;
// you can disconnet the client in this way as long as this client in an unrecoverable state (like in a crash handler)
// another way to do this is through the detach_from_broker() call
// you can call detach_from_broker(port); to notify the broker that this broker wants to exit
// This one is usually for the event restarter to cut off the connection when the client has crashed.
// In that case we don't have access to the llmp client of the client anymore, but we can use detach_from_broker instead
} }
_ => { _ => {
println!("No valid mode supplied"); println!("No valid mode supplied");

View File

@ -29,7 +29,8 @@
clippy::missing_docs_in_private_items, clippy::missing_docs_in_private_items,
clippy::module_name_repetitions, clippy::module_name_repetitions,
clippy::ptr_cast_constness, clippy::ptr_cast_constness,
clippy::negative_feature_names clippy::negative_feature_names,
clippy::too_many_lines
)] )]
#![cfg_attr(not(test), warn( #![cfg_attr(not(test), warn(
missing_debug_implementations, missing_debug_implementations,
@ -226,6 +227,9 @@ use {
core::str::Utf8Error, core::str::Utf8Error,
}; };
/// Localhost addr, this is used, for example, for LLMP Client, which connects to this address
pub const IP_LOCALHOST: &str = "127.0.0.1";
/// We need fixed names for many parts of this lib. /// We need fixed names for many parts of this lib.
pub trait Named { pub trait Named {
/// Provide the name of this element. /// Provide the name of this element.

View File

@ -91,22 +91,19 @@ use backtrace::Backtrace;
use nix::sys::socket::{self, sockopt::ReusePort}; use nix::sys::socket::{self, sockopt::ReusePort};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[cfg(feature = "std")]
use crate::current_time;
#[cfg(all(unix, not(miri)))] #[cfg(all(unix, not(miri)))]
use crate::os::unix_signals::setup_signal_handler; use crate::os::unix_signals::setup_signal_handler;
#[cfg(unix)] #[cfg(unix)]
use crate::os::unix_signals::{siginfo_t, ucontext_t, Handler, Signal}; use crate::os::unix_signals::{siginfo_t, ucontext_t, Handler, Signal};
#[cfg(all(windows, feature = "std"))] #[cfg(all(windows, feature = "std"))]
use crate::os::windows_exceptions::{setup_ctrl_handler, CtrlHandler}; use crate::os::windows_exceptions::{setup_ctrl_handler, CtrlHandler};
#[cfg(feature = "std")]
use crate::{current_time, IP_LOCALHOST};
use crate::{ use crate::{
shmem::{ShMem, ShMemDescription, ShMemId, ShMemProvider}, shmem::{ShMem, ShMemDescription, ShMemId, ShMemProvider},
ClientId, Error, ClientId, Error,
}; };
/// The default timeout in seconds after which a client will be considered stale, and removed.
pub const DEFAULT_CLIENT_TIMEOUT_SECS: Duration = Duration::from_secs(7200);
/// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`] /// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`]
/// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages. /// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages.
/// Instead of increasing this value, you may consider sending new messages at a lower rate, else your Sender will eventually `OOM`. /// Instead of increasing this value, you may consider sending new messages at a lower rate, else your Sender will eventually `OOM`.
@ -130,6 +127,8 @@ const LLMP_TAG_UNINITIALIZED: Tag = Tag(0xA143AF11);
const LLMP_TAG_END_OF_PAGE: Tag = Tag(0xAF1E0F1); const LLMP_TAG_END_OF_PAGE: Tag = Tag(0xAF1E0F1);
/// A new client for this broker got added. /// A new client for this broker got added.
const LLMP_TAG_NEW_SHM_CLIENT: Tag = Tag(0xC11E471); const LLMP_TAG_NEW_SHM_CLIENT: Tag = Tag(0xC11E471);
/// A client wants to disconnect from this broker
const LLMP_TAG_CLIENT_EXIT: Tag = Tag(0xC11E472);
/// The sender on this map is exiting (if broker exits, clients should exit gracefully); /// The sender on this map is exiting (if broker exits, clients should exit gracefully);
const LLMP_TAG_EXITING: Tag = Tag(0x13C5171); const LLMP_TAG_EXITING: Tag = Tag(0x13C5171);
/// Client gave up as the receiver/broker was too slow /// Client gave up as the receiver/broker was too slow
@ -154,9 +153,6 @@ const _LLMP_BIND_ADDR: &str = "0.0.0.0";
#[cfg(not(feature = "llmp_bind_public"))] #[cfg(not(feature = "llmp_bind_public"))]
const _LLMP_BIND_ADDR: &str = "127.0.0.1"; const _LLMP_BIND_ADDR: &str = "127.0.0.1";
/// LLMP Client connects to this address
const _LLMP_CONNECT_ADDR: &str = "127.0.0.1";
/// An env var of this value indicates that the set value was a NULL PTR /// An env var of this value indicates that the set value was a NULL PTR
const _NULL_ENV_STR: &str = "_NULL"; const _NULL_ENV_STR: &str = "_NULL";
@ -264,6 +260,12 @@ pub enum TcpRequest {
/// The hostname of our broker, trying to connect. /// The hostname of our broker, trying to connect.
hostname: String, hostname: String,
}, },
/// Notify the broker the the othe side is dying so remove this client
/// `client_id` is the pid of the very initial client
ClientQuit {
/// Tell the broker that remove the client with this `client_id`. `client_id` is equal to the one of event restarter
client_id: ClientId,
},
} }
impl TryFrom<&Vec<u8>> for TcpRequest { impl TryFrom<&Vec<u8>> for TcpRequest {
@ -455,7 +457,7 @@ fn tcp_bind(port: u16) -> Result<TcpListener, Error> {
/// Send one message as `u32` len and `[u8;len]` bytes /// Send one message as `u32` len and `[u8;len]` bytes
#[cfg(feature = "std")] #[cfg(feature = "std")]
fn send_tcp_msg<T>(stream: &mut TcpStream, msg: &T) -> Result<(), Error> pub fn send_tcp_msg<T>(stream: &mut TcpStream, msg: &T) -> Result<(), Error>
where where
T: Serialize, T: Serialize,
{ {
@ -482,7 +484,7 @@ where
/// Receive one message of `u32` len and `[u8; len]` bytes /// Receive one message of `u32` len and `[u8; len]` bytes
#[cfg(feature = "std")] #[cfg(feature = "std")]
fn recv_tcp_msg(stream: &mut TcpStream) -> Result<Vec<u8>, Error> { pub fn recv_tcp_msg(stream: &mut TcpStream) -> Result<Vec<u8>, Error> {
// Always receive one be u32 of size, then the command. // Always receive one be u32 of size, then the command.
#[cfg(feature = "llmp_debug")] #[cfg(feature = "llmp_debug")]
@ -695,22 +697,24 @@ where
{ {
#[cfg(feature = "std")] #[cfg(feature = "std")]
/// Creates either a broker, if the tcp port is not bound, or a client, connected to this port. /// Creates either a broker, if the tcp port is not bound, or a client, connected to this port.
pub fn on_port(shmem_provider: SP, port: u16, client_timeout: Duration) -> Result<Self, Error> { /// This will make a new connection to the broker if it ends up a client
/// In that case this function will return its new [`ClientId`], too.
pub fn on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
match tcp_bind(port) { match tcp_bind(port) {
Ok(listener) => { Ok(listener) => {
// We got the port. We are the broker! :) // We got the port. We are the broker! :)
log::info!("We're the broker"); log::info!("We're the broker");
let mut broker = LlmpBroker::new(shmem_provider, client_timeout)?; let mut broker = LlmpBroker::new(shmem_provider)?;
let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
Ok(LlmpConnection::IsBroker { broker }) Ok(LlmpConnection::IsBroker { broker })
} }
Err(Error::OsError(e, ..)) if e.kind() == ErrorKind::AddrInUse => { Err(Error::OsError(e, ..)) if e.kind() == ErrorKind::AddrInUse => {
// We are the client :) // We are the client :)
log::info!("We're the client (internal port already bound by broker, {e:#?})"); log::info!("We're the client (internal port already bound by broker, {e:#?})");
Ok(LlmpConnection::IsClient { let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
client: LlmpClient::create_attach_to_tcp(shmem_provider, port)?, let conn = LlmpConnection::IsClient { client };
}) Ok(conn)
} }
Err(e) => { Err(e) => {
log::error!("{e:?}"); log::error!("{e:?}");
@ -721,22 +725,20 @@ where
/// Creates a new broker on the given port /// Creates a new broker on the given port
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn broker_on_port( pub fn broker_on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
shmem_provider: SP,
port: u16,
client_timeout: Duration,
) -> Result<Self, Error> {
Ok(LlmpConnection::IsBroker { Ok(LlmpConnection::IsBroker {
broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port, client_timeout)?, broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port)?,
}) })
} }
/// Creates a new client on the given port /// Creates a new client on the given port
/// This will make a new connection to the broker if it ends up a client
/// In that case this function will return its new [`ClientId`], too.
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn client_on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> { pub fn client_on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
Ok(LlmpConnection::IsClient { let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
client: LlmpClient::create_attach_to_tcp(shmem_provider, port)?, let conn = LlmpConnection::IsClient { client };
}) Ok(conn)
} }
/// Describe this in a reproducable fashion, if it's a client /// Describe this in a reproducable fashion, if it's a client
@ -834,6 +836,16 @@ struct LlmpPayloadSharedMapInfo {
pub shm_str: [u8; 20], pub shm_str: [u8; 20],
} }
/// Message payload when a client got removed
/// This is an internal message!
/// [`LLMP_TAG_END_OF_PAGE_V1`]
#[derive(Copy, Clone, Debug)]
#[repr(C, align(8))]
struct LlmpClientExitInfo {
/// The restarter process id of the client
pub client_id: u32,
}
/// Sending end on a (unidirectional) sharedmap channel /// Sending end on a (unidirectional) sharedmap channel
#[derive(Debug)] #[derive(Debug)]
pub struct LlmpSender<SP> pub struct LlmpSender<SP>
@ -1966,23 +1978,20 @@ where
/// This allows us to intercept messages right in the broker. /// This allows us to intercept messages right in the broker.
/// This keeps the out map clean. /// This keeps the out map clean.
/// The backing values of `llmp_clients` [`ClientId`]s will always be sorted (but not gapless) /// The backing values of `llmp_clients` [`ClientId`]s will always be sorted (but not gapless)
/// Make sure to always increase `num_clients_total` when pushing a new [`LlmpReceiver`] to `llmp_clients`! /// Make sure to always increase `num_clients_seen` when pushing a new [`LlmpReceiver`] to `llmp_clients`!
llmp_clients: Vec<LlmpReceiver<SP>>, llmp_clients: Vec<LlmpReceiver<SP>>,
/// The own listeners we spawned via `launch_listener` or `crate_attach_to_tcp`. /// The own listeners we spawned via `launch_listener` or `crate_attach_to_tcp`.
/// Listeners will be ignored for `exit_cleanly_after` and they are never considered to have timed out. /// Listeners will be ignored for `exit_cleanly_after` and they are never considered to have timed out.
listeners: Vec<ClientId>, listeners: Vec<ClientId>,
/// The total amount of clients we had, historically, including those that disconnected, and our listeners. /// The total amount of clients we had, historically, including those that disconnected, and our listeners.
num_clients_total: usize, num_clients_seen: usize,
/// The amount of total clients that should have connected and (and disconnected) /// The amount of total clients that should have connected and (and disconnected)
/// after which the broker loop should quit gracefully. /// after which the broker loop should quit gracefully.
pub exit_cleanly_after: Option<NonZeroUsize>, pub exit_cleanly_after: Option<NonZeroUsize>,
/// Clients that should be removed soon, (offset into `llmp_clients`) /// Clients that should be removed soon
clients_to_remove: Vec<usize>, clients_to_remove: Vec<ClientId>,
/// The `ShMemProvider` to use /// The `ShMemProvider` to use
shmem_provider: SP, shmem_provider: SP,
#[cfg(feature = "std")]
/// The timeout after which a client will be considered stale, and removed.
client_timeout: Duration,
} }
/// A signal handler for the [`LlmpBroker`]. /// A signal handler for the [`LlmpBroker`].
@ -2031,15 +2040,12 @@ where
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
{ {
/// Create and initialize a new [`LlmpBroker`] /// Create and initialize a new [`LlmpBroker`]
pub fn new( pub fn new(shmem_provider: SP) -> Result<Self, Error> {
shmem_provider: SP,
#[cfg(feature = "std")] client_timeout: Duration,
) -> Result<Self, Error> {
// Broker never cleans up the pages so that new // Broker never cleans up the pages so that new
// clients may join at any time // clients may join at any time
#[cfg(feature = "std")] #[cfg(feature = "std")]
{ {
Self::with_keep_pages(shmem_provider, true, client_timeout) Self::with_keep_pages(shmem_provider, true)
} }
#[cfg(not(feature = "std"))] #[cfg(not(feature = "std"))]
@ -2052,7 +2058,6 @@ where
pub fn with_keep_pages( pub fn with_keep_pages(
mut shmem_provider: SP, mut shmem_provider: SP,
keep_pages_forever: bool, keep_pages_forever: bool,
#[cfg(feature = "std")] client_timeout: Duration,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Ok(LlmpBroker { Ok(LlmpBroker {
llmp_out: LlmpSender { llmp_out: LlmpSender {
@ -2068,25 +2073,23 @@ where
unused_shmem_cache: vec![], unused_shmem_cache: vec![],
}, },
llmp_clients: vec![], llmp_clients: vec![],
clients_to_remove: vec![], clients_to_remove: Vec::new(),
shmem_provider, shmem_provider,
listeners: vec![], listeners: vec![],
exit_cleanly_after: None, exit_cleanly_after: None,
num_clients_total: 0, num_clients_seen: 0,
#[cfg(feature = "std")]
client_timeout,
}) })
} }
/// Gets the [`ClientId`] the next client attaching to this broker will get. /// Gets the [`ClientId`] the next client attaching to this broker will get.
/// In its current implememtation, the inner value of the next [`ClientId`] /// In its current implememtation, the inner value of the next [`ClientId`]
/// is equal to `self.num_clients_total`. /// is equal to `self.num_clients_seen`.
/// Calling `peek_next_client_id` mutliple times (without adding a client) will yield the same value. /// Calling `peek_next_client_id` mutliple times (without adding a client) will yield the same value.
#[must_use] #[must_use]
#[inline] #[inline]
pub fn peek_next_client_id(&self) -> ClientId { pub fn peek_next_client_id(&self) -> ClientId {
ClientId( ClientId(
self.num_clients_total self.num_clients_seen
.try_into() .try_into()
.expect("More than u32::MAX clients!"), .expect("More than u32::MAX clients!"),
) )
@ -2094,12 +2097,8 @@ where
/// Create a new [`LlmpBroker`] attaching to a TCP port /// Create a new [`LlmpBroker`] attaching to a TCP port
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn create_attach_to_tcp( pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result<Self, Error> {
shmem_provider: SP, Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true)
port: u16,
client_timeout: Duration,
) -> Result<Self, Error> {
Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true, client_timeout)
} }
/// Create a new [`LlmpBroker`] attaching to a TCP port and telling if it has to keep pages forever /// Create a new [`LlmpBroker`] attaching to a TCP port and telling if it has to keep pages forever
@ -2108,15 +2107,10 @@ where
shmem_provider: SP, shmem_provider: SP,
port: u16, port: u16,
keep_pages_forever: bool, keep_pages_forever: bool,
client_timeout: Duration,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
match tcp_bind(port) { match tcp_bind(port) {
Ok(listener) => { Ok(listener) => {
let mut broker = LlmpBroker::with_keep_pages( let mut broker = LlmpBroker::with_keep_pages(shmem_provider, keep_pages_forever)?;
shmem_provider,
keep_pages_forever,
client_timeout,
)?;
let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
Ok(broker) Ok(broker)
} }
@ -2135,14 +2129,14 @@ where
/// Add a client to this broker. /// Add a client to this broker.
/// Will set an appropriate [`ClientId`] before pushing the client to the internal vec. /// Will set an appropriate [`ClientId`] before pushing the client to the internal vec.
/// Will increase `num_clients_total`. /// Will increase `num_clients_seen`.
/// The backing values of `llmp_clients` [`ClientId`]s will always be sorted (but not gapless) /// The backing values of `llmp_clients` [`ClientId`]s will always be sorted (but not gapless)
/// returns the [`ClientId`] of the new client. /// returns the [`ClientId`] of the new client.
pub fn add_client(&mut self, mut client_receiver: LlmpReceiver<SP>) -> ClientId { pub fn add_client(&mut self, mut client_receiver: LlmpReceiver<SP>) -> ClientId {
let id = self.peek_next_client_id(); let id = self.peek_next_client_id();
client_receiver.id = id; client_receiver.id = id;
self.llmp_clients.push(client_receiver); self.llmp_clients.push(client_receiver);
self.num_clients_total += 1; self.num_clients_seen += 1;
id id
} }
@ -2266,38 +2260,37 @@ where
where where
F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>, F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{ {
#[cfg(feature = "std")]
let current_time = current_time();
let mut new_messages = false; let mut new_messages = false;
for i in 0..self.llmp_clients.len() { for i in 0..self.llmp_clients.len() {
let client_id = self.llmp_clients[i].id; let client_id = self.llmp_clients[i].id;
match unsafe { self.handle_new_msgs(client_id, on_new_msg) } { match unsafe { self.handle_new_msgs(client_id, on_new_msg) } {
Ok(has_messages) => { Ok(has_messages) => {
// See if we need to remove this client, in case no new messages got brokered, and it's not a listener
#[cfg(feature = "std")]
if !has_messages && !self.listeners.iter().any(|&x| x == client_id) {
let last_msg_time = self.llmp_clients[i].last_msg_time;
if last_msg_time < current_time
&& current_time - last_msg_time > self.client_timeout
{
self.clients_to_remove.push(i);
#[cfg(feature = "llmp_debug")]
log::info!("Client #{:#?} timed out. Removing.", client_id);
}
}
new_messages = has_messages; new_messages = has_messages;
} }
Err(Error::ShuttingDown) => self.clients_to_remove.push(i), Err(Error::ShuttingDown) => {
self.clients_to_remove.push(client_id);
}
Err(err) => return Err(err), Err(err) => return Err(err),
} }
} }
// After brokering, remove all clients we don't want to keep. let possible_remove = self.clients_to_remove.len();
for i in self.clients_to_remove.iter().rev() { if possible_remove > 0 {
let client_id = self.llmp_clients[*i].id; self.clients_to_remove.sort_unstable();
log::info!("Client #{:#?} disconnected.", client_id); self.clients_to_remove.dedup();
self.llmp_clients.remove(*i); log::trace!("Removing {:#?}", self.clients_to_remove);
// rev() to make it works
// commit the change to llmp_clients
for idx in (0..self.llmp_clients.len()).rev() {
let client_id = self.llmp_clients[idx].id;
if self.clients_to_remove.contains(&client_id) {
log::info!("Client {:#?} wants to exit. Removing.", client_id);
self.llmp_clients.remove(idx);
}
}
// log::trace!("{:#?}", self.llmp_clients);
} }
self.clients_to_remove.clear(); self.clients_to_remove.clear();
Ok(new_messages) Ok(new_messages)
} }
@ -2389,12 +2382,12 @@ where
// log::trace!( // log::trace!(
// "Clients connected: {} && > {} - {} >= {}", // "Clients connected: {} && > {} - {} >= {}",
// self.has_clients(), // self.has_clients(),
// self.num_clients_total, // self.num_clients_seen,
// self.listeners.len(), // self.listeners.len(),
// exit_after_count // exit_after_count
// ); // );
if !self.has_clients() if !self.has_clients()
&& (self.num_clients_total - self.listeners.len()) >= exit_after_count.into() && (self.num_clients_seen - self.listeners.len()) >= exit_after_count.into()
{ {
// No more clients connected, and the amount of clients we were waiting for was previously connected. // No more clients connected, and the amount of clients we were waiting for was previously connected.
// exit cleanly. // exit cleanly.
@ -2434,7 +2427,7 @@ where
if let Some(exit_after_count) = self.exit_cleanly_after { if let Some(exit_after_count) = self.exit_cleanly_after {
if !self.has_clients() if !self.has_clients()
&& (self.num_clients_total - self.listeners.len()) > exit_after_count.into() && (self.num_clients_seen - self.listeners.len()) > exit_after_count.into()
{ {
// No more clients connected, and the amount of clients we were waiting for was previously connected. // No more clients connected, and the amount of clients we were waiting for was previously connected.
// exit cleanly. // exit cleanly.
@ -2498,6 +2491,20 @@ where
} }
} }
/// Tell the broker to disconnect this client from it.
fn announce_client_exit(sender: &mut LlmpSender<SP>, client_id: u32) -> Result<(), Error> {
unsafe {
let msg = sender
.alloc_next(size_of::<LlmpClientExitInfo>())
.expect("Could not allocate a new message in shared map.");
(*msg).tag = LLMP_TAG_CLIENT_EXIT;
#[allow(clippy::cast_ptr_alignment)]
let exitinfo = (*msg).buf.as_mut_ptr() as *mut LlmpClientExitInfo;
(*exitinfo).client_id = client_id;
sender.send(msg, true)
}
}
/// For broker to broker connections: /// For broker to broker connections:
/// Launches a proxy thread. /// Launches a proxy thread.
/// It will read outgoing messages from the given broker map (and handle EOP by mapping a new page). /// It will read outgoing messages from the given broker map (and handle EOP by mapping a new page).
@ -2659,6 +2666,13 @@ where
broker_shmem_description: &ShMemDescription, broker_shmem_description: &ShMemDescription,
) { ) {
match request { match request {
TcpRequest::ClientQuit { client_id } => {
// todo search the ancestor_id and remove it.
match Self::announce_client_exit(sender, client_id.0) {
Ok(()) => (),
Err(e) => log::info!("Error announcing client exit: {e:?}"),
}
}
TcpRequest::LocalClientHello { shmem_description } => { TcpRequest::LocalClientHello { shmem_description } => {
match Self::announce_new_client(sender, shmem_description) { match Self::announce_new_client(sender, shmem_description) {
Ok(()) => (), Ok(()) => (),
@ -2779,6 +2793,8 @@ where
continue; continue;
} }
}; };
// log::info!("{:#?}", buf);
let req = match buf.try_into() { let req = match buf.try_into() {
Ok(req) => req, Ok(req) => req,
Err(e) => { Err(e) => {
@ -2823,6 +2839,7 @@ where
// TODO: We could memcpy a range of pending messages, instead of one by one. // TODO: We could memcpy a range of pending messages, instead of one by one.
loop { loop {
// log::trace!("{:#?}", self.llmp_clients);
let msg = { let msg = {
let pos = if (client_id.0 as usize) < self.llmp_clients.len() let pos = if (client_id.0 as usize) < self.llmp_clients.len()
&& self.llmp_clients[client_id.0 as usize].id == client_id && self.llmp_clients[client_id.0 as usize].id == client_id
@ -2857,6 +2874,25 @@ where
LLMP_SLOW_RECEIVER_PANIC => { LLMP_SLOW_RECEIVER_PANIC => {
return Err(Error::unknown(format!("The broker was too slow to handle messages of client {client_id:?} in time, so it quit. Either the client sent messages too fast, or we (the broker) got stuck!"))); return Err(Error::unknown(format!("The broker was too slow to handle messages of client {client_id:?} in time, so it quit. Either the client sent messages too fast, or we (the broker) got stuck!")));
} }
LLMP_TAG_CLIENT_EXIT => {
let msg_buf_len_padded = (*msg).buf_len_padded;
if (*msg).buf_len < size_of::<LlmpClientExitInfo>() as u64 {
log::info!("Ignoring broken CLIENT_EXIT msg due to incorrect size. Expected {} but got {}",
msg_buf_len_padded,
size_of::<LlmpClientExitInfo>()
);
#[cfg(not(feature = "std"))]
return Err(Error::unknown(format!("Broken CLIENT_EXIT msg with incorrect size received. Expected {} but got {}",
msg_buf_len_padded,
size_of::<LlmpClientExitInfo>()
)));
}
let exitinfo = (*msg).buf.as_mut_ptr() as *mut LlmpClientExitInfo;
let client_id = ClientId((*exitinfo).client_id);
log::info!("Client exit message received!, we are removing clients whose client_group_id is {:#?}", client_id);
self.clients_to_remove.push(client_id);
}
LLMP_TAG_NEW_SHM_CLIENT => { LLMP_TAG_NEW_SHM_CLIENT => {
/* This client informs us about yet another new client /* This client informs us about yet another new client
add it to the list! Also, no need to forward this msg. */ add it to the list! Also, no need to forward this msg. */
@ -2873,7 +2909,6 @@ where
))); )));
} }
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
match self.shmem_provider.shmem_from_id_and_size( match self.shmem_provider.shmem_from_id_and_size(
ShMemId::from_array(&(*pageinfo).shm_str), ShMemId::from_array(&(*pageinfo).shm_str),
(*pageinfo).map_size, (*pageinfo).map_size,
@ -2882,7 +2917,7 @@ where
let mut new_page = LlmpSharedMap::existing(new_shmem); let mut new_page = LlmpSharedMap::existing(new_shmem);
new_page.mark_safe_to_unmap(); new_page.mark_safe_to_unmap();
self.add_client(LlmpReceiver { let _new_client = self.add_client(LlmpReceiver {
id: ClientId(0), // will be auto-filled id: ClientId(0), // will be auto-filled
current_recv_shmem: new_page, current_recv_shmem: new_page,
last_msg_recvd: ptr::null_mut(), last_msg_recvd: ptr::null_mut(),
@ -3140,26 +3175,6 @@ where
self.sender.send_buf_with_flags(tag, flags, buf) self.sender.send_buf_with_flags(tag, flags, buf)
} }
/// Informs the broker about a new client in town, with the given map id
pub fn send_client_added_msg(
&mut self,
shm_str: &[u8; 20],
shm_id: usize,
) -> Result<(), Error> {
// We write this by hand to get around checks in send_buf
unsafe {
let msg = self
.alloc_next(size_of::<LlmpPayloadSharedMapInfo>())
.expect("Could not allocate a new message in shared map.");
(*msg).tag = LLMP_TAG_NEW_SHM_CLIENT;
#[allow(clippy::cast_ptr_alignment)]
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*pageinfo).shm_str = *shm_str;
(*pageinfo).map_size = shm_id;
self.send(msg)
}
}
/// A client receives a broadcast message. /// A client receives a broadcast message.
/// Returns null if no message is availiable /// Returns null if no message is availiable
/// # Safety /// # Safety
@ -3215,16 +3230,17 @@ where
} }
#[cfg(feature = "std")] #[cfg(feature = "std")]
/// Create a [`LlmpClient`], getting the ID from a given port /// Create a [`LlmpClient`], getting the ID from a given port, then also tell the restarter's ID so we ask to be removed later
/// This is called when, for the first time, the restarter attaches to this process.
pub fn create_attach_to_tcp(mut shmem_provider: SP, port: u16) -> Result<Self, Error> { pub fn create_attach_to_tcp(mut shmem_provider: SP, port: u16) -> Result<Self, Error> {
let mut stream = match TcpStream::connect((_LLMP_CONNECT_ADDR, port)) { let mut stream = match TcpStream::connect((IP_LOCALHOST, port)) {
Ok(stream) => stream, Ok(stream) => stream,
Err(e) => { Err(e) => {
match e.kind() { match e.kind() {
ErrorKind::ConnectionRefused => { ErrorKind::ConnectionRefused => {
//connection refused. loop till the broker is up //connection refused. loop till the broker is up
loop { loop {
match TcpStream::connect((_LLMP_CONNECT_ADDR, port)) { match TcpStream::connect((IP_LOCALHOST, port)) {
Ok(stream) => break stream, Ok(stream) => break stream,
Err(_) => { Err(_) => {
log::info!("Connection Refused.. Retrying"); log::info!("Connection Refused.. Retrying");
@ -3293,7 +3309,7 @@ mod tests {
LlmpClient, LlmpClient,
LlmpConnection::{self, IsBroker, IsClient}, LlmpConnection::{self, IsBroker, IsClient},
LlmpMsgHookResult::ForwardToClients, LlmpMsgHookResult::ForwardToClients,
Tag, DEFAULT_CLIENT_TIMEOUT_SECS, Tag,
}; };
use crate::shmem::{ShMemProvider, StdShMemProvider}; use crate::shmem::{ShMemProvider, StdShMemProvider};
@ -3303,25 +3319,13 @@ mod tests {
pub fn test_llmp_connection() { pub fn test_llmp_connection() {
#[allow(unused_variables)] #[allow(unused_variables)]
let shmem_provider = StdShMemProvider::new().unwrap(); let shmem_provider = StdShMemProvider::new().unwrap();
let mut broker = match LlmpConnection::on_port( let mut broker = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() {
shmem_provider.clone(),
1337,
DEFAULT_CLIENT_TIMEOUT_SECS,
)
.unwrap()
{
IsClient { client: _ } => panic!("Could not bind to port as broker"), IsClient { client: _ } => panic!("Could not bind to port as broker"),
IsBroker { broker } => broker, IsBroker { broker } => broker,
}; };
// Add the first client (2nd, actually, because of the tcp listener client) // Add the first client (2nd, actually, because of the tcp listener client)
let mut client = match LlmpConnection::on_port( let mut client = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() {
shmem_provider.clone(),
1337,
DEFAULT_CLIENT_TIMEOUT_SECS,
)
.unwrap()
{
IsBroker { broker: _ } => panic!("Second connect should be a client!"), IsBroker { broker: _ } => panic!("Second connect should be a client!"),
IsClient { client } => client, IsClient { client } => client,
}; };