* cleanup

* ppppp

* a

* b
This commit is contained in:
Dongjia "toka" Zhang 2024-06-07 14:56:14 +02:00 committed by GitHub
parent 1b008ae2e4
commit 2cc33464fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 89 additions and 94 deletions

View File

@ -665,7 +665,7 @@ where
log::info!("PID: {:#?} I am centralized broker", std::process::id()); log::info!("PID: {:#?} I am centralized broker", std::process::id());
self.shmem_provider.post_fork(true)?; self.shmem_provider.post_fork(true)?;
let llmp_centralized_hook = CentralizedLlmpHook::<S::Input, SP>::new()?; let llmp_centralized_hook = CentralizedLlmpHook::<S::Input>::new()?;
// TODO switch to false after solving the bug // TODO switch to false after solving the bug
let mut broker = LlmpBroker::with_keep_pages_attach_to_tcp( let mut broker = LlmpBroker::with_keep_pages_attach_to_tcp(

View File

@ -3,7 +3,7 @@ use std::{fmt::Debug, marker::PhantomData};
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
use libafl_bolts::{compress::GzipCompressor, llmp::LLMP_FLAG_COMPRESSED}; use libafl_bolts::{compress::GzipCompressor, llmp::LLMP_FLAG_COMPRESSED};
use libafl_bolts::{ use libafl_bolts::{
llmp::{Flags, LlmpBrokerState, LlmpHook, LlmpMsgHookResult, Tag}, llmp::{Flags, LlmpBrokerInner, LlmpHook, LlmpMsgHookResult, Tag},
shmem::ShMemProvider, shmem::ShMemProvider,
ClientId, Error, ClientId, Error,
}; };
@ -16,20 +16,20 @@ use crate::{
}; };
/// An LLMP-backed event manager for scalable multi-processed fuzzing /// An LLMP-backed event manager for scalable multi-processed fuzzing
pub struct CentralizedLlmpHook<I, SP> { pub struct CentralizedLlmpHook<I> {
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor, compressor: GzipCompressor,
phantom: PhantomData<(I, SP)>, phantom: PhantomData<I>,
} }
impl<I, SP> LlmpHook<SP> for CentralizedLlmpHook<I, SP> impl<I, SP> LlmpHook<SP> for CentralizedLlmpHook<I>
where where
I: Input, I: Input,
SP: ShMemProvider, SP: ShMemProvider,
{ {
fn on_new_message( fn on_new_message(
&mut self, &mut self,
_llmp_broker_state: &mut LlmpBrokerState<SP>, _broker_inner: &mut LlmpBrokerInner<SP>,
client_id: ClientId, client_id: ClientId,
msg_tag: &mut Tag, msg_tag: &mut Tag,
msg_flags: &mut Flags, msg_flags: &mut Flags,
@ -60,9 +60,9 @@ where
} }
} }
impl<I, SP> Debug for CentralizedLlmpHook<I, SP> { impl<I> Debug for CentralizedLlmpHook<I> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut debug_struct = f.debug_struct("CentralizedLlmpEventBroker"); let mut debug_struct = f.debug_struct("CentralizedLlmpHook");
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
let debug_struct = debug_struct.field("compressor", &self.compressor); let debug_struct = debug_struct.field("compressor", &self.compressor);
@ -73,10 +73,9 @@ impl<I, SP> Debug for CentralizedLlmpHook<I, SP> {
} }
} }
impl<I, SP> CentralizedLlmpHook<I, SP> impl<I> CentralizedLlmpHook<I>
where where
I: Input, I: Input,
SP: ShMemProvider,
{ {
/// Create an event broker from a raw broker. /// Create an event broker from a raw broker.
pub fn new() -> Result<Self, Error> { pub fn new() -> Result<Self, Error> {

View File

@ -4,7 +4,7 @@ use core::marker::PhantomData;
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
use libafl_bolts::{compress::GzipCompressor, llmp::LLMP_FLAG_COMPRESSED}; use libafl_bolts::{compress::GzipCompressor, llmp::LLMP_FLAG_COMPRESSED};
use libafl_bolts::{ use libafl_bolts::{
llmp::{Flags, LlmpBrokerState, LlmpHook, LlmpMsgHookResult, Tag}, llmp::{Flags, LlmpBrokerInner, LlmpHook, LlmpMsgHookResult, Tag},
shmem::ShMemProvider, shmem::ShMemProvider,
ClientId, ClientId,
}; };
@ -24,17 +24,14 @@ pub mod centralized;
/// An LLMP-backed event hook for scalable multi-processed fuzzing /// An LLMP-backed event hook for scalable multi-processed fuzzing
#[derive(Debug)] #[derive(Debug)]
pub struct StdLlmpEventHook<I, MT, SP> pub struct StdLlmpEventHook<I, MT> {
where
SP: ShMemProvider,
{
monitor: MT, monitor: MT,
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor, compressor: GzipCompressor,
phantom: PhantomData<(I, SP)>, phantom: PhantomData<I>,
} }
impl<I, MT, SP> LlmpHook<SP> for StdLlmpEventHook<I, MT, SP> impl<I, MT, SP> LlmpHook<SP> for StdLlmpEventHook<I, MT>
where where
I: Input, I: Input,
MT: Monitor, MT: Monitor,
@ -42,7 +39,7 @@ where
{ {
fn on_new_message( fn on_new_message(
&mut self, &mut self,
_llmp_broker_state: &mut LlmpBrokerState<SP>, _broker_inner: &mut LlmpBrokerInner<SP>,
client_id: ClientId, client_id: ClientId,
msg_tag: &mut Tag, msg_tag: &mut Tag,
#[cfg(feature = "llmp_compression")] msg_flags: &mut Flags, #[cfg(feature = "llmp_compression")] msg_flags: &mut Flags,
@ -81,10 +78,9 @@ where
} }
} }
impl<I, MT, SP> StdLlmpEventHook<I, MT, SP> impl<I, MT> StdLlmpEventHook<I, MT>
where where
I: Input, I: Input,
SP: ShMemProvider,
MT: Monitor, MT: Monitor,
{ {
/// Create an event broker from a raw broker. /// Create an event broker from a raw broker.

View File

@ -460,12 +460,12 @@ where
let broker_things = |mut broker: LlmpBroker<_, SP>, remote_broker_addr| { let broker_things = |mut broker: LlmpBroker<_, SP>, remote_broker_addr| {
if let Some(remote_broker_addr) = remote_broker_addr { if let Some(remote_broker_addr) = remote_broker_addr {
log::info!("B2b: Connecting to {:?}", &remote_broker_addr); log::info!("B2b: Connecting to {:?}", &remote_broker_addr);
broker.state_mut().connect_b2b(remote_broker_addr)?; broker.inner_mut().connect_b2b(remote_broker_addr)?;
}; };
if let Some(exit_cleanly_after) = self.exit_cleanly_after { if let Some(exit_cleanly_after) = self.exit_cleanly_after {
broker broker
.state_mut() .inner_mut()
.set_exit_cleanly_after(exit_cleanly_after); .set_exit_cleanly_after(exit_cleanly_after);
} }
@ -483,7 +483,7 @@ where
LlmpConnection::on_port(self.shmem_provider.clone(), self.broker_port)?; LlmpConnection::on_port(self.shmem_provider.clone(), self.broker_port)?;
match connection { match connection {
LlmpConnection::IsBroker { broker } => { LlmpConnection::IsBroker { broker } => {
let llmp_hook = StdLlmpEventHook::<S::Input, MT, SP>::new( let llmp_hook = StdLlmpEventHook::<S::Input, MT>::new(
self.monitor.take().unwrap(), self.monitor.take().unwrap(),
)?; )?;

View File

@ -9,7 +9,7 @@ use std::marker::PhantomData;
#[cfg(all(feature = "std", not(target_os = "haiku")))] #[cfg(all(feature = "std", not(target_os = "haiku")))]
use std::{num::NonZeroUsize, thread, time}; use std::{num::NonZeroUsize, thread, time};
use libafl_bolts::{bolts_prelude::LlmpMsgHookResult, llmp::LlmpBrokerState}; use libafl_bolts::{bolts_prelude::LlmpMsgHookResult, llmp::LlmpBrokerInner};
#[cfg(all(feature = "std", not(target_os = "haiku")))] #[cfg(all(feature = "std", not(target_os = "haiku")))]
use libafl_bolts::{ use libafl_bolts::{
llmp::{self, Flags, LlmpHook, Tag}, llmp::{self, Flags, LlmpHook, Tag},
@ -119,7 +119,7 @@ where
{ {
fn on_new_message( fn on_new_message(
&mut self, &mut self,
_llmp_broker_state: &mut LlmpBrokerState<SP>, _broker_inner: &mut LlmpBrokerInner<SP>,
client_id: ClientId, client_id: ClientId,
msg_tag: &mut Tag, msg_tag: &mut Tag,
_msg_flags: &mut Flags, _msg_flags: &mut Flags,
@ -190,10 +190,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
StdShMemProvider::new()?, StdShMemProvider::new()?,
tuple_list!(LlmpExampleHook::new()), tuple_list!(LlmpExampleHook::new()),
)?; )?;
broker.state_mut().launch_tcp_listener_on(port)?; broker.inner_mut().launch_tcp_listener_on(port)?;
// Exit when we got at least _n_ nodes, and all of them quit. // Exit when we got at least _n_ nodes, and all of them quit.
broker broker
.state_mut() .inner_mut()
.set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap()); .set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap());
broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS)); broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS));
} }
@ -202,9 +202,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
StdShMemProvider::new()?, StdShMemProvider::new()?,
tuple_list!(LlmpExampleHook::new()), tuple_list!(LlmpExampleHook::new()),
)?; )?;
broker.state_mut().launch_tcp_listener_on(b2b_port)?; broker.inner_mut().launch_tcp_listener_on(b2b_port)?;
// connect back to the main broker. // connect back to the main broker.
broker.state_mut().connect_b2b(("127.0.0.1", port))?; broker.inner_mut().connect_b2b(("127.0.0.1", port))?;
broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS)); broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS));
} }
"ctr" => { "ctr" => {

View File

@ -48,7 +48,7 @@ also need to create a new [`ShMem`] each time their bufs are filled up.
To use, you will have to create a broker using [`LlmpBroker::new()`]. To use, you will have to create a broker using [`LlmpBroker::new()`].
Then, create some [`LlmpClient`]`s` in other threads and register them Then, create some [`LlmpClient`]`s` in other threads and register them
with the main thread using [`LlmpBrokerState::register_client`]. with the main thread using [`LlmpBrokerInner::register_client`].
Finally, call [`LlmpBroker::loop_forever()`]. Finally, call [`LlmpBroker::loop_forever()`].
For broker2broker communication, all messages are forwarded via network sockets. For broker2broker communication, all messages are forwarded via network sockets.
@ -733,7 +733,7 @@ where
let mut broker = LlmpBroker::new(shmem_provider, tuple_list!())?; let mut broker = LlmpBroker::new(shmem_provider, tuple_list!())?;
let _listener_thread = broker let _listener_thread = broker
.state_mut() .inner_mut()
.launch_listener(Listener::Tcp(listener))?; .launch_listener(Listener::Tcp(listener))?;
Ok(LlmpConnection::IsBroker { broker }) Ok(LlmpConnection::IsBroker { broker })
} }
@ -796,7 +796,7 @@ where
/// Sends the given buffer over this connection, no matter if client or broker. /// Sends the given buffer over this connection, no matter if client or broker.
pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> { pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> {
match self { match self {
LlmpConnection::IsBroker { broker } => broker.state.send_buf(tag, buf), LlmpConnection::IsBroker { broker } => broker.inner.send_buf(tag, buf),
LlmpConnection::IsClient { client } => client.send_buf(tag, buf), LlmpConnection::IsClient { client } => client.send_buf(tag, buf),
} }
} }
@ -805,7 +805,7 @@ where
pub fn send_buf_with_flags(&mut self, tag: Tag, buf: &[u8], flags: Flags) -> Result<(), Error> { pub fn send_buf_with_flags(&mut self, tag: Tag, buf: &[u8], flags: Flags) -> Result<(), Error> {
match self { match self {
LlmpConnection::IsBroker { broker } => { LlmpConnection::IsBroker { broker } => {
broker.state.send_buf_with_flags(tag, flags, buf) broker.inner.send_buf_with_flags(tag, flags, buf)
} }
LlmpConnection::IsClient { client } => client.send_buf_with_flags(tag, flags, buf), LlmpConnection::IsClient { client } => client.send_buf_with_flags(tag, flags, buf),
} }
@ -2020,7 +2020,7 @@ where
/// The inner state of [`LlmpBroker`] /// The inner state of [`LlmpBroker`]
#[derive(Debug)] #[derive(Debug)]
pub struct LlmpBrokerState<SP> pub struct LlmpBrokerInner<SP>
where where
SP: ShMemProvider, SP: ShMemProvider,
{ {
@ -2053,7 +2053,7 @@ where
SP: ShMemProvider, SP: ShMemProvider,
{ {
/// The broker /// The broker
state: LlmpBrokerState<SP>, inner: LlmpBrokerInner<SP>,
/// Llmp hooks /// Llmp hooks
hooks: HT, hooks: HT,
} }
@ -2106,7 +2106,7 @@ where
/// something with it (read, transform, forward, etc...) and decides to discard it or not. /// something with it (read, transform, forward, etc...) and decides to discard it or not.
fn on_new_message( fn on_new_message(
&mut self, &mut self,
llmp_broker_state: &mut LlmpBrokerState<SP>, broker_inner: &mut LlmpBrokerInner<SP>,
client_id: ClientId, client_id: ClientId,
msg_tag: &mut Tag, msg_tag: &mut Tag,
msg_flags: &mut Flags, msg_flags: &mut Flags,
@ -2127,7 +2127,7 @@ where
/// Call all hook callbacks on new message. /// Call all hook callbacks on new message.
fn on_new_message_all( fn on_new_message_all(
&mut self, &mut self,
llmp_broker_state: &mut LlmpBrokerState<SP>, inner: &mut LlmpBrokerInner<SP>,
client_id: ClientId, client_id: ClientId,
msg_tag: &mut Tag, msg_tag: &mut Tag,
msg_flags: &mut Flags, msg_flags: &mut Flags,
@ -2144,7 +2144,7 @@ where
{ {
fn on_new_message_all( fn on_new_message_all(
&mut self, &mut self,
_llmp_broker_state: &mut LlmpBrokerState<SP>, _inner: &mut LlmpBrokerInner<SP>,
_client_id: ClientId, _client_id: ClientId,
_msg_tag: &mut Tag, _msg_tag: &mut Tag,
_msg_flags: &mut Flags, _msg_flags: &mut Flags,
@ -2166,7 +2166,7 @@ where
{ {
fn on_new_message_all( fn on_new_message_all(
&mut self, &mut self,
llmp_broker_state: &mut LlmpBrokerState<SP>, inner: &mut LlmpBrokerInner<SP>,
client_id: ClientId, client_id: ClientId,
msg_tag: &mut Tag, msg_tag: &mut Tag,
msg_flags: &mut Flags, msg_flags: &mut Flags,
@ -2174,7 +2174,7 @@ where
) -> Result<LlmpMsgHookResult, Error> { ) -> Result<LlmpMsgHookResult, Error> {
match self match self
.0 .0
.on_new_message(llmp_broker_state, client_id, msg_tag, msg_flags, msg)? .on_new_message(inner, client_id, msg_tag, msg_flags, msg)?
{ {
LlmpMsgHookResult::Handled => { LlmpMsgHookResult::Handled => {
// message handled, stop early // message handled, stop early
@ -2183,7 +2183,7 @@ where
LlmpMsgHookResult::ForwardToClients => { LlmpMsgHookResult::ForwardToClients => {
// message should be forwarded, continue iterating // message should be forwarded, continue iterating
self.1 self.1
.on_new_message_all(llmp_broker_state, client_id, msg_tag, msg_flags, msg) .on_new_message_all(inner, client_id, msg_tag, msg_flags, msg)
} }
} }
} }
@ -2205,7 +2205,7 @@ where
HT: LlmpHookTuple<SP>, HT: LlmpHookTuple<SP>,
{ {
LlmpBroker { LlmpBroker {
state: self.state, inner: self.inner,
hooks, hooks,
} }
} }
@ -2228,7 +2228,7 @@ where
keep_pages_forever: bool, keep_pages_forever: bool,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Ok(LlmpBroker { Ok(LlmpBroker {
state: LlmpBrokerState::with_keep_pages(shmem_provider, keep_pages_forever)?, inner: LlmpBrokerInner::with_keep_pages(shmem_provider, keep_pages_forever)?,
hooks, hooks,
}) })
} }
@ -2237,7 +2237,7 @@ where
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn create_attach_to_tcp(shmem_provider: SP, hooks: HT, port: u16) -> Result<Self, Error> { pub fn create_attach_to_tcp(shmem_provider: SP, hooks: HT, port: u16) -> Result<Self, Error> {
Ok(LlmpBroker { Ok(LlmpBroker {
state: LlmpBrokerState::create_attach_to_tcp(shmem_provider, port)?, inner: LlmpBrokerInner::create_attach_to_tcp(shmem_provider, port)?,
hooks, hooks,
}) })
} }
@ -2251,7 +2251,7 @@ where
keep_pages_forever: bool, keep_pages_forever: bool,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Ok(LlmpBroker { Ok(LlmpBroker {
state: LlmpBrokerState::with_keep_pages_attach_to_tcp( inner: LlmpBrokerInner::with_keep_pages_attach_to_tcp(
shmem_provider, shmem_provider,
port, port,
keep_pages_forever, keep_pages_forever,
@ -2261,13 +2261,13 @@ where
} }
/// Get the inner state of the broker /// Get the inner state of the broker
pub fn state(&self) -> &LlmpBrokerState<SP> { pub fn inner(&self) -> &LlmpBrokerInner<SP> {
&self.state &self.inner
} }
/// Get the inner mutable state of the broker /// Get the inner mutable state of the broker
pub fn state_mut(&mut self) -> &mut LlmpBrokerState<SP> { pub fn inner_mut(&mut self) -> &mut LlmpBrokerInner<SP> {
&mut self.state &mut self.inner
} }
/// Loops unitl the last client quit, /// Loops unitl the last client quit,
@ -2278,13 +2278,13 @@ where
#[cfg(any(all(unix, not(miri)), all(windows, feature = "std")))] #[cfg(any(all(unix, not(miri)), all(windows, feature = "std")))]
Self::setup_handlers(); Self::setup_handlers();
while !self.state.is_shutting_down() { while !self.inner.is_shutting_down() {
self.broker_once() self.broker_once()
.expect("An error occurred when brokering. Exiting."); .expect("An error occurred when brokering. Exiting.");
if let Some(exit_after_count) = self.state.exit_cleanly_after { if let Some(exit_after_count) = self.inner.exit_cleanly_after {
if !self.state.has_clients() if !self.inner.has_clients()
&& (self.state.num_clients_seen - self.state.listeners.len()) && (self.inner.num_clients_seen - self.inner.listeners.len())
> exit_after_count.into() > exit_after_count.into()
{ {
// No more clients connected, and the amount of clients we were waiting for was previously connected. // No more clients connected, and the amount of clients we were waiting for was previously connected.
@ -2303,7 +2303,7 @@ where
panic!("Cannot sleep on no_std platform (requested {time:?})"); panic!("Cannot sleep on no_std platform (requested {time:?})");
} }
} }
self.state self.inner
.llmp_out .llmp_out
.send_buf(LLMP_TAG_EXITING, &[]) .send_buf(LLMP_TAG_EXITING, &[])
.expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg."); .expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.");
@ -2324,7 +2324,7 @@ where
let timeout = timeout.as_millis() as u64; let timeout = timeout.as_millis() as u64;
let mut end_time = current_milliseconds() + timeout; let mut end_time = current_milliseconds() + timeout;
while !self.state.is_shutting_down() { while !self.inner.is_shutting_down() {
if current_milliseconds() > end_time { if current_milliseconds() > end_time {
self.hooks self.hooks
.on_timeout_all() .on_timeout_all()
@ -2339,7 +2339,7 @@ where
end_time = current_milliseconds() + timeout; end_time = current_milliseconds() + timeout;
} }
if let Some(exit_after_count) = self.state.exit_cleanly_after { if let Some(exit_after_count) = self.inner.exit_cleanly_after {
// log::trace!( // log::trace!(
// "Clients connected: {} && > {} - {} >= {}", // "Clients connected: {} && > {} - {} >= {}",
// self.has_clients(), // self.has_clients(),
@ -2347,8 +2347,8 @@ where
// self.listeners.len(), // self.listeners.len(),
// exit_after_count // exit_after_count
// ); // );
if !self.state.has_clients() if !self.inner.has_clients()
&& (self.state.num_clients_seen - self.state.listeners.len()) && (self.inner.num_clients_seen - self.inner.listeners.len())
>= exit_after_count.into() >= exit_after_count.into()
{ {
// No more clients connected, and the amount of clients we were waiting for was previously connected. // No more clients connected, and the amount of clients we were waiting for was previously connected.
@ -2367,7 +2367,7 @@ where
panic!("Cannot sleep on no_std platform (requested {time:?})"); panic!("Cannot sleep on no_std platform (requested {time:?})");
} }
} }
self.state self.inner
.llmp_out .llmp_out
.send_buf(LLMP_TAG_EXITING, &[]) .send_buf(LLMP_TAG_EXITING, &[])
.expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg."); .expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.");
@ -2378,37 +2378,37 @@ where
#[inline] #[inline]
pub fn broker_once(&mut self) -> Result<bool, Error> { pub fn broker_once(&mut self) -> Result<bool, Error> {
let mut new_messages = false; let mut new_messages = false;
for i in 0..self.state.llmp_clients.len() { for i in 0..self.inner.llmp_clients.len() {
let client_id = self.state.llmp_clients[i].id; let client_id = self.inner.llmp_clients[i].id;
match unsafe { self.handle_new_msgs(client_id) } { match unsafe { self.handle_new_msgs(client_id) } {
Ok(has_messages) => { Ok(has_messages) => {
new_messages = has_messages; new_messages = has_messages;
} }
Err(Error::ShuttingDown) => { Err(Error::ShuttingDown) => {
self.state.clients_to_remove.push(client_id); self.inner.clients_to_remove.push(client_id);
} }
Err(err) => return Err(err), Err(err) => return Err(err),
} }
} }
let possible_remove = self.state.clients_to_remove.len(); let possible_remove = self.inner.clients_to_remove.len();
if possible_remove > 0 { if possible_remove > 0 {
self.state.clients_to_remove.sort_unstable(); self.inner.clients_to_remove.sort_unstable();
self.state.clients_to_remove.dedup(); self.inner.clients_to_remove.dedup();
log::trace!("Removing {:#?}", self.state.clients_to_remove); log::trace!("Removing {:#?}", self.inner.clients_to_remove);
// rev() to make it works // rev() to make it works
// commit the change to llmp_clients // commit the change to llmp_clients
for idx in (0..self.state.llmp_clients.len()).rev() { for idx in (0..self.inner.llmp_clients.len()).rev() {
let client_id = self.state.llmp_clients[idx].id; let client_id = self.inner.llmp_clients[idx].id;
if self.state.clients_to_remove.contains(&client_id) { if self.inner.clients_to_remove.contains(&client_id) {
log::info!("Client {:#?} wants to exit. Removing.", client_id); log::info!("Client {:#?} wants to exit. Removing.", client_id);
self.state.llmp_clients.remove(idx); self.inner.llmp_clients.remove(idx);
} }
} }
// log::trace!("{:#?}", self.llmp_clients); // log::trace!("{:#?}", self.llmp_clients);
} }
self.state.clients_to_remove.clear(); self.inner.clients_to_remove.clear();
Ok(new_messages) Ok(new_messages)
} }
@ -2423,18 +2423,18 @@ where
loop { loop {
// log::trace!("{:#?}", self.llmp_clients); // log::trace!("{:#?}", self.llmp_clients);
let msg = { let msg = {
let pos = if (client_id.0 as usize) < self.state.llmp_clients.len() let pos = if (client_id.0 as usize) < self.inner.llmp_clients.len()
&& self.state.llmp_clients[client_id.0 as usize].id == client_id && self.inner.llmp_clients[client_id.0 as usize].id == client_id
{ {
// Fast path when no client before this one was removed // Fast path when no client before this one was removed
client_id.0 as usize client_id.0 as usize
} else { } else {
self.state self.inner
.llmp_clients .llmp_clients
.binary_search_by_key(&client_id, |x| x.id) .binary_search_by_key(&client_id, |x| x.id)
.expect("Fatal error, client ID {client_id} not found in llmp_clients.") .expect("Fatal error, client ID {client_id} not found in llmp_clients.")
}; };
let client = &mut self.state.llmp_clients[pos]; let client = &mut self.inner.llmp_clients[pos];
match client.recv()? { match client.recv()? {
None => { None => {
// We're done handling this client // We're done handling this client
@ -2442,7 +2442,7 @@ where
if new_messages { if new_messages {
// set the recv time // set the recv time
// We don't do that in recv() to keep calls to `current_time` to a minimum. // We don't do that in recv() to keep calls to `current_time` to a minimum.
self.state.llmp_clients[pos].last_msg_time = current_time(); self.inner.llmp_clients[pos].last_msg_time = current_time();
} }
return Ok(new_messages); return Ok(new_messages);
} }
@ -2474,7 +2474,7 @@ where
let client_id = ClientId((*exitinfo).client_id); let client_id = ClientId((*exitinfo).client_id);
log::info!("Client exit message received!, we are removing clients whose client_group_id is {:#?}", client_id); log::info!("Client exit message received!, we are removing clients whose client_group_id is {:#?}", client_id);
self.state.clients_to_remove.push(client_id); self.inner.clients_to_remove.push(client_id);
} }
LLMP_TAG_NEW_SHM_CLIENT => { LLMP_TAG_NEW_SHM_CLIENT => {
/* This client informs us about yet another new client /* This client informs us about yet another new client
@ -2492,7 +2492,7 @@ where
))); )));
} }
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
match self.state.shmem_provider.shmem_from_id_and_size( match self.inner.shmem_provider.shmem_from_id_and_size(
ShMemId::from_array(&(*pageinfo).shm_str), ShMemId::from_array(&(*pageinfo).shm_str),
(*pageinfo).map_size, (*pageinfo).map_size,
) { ) {
@ -2500,11 +2500,11 @@ where
let mut new_page = LlmpSharedMap::existing(new_shmem); let mut new_page = LlmpSharedMap::existing(new_shmem);
new_page.mark_safe_to_unmap(); new_page.mark_safe_to_unmap();
let _new_client = self.state.add_client(LlmpReceiver { let _new_client = self.inner.add_client(LlmpReceiver {
id: ClientId(0), // will be auto-filled id: ClientId(0), // will be auto-filled
current_recv_shmem: new_page, current_recv_shmem: new_page,
last_msg_recvd: ptr::null_mut(), last_msg_recvd: ptr::null_mut(),
shmem_provider: self.state.shmem_provider.clone(), shmem_provider: self.inner.shmem_provider.clone(),
highest_msg_id: MessageId(0), highest_msg_id: MessageId(0),
// We don't know the last received time, just assume the current time. // We don't know the last received time, just assume the current time.
#[cfg(feature = "std")] #[cfg(feature = "std")]
@ -2525,22 +2525,22 @@ where
// The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary. // The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary.
let mut should_forward_msg = true; let mut should_forward_msg = true;
let pos = if (client_id.0 as usize) < self.state.llmp_clients.len() let pos = if (client_id.0 as usize) < self.inner.llmp_clients.len()
&& self.state.llmp_clients[client_id.0 as usize].id == client_id && self.inner.llmp_clients[client_id.0 as usize].id == client_id
{ {
// Fast path when no client before this one was removed // Fast path when no client before this one was removed
client_id.0 as usize client_id.0 as usize
} else { } else {
self.state self.inner
.llmp_clients .llmp_clients
.binary_search_by_key(&client_id, |x| x.id) .binary_search_by_key(&client_id, |x| x.id)
.expect("Fatal error, client ID {client_id} not found in llmp_clients.") .expect("Fatal error, client ID {client_id} not found in llmp_clients.")
}; };
let map = &mut self.state.llmp_clients[pos].current_recv_shmem; let map = &mut self.inner.llmp_clients[pos].current_recv_shmem;
let msg_buf = (*msg).try_as_slice_mut(map)?; let msg_buf = (*msg).try_as_slice_mut(map)?;
if let LlmpMsgHookResult::Handled = self.hooks.on_new_message_all( if let LlmpMsgHookResult::Handled = self.hooks.on_new_message_all(
&mut self.state, &mut self.inner,
client_id, client_id,
&mut (*msg).tag, &mut (*msg).tag,
&mut (*msg).flags, &mut (*msg).flags,
@ -2550,7 +2550,7 @@ where
} }
if should_forward_msg { if should_forward_msg {
self.state_mut().forward_msg(msg)?; self.inner_mut().forward_msg(msg)?;
} }
} }
} }
@ -2582,21 +2582,21 @@ where
/// The broker forwards all messages to its own bus-like broadcast map. /// The broker forwards all messages to its own bus-like broadcast map.
/// It may intercept messages passing through. /// It may intercept messages passing through.
impl<SP> LlmpBrokerState<SP> impl<SP> LlmpBrokerInner<SP>
where where
SP: ShMemProvider, SP: ShMemProvider,
{ {
/// Create and initialize a new [`LlmpBrokerState`], associated with some hooks. /// Create and initialize a new [`LlmpBrokerInner`], associated with some hooks.
pub fn new(shmem_provider: SP) -> Result<Self, Error> { pub fn new(shmem_provider: SP) -> Result<Self, Error> {
Self::with_keep_pages(shmem_provider, true) Self::with_keep_pages(shmem_provider, true)
} }
/// Create and initialize a new [`LlmpBrokerState`] telling if it has to keep pages forever /// Create and initialize a new [`LlmpBrokerInner`] telling if it has to keep pages forever
pub fn with_keep_pages( pub fn with_keep_pages(
mut shmem_provider: SP, mut shmem_provider: SP,
keep_pages_forever: bool, keep_pages_forever: bool,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Ok(LlmpBrokerState { Ok(LlmpBrokerInner {
llmp_out: LlmpSender { llmp_out: LlmpSender {
id: ClientId(0), id: ClientId(0),
last_msg_sent: ptr::null_mut(), last_msg_sent: ptr::null_mut(),
@ -2632,13 +2632,13 @@ where
) )
} }
/// Create a new [`LlmpBrokerState`] attaching to a TCP port /// Create a new [`LlmpBrokerInner`] attaching to a TCP port
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result<Self, Error> { pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result<Self, Error> {
Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true) Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true)
} }
/// Create a new [`LlmpBrokerState`] attaching to a TCP port and telling if it has to keep pages forever /// Create a new [`LlmpBrokerInner`] attaching to a TCP port and telling if it has to keep pages forever
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn with_keep_pages_attach_to_tcp( pub fn with_keep_pages_attach_to_tcp(
shmem_provider: SP, shmem_provider: SP,
@ -2648,7 +2648,7 @@ where
match tcp_bind(port) { match tcp_bind(port) {
Ok(listener) => { Ok(listener) => {
let mut broker = let mut broker =
LlmpBrokerState::with_keep_pages(shmem_provider, keep_pages_forever)?; LlmpBrokerInner::with_keep_pages(shmem_provider, keep_pages_forever)?;
let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
Ok(broker) Ok(broker)
} }
@ -3587,6 +3587,6 @@ mod tests {
assert_eq!(arr[0], arr2[0]); assert_eq!(arr[0], arr2[0]);
// We want at least the tcp and sender clients. // We want at least the tcp and sender clients.
assert_eq!(broker.state.llmp_clients.len(), 2); assert_eq!(broker.inner.llmp_clients.len(), 2);
} }
} }