From 2cc33464fa9222a719e36a92f0bfb1baeaae25e8 Mon Sep 17 00:00:00 2001 From: "Dongjia \"toka\" Zhang" Date: Fri, 7 Jun 2024 14:56:14 +0200 Subject: [PATCH] Cleanup for #2280 (#2286) * cleanup * ppppp * a * b --- libafl/src/events/launcher.rs | 2 +- libafl/src/events/llmp/hooks/centralized.rs | 17 ++- libafl/src/events/llmp/hooks/mod.rs | 16 +-- libafl/src/events/llmp/restarting.rs | 6 +- libafl_bolts/examples/llmp_test/main.rs | 12 +- libafl_bolts/src/llmp.rs | 130 ++++++++++---------- 6 files changed, 89 insertions(+), 94 deletions(-) diff --git a/libafl/src/events/launcher.rs b/libafl/src/events/launcher.rs index 37a4fcfc24..727eaab732 100644 --- a/libafl/src/events/launcher.rs +++ b/libafl/src/events/launcher.rs @@ -665,7 +665,7 @@ where log::info!("PID: {:#?} I am centralized broker", std::process::id()); self.shmem_provider.post_fork(true)?; - let llmp_centralized_hook = CentralizedLlmpHook::::new()?; + let llmp_centralized_hook = CentralizedLlmpHook::::new()?; // TODO switch to false after solving the bug let mut broker = LlmpBroker::with_keep_pages_attach_to_tcp( diff --git a/libafl/src/events/llmp/hooks/centralized.rs b/libafl/src/events/llmp/hooks/centralized.rs index 72e78ba623..92f78a10e6 100644 --- a/libafl/src/events/llmp/hooks/centralized.rs +++ b/libafl/src/events/llmp/hooks/centralized.rs @@ -3,7 +3,7 @@ use std::{fmt::Debug, marker::PhantomData}; #[cfg(feature = "llmp_compression")] use libafl_bolts::{compress::GzipCompressor, llmp::LLMP_FLAG_COMPRESSED}; use libafl_bolts::{ - llmp::{Flags, LlmpBrokerState, LlmpHook, LlmpMsgHookResult, Tag}, + llmp::{Flags, LlmpBrokerInner, LlmpHook, LlmpMsgHookResult, Tag}, shmem::ShMemProvider, ClientId, Error, }; @@ -16,20 +16,20 @@ use crate::{ }; /// An LLMP-backed event manager for scalable multi-processed fuzzing -pub struct CentralizedLlmpHook { +pub struct CentralizedLlmpHook { #[cfg(feature = "llmp_compression")] compressor: GzipCompressor, - phantom: PhantomData<(I, SP)>, + phantom: PhantomData, } -impl LlmpHook for CentralizedLlmpHook +impl LlmpHook for CentralizedLlmpHook where I: Input, SP: ShMemProvider, { fn on_new_message( &mut self, - _llmp_broker_state: &mut LlmpBrokerState, + _broker_inner: &mut LlmpBrokerInner, client_id: ClientId, msg_tag: &mut Tag, msg_flags: &mut Flags, @@ -60,9 +60,9 @@ where } } -impl Debug for CentralizedLlmpHook { +impl Debug for CentralizedLlmpHook { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - let mut debug_struct = f.debug_struct("CentralizedLlmpEventBroker"); + let mut debug_struct = f.debug_struct("CentralizedLlmpHook"); #[cfg(feature = "llmp_compression")] let debug_struct = debug_struct.field("compressor", &self.compressor); @@ -73,10 +73,9 @@ impl Debug for CentralizedLlmpHook { } } -impl CentralizedLlmpHook +impl CentralizedLlmpHook where I: Input, - SP: ShMemProvider, { /// Create an event broker from a raw broker. pub fn new() -> Result { diff --git a/libafl/src/events/llmp/hooks/mod.rs b/libafl/src/events/llmp/hooks/mod.rs index 525d5448a3..07af0cc0b4 100644 --- a/libafl/src/events/llmp/hooks/mod.rs +++ b/libafl/src/events/llmp/hooks/mod.rs @@ -4,7 +4,7 @@ use core::marker::PhantomData; #[cfg(feature = "llmp_compression")] use libafl_bolts::{compress::GzipCompressor, llmp::LLMP_FLAG_COMPRESSED}; use libafl_bolts::{ - llmp::{Flags, LlmpBrokerState, LlmpHook, LlmpMsgHookResult, Tag}, + llmp::{Flags, LlmpBrokerInner, LlmpHook, LlmpMsgHookResult, Tag}, shmem::ShMemProvider, ClientId, }; @@ -24,17 +24,14 @@ pub mod centralized; /// An LLMP-backed event hook for scalable multi-processed fuzzing #[derive(Debug)] -pub struct StdLlmpEventHook -where - SP: ShMemProvider, -{ +pub struct StdLlmpEventHook { monitor: MT, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor, - phantom: PhantomData<(I, SP)>, + phantom: PhantomData, } -impl LlmpHook for StdLlmpEventHook +impl LlmpHook for StdLlmpEventHook where I: Input, MT: Monitor, @@ -42,7 +39,7 @@ where { fn on_new_message( &mut self, - _llmp_broker_state: &mut LlmpBrokerState, + _broker_inner: &mut LlmpBrokerInner, client_id: ClientId, msg_tag: &mut Tag, #[cfg(feature = "llmp_compression")] msg_flags: &mut Flags, @@ -81,10 +78,9 @@ where } } -impl StdLlmpEventHook +impl StdLlmpEventHook where I: Input, - SP: ShMemProvider, MT: Monitor, { /// Create an event broker from a raw broker. diff --git a/libafl/src/events/llmp/restarting.rs b/libafl/src/events/llmp/restarting.rs index 92cf63f9f5..b4af6ca040 100644 --- a/libafl/src/events/llmp/restarting.rs +++ b/libafl/src/events/llmp/restarting.rs @@ -460,12 +460,12 @@ where let broker_things = |mut broker: LlmpBroker<_, SP>, remote_broker_addr| { if let Some(remote_broker_addr) = remote_broker_addr { log::info!("B2b: Connecting to {:?}", &remote_broker_addr); - broker.state_mut().connect_b2b(remote_broker_addr)?; + broker.inner_mut().connect_b2b(remote_broker_addr)?; }; if let Some(exit_cleanly_after) = self.exit_cleanly_after { broker - .state_mut() + .inner_mut() .set_exit_cleanly_after(exit_cleanly_after); } @@ -483,7 +483,7 @@ where LlmpConnection::on_port(self.shmem_provider.clone(), self.broker_port)?; match connection { LlmpConnection::IsBroker { broker } => { - let llmp_hook = StdLlmpEventHook::::new( + let llmp_hook = StdLlmpEventHook::::new( self.monitor.take().unwrap(), )?; diff --git a/libafl_bolts/examples/llmp_test/main.rs b/libafl_bolts/examples/llmp_test/main.rs index 79110bfaf1..a18778f83e 100644 --- a/libafl_bolts/examples/llmp_test/main.rs +++ b/libafl_bolts/examples/llmp_test/main.rs @@ -9,7 +9,7 @@ use std::marker::PhantomData; #[cfg(all(feature = "std", not(target_os = "haiku")))] use std::{num::NonZeroUsize, thread, time}; -use libafl_bolts::{bolts_prelude::LlmpMsgHookResult, llmp::LlmpBrokerState}; +use libafl_bolts::{bolts_prelude::LlmpMsgHookResult, llmp::LlmpBrokerInner}; #[cfg(all(feature = "std", not(target_os = "haiku")))] use libafl_bolts::{ llmp::{self, Flags, LlmpHook, Tag}, @@ -119,7 +119,7 @@ where { fn on_new_message( &mut self, - _llmp_broker_state: &mut LlmpBrokerState, + _broker_inner: &mut LlmpBrokerInner, client_id: ClientId, msg_tag: &mut Tag, _msg_flags: &mut Flags, @@ -190,10 +190,10 @@ fn main() -> Result<(), Box> { StdShMemProvider::new()?, tuple_list!(LlmpExampleHook::new()), )?; - broker.state_mut().launch_tcp_listener_on(port)?; + broker.inner_mut().launch_tcp_listener_on(port)?; // Exit when we got at least _n_ nodes, and all of them quit. broker - .state_mut() + .inner_mut() .set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap()); broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS)); } @@ -202,9 +202,9 @@ fn main() -> Result<(), Box> { StdShMemProvider::new()?, tuple_list!(LlmpExampleHook::new()), )?; - broker.state_mut().launch_tcp_listener_on(b2b_port)?; + broker.inner_mut().launch_tcp_listener_on(b2b_port)?; // connect back to the main broker. - broker.state_mut().connect_b2b(("127.0.0.1", port))?; + broker.inner_mut().connect_b2b(("127.0.0.1", port))?; broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS)); } "ctr" => { diff --git a/libafl_bolts/src/llmp.rs b/libafl_bolts/src/llmp.rs index 22f4f4cc3b..9f9b71481d 100644 --- a/libafl_bolts/src/llmp.rs +++ b/libafl_bolts/src/llmp.rs @@ -48,7 +48,7 @@ also need to create a new [`ShMem`] each time their bufs are filled up. To use, you will have to create a broker using [`LlmpBroker::new()`]. Then, create some [`LlmpClient`]`s` in other threads and register them -with the main thread using [`LlmpBrokerState::register_client`]. +with the main thread using [`LlmpBrokerInner::register_client`]. Finally, call [`LlmpBroker::loop_forever()`]. For broker2broker communication, all messages are forwarded via network sockets. @@ -733,7 +733,7 @@ where let mut broker = LlmpBroker::new(shmem_provider, tuple_list!())?; let _listener_thread = broker - .state_mut() + .inner_mut() .launch_listener(Listener::Tcp(listener))?; Ok(LlmpConnection::IsBroker { broker }) } @@ -796,7 +796,7 @@ where /// Sends the given buffer over this connection, no matter if client or broker. pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> { match self { - LlmpConnection::IsBroker { broker } => broker.state.send_buf(tag, buf), + LlmpConnection::IsBroker { broker } => broker.inner.send_buf(tag, buf), LlmpConnection::IsClient { client } => client.send_buf(tag, buf), } } @@ -805,7 +805,7 @@ where pub fn send_buf_with_flags(&mut self, tag: Tag, buf: &[u8], flags: Flags) -> Result<(), Error> { match self { LlmpConnection::IsBroker { broker } => { - broker.state.send_buf_with_flags(tag, flags, buf) + broker.inner.send_buf_with_flags(tag, flags, buf) } LlmpConnection::IsClient { client } => client.send_buf_with_flags(tag, flags, buf), } @@ -2020,7 +2020,7 @@ where /// The inner state of [`LlmpBroker`] #[derive(Debug)] -pub struct LlmpBrokerState +pub struct LlmpBrokerInner where SP: ShMemProvider, { @@ -2053,7 +2053,7 @@ where SP: ShMemProvider, { /// The broker - state: LlmpBrokerState, + inner: LlmpBrokerInner, /// Llmp hooks hooks: HT, } @@ -2106,7 +2106,7 @@ where /// something with it (read, transform, forward, etc...) and decides to discard it or not. fn on_new_message( &mut self, - llmp_broker_state: &mut LlmpBrokerState, + broker_inner: &mut LlmpBrokerInner, client_id: ClientId, msg_tag: &mut Tag, msg_flags: &mut Flags, @@ -2127,7 +2127,7 @@ where /// Call all hook callbacks on new message. fn on_new_message_all( &mut self, - llmp_broker_state: &mut LlmpBrokerState, + inner: &mut LlmpBrokerInner, client_id: ClientId, msg_tag: &mut Tag, msg_flags: &mut Flags, @@ -2144,7 +2144,7 @@ where { fn on_new_message_all( &mut self, - _llmp_broker_state: &mut LlmpBrokerState, + _inner: &mut LlmpBrokerInner, _client_id: ClientId, _msg_tag: &mut Tag, _msg_flags: &mut Flags, @@ -2166,7 +2166,7 @@ where { fn on_new_message_all( &mut self, - llmp_broker_state: &mut LlmpBrokerState, + inner: &mut LlmpBrokerInner, client_id: ClientId, msg_tag: &mut Tag, msg_flags: &mut Flags, @@ -2174,7 +2174,7 @@ where ) -> Result { match self .0 - .on_new_message(llmp_broker_state, client_id, msg_tag, msg_flags, msg)? + .on_new_message(inner, client_id, msg_tag, msg_flags, msg)? { LlmpMsgHookResult::Handled => { // message handled, stop early @@ -2183,7 +2183,7 @@ where LlmpMsgHookResult::ForwardToClients => { // message should be forwarded, continue iterating self.1 - .on_new_message_all(llmp_broker_state, client_id, msg_tag, msg_flags, msg) + .on_new_message_all(inner, client_id, msg_tag, msg_flags, msg) } } } @@ -2205,7 +2205,7 @@ where HT: LlmpHookTuple, { LlmpBroker { - state: self.state, + inner: self.inner, hooks, } } @@ -2228,7 +2228,7 @@ where keep_pages_forever: bool, ) -> Result { Ok(LlmpBroker { - state: LlmpBrokerState::with_keep_pages(shmem_provider, keep_pages_forever)?, + inner: LlmpBrokerInner::with_keep_pages(shmem_provider, keep_pages_forever)?, hooks, }) } @@ -2237,7 +2237,7 @@ where #[cfg(feature = "std")] pub fn create_attach_to_tcp(shmem_provider: SP, hooks: HT, port: u16) -> Result { Ok(LlmpBroker { - state: LlmpBrokerState::create_attach_to_tcp(shmem_provider, port)?, + inner: LlmpBrokerInner::create_attach_to_tcp(shmem_provider, port)?, hooks, }) } @@ -2251,7 +2251,7 @@ where keep_pages_forever: bool, ) -> Result { Ok(LlmpBroker { - state: LlmpBrokerState::with_keep_pages_attach_to_tcp( + inner: LlmpBrokerInner::with_keep_pages_attach_to_tcp( shmem_provider, port, keep_pages_forever, @@ -2261,13 +2261,13 @@ where } /// Get the inner state of the broker - pub fn state(&self) -> &LlmpBrokerState { - &self.state + pub fn inner(&self) -> &LlmpBrokerInner { + &self.inner } /// Get the inner mutable state of the broker - pub fn state_mut(&mut self) -> &mut LlmpBrokerState { - &mut self.state + pub fn inner_mut(&mut self) -> &mut LlmpBrokerInner { + &mut self.inner } /// Loops unitl the last client quit, @@ -2278,13 +2278,13 @@ where #[cfg(any(all(unix, not(miri)), all(windows, feature = "std")))] Self::setup_handlers(); - while !self.state.is_shutting_down() { + while !self.inner.is_shutting_down() { self.broker_once() .expect("An error occurred when brokering. Exiting."); - if let Some(exit_after_count) = self.state.exit_cleanly_after { - if !self.state.has_clients() - && (self.state.num_clients_seen - self.state.listeners.len()) + if let Some(exit_after_count) = self.inner.exit_cleanly_after { + if !self.inner.has_clients() + && (self.inner.num_clients_seen - self.inner.listeners.len()) > exit_after_count.into() { // No more clients connected, and the amount of clients we were waiting for was previously connected. @@ -2303,7 +2303,7 @@ where panic!("Cannot sleep on no_std platform (requested {time:?})"); } } - self.state + self.inner .llmp_out .send_buf(LLMP_TAG_EXITING, &[]) .expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg."); @@ -2324,7 +2324,7 @@ where let timeout = timeout.as_millis() as u64; let mut end_time = current_milliseconds() + timeout; - while !self.state.is_shutting_down() { + while !self.inner.is_shutting_down() { if current_milliseconds() > end_time { self.hooks .on_timeout_all() @@ -2339,7 +2339,7 @@ where end_time = current_milliseconds() + timeout; } - if let Some(exit_after_count) = self.state.exit_cleanly_after { + if let Some(exit_after_count) = self.inner.exit_cleanly_after { // log::trace!( // "Clients connected: {} && > {} - {} >= {}", // self.has_clients(), @@ -2347,8 +2347,8 @@ where // self.listeners.len(), // exit_after_count // ); - if !self.state.has_clients() - && (self.state.num_clients_seen - self.state.listeners.len()) + if !self.inner.has_clients() + && (self.inner.num_clients_seen - self.inner.listeners.len()) >= exit_after_count.into() { // No more clients connected, and the amount of clients we were waiting for was previously connected. @@ -2367,7 +2367,7 @@ where panic!("Cannot sleep on no_std platform (requested {time:?})"); } } - self.state + self.inner .llmp_out .send_buf(LLMP_TAG_EXITING, &[]) .expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg."); @@ -2378,37 +2378,37 @@ where #[inline] pub fn broker_once(&mut self) -> Result { let mut new_messages = false; - for i in 0..self.state.llmp_clients.len() { - let client_id = self.state.llmp_clients[i].id; + for i in 0..self.inner.llmp_clients.len() { + let client_id = self.inner.llmp_clients[i].id; match unsafe { self.handle_new_msgs(client_id) } { Ok(has_messages) => { new_messages = has_messages; } Err(Error::ShuttingDown) => { - self.state.clients_to_remove.push(client_id); + self.inner.clients_to_remove.push(client_id); } Err(err) => return Err(err), } } - let possible_remove = self.state.clients_to_remove.len(); + let possible_remove = self.inner.clients_to_remove.len(); if possible_remove > 0 { - self.state.clients_to_remove.sort_unstable(); - self.state.clients_to_remove.dedup(); - log::trace!("Removing {:#?}", self.state.clients_to_remove); + self.inner.clients_to_remove.sort_unstable(); + self.inner.clients_to_remove.dedup(); + log::trace!("Removing {:#?}", self.inner.clients_to_remove); // rev() to make it works // commit the change to llmp_clients - for idx in (0..self.state.llmp_clients.len()).rev() { - let client_id = self.state.llmp_clients[idx].id; - if self.state.clients_to_remove.contains(&client_id) { + for idx in (0..self.inner.llmp_clients.len()).rev() { + let client_id = self.inner.llmp_clients[idx].id; + if self.inner.clients_to_remove.contains(&client_id) { log::info!("Client {:#?} wants to exit. Removing.", client_id); - self.state.llmp_clients.remove(idx); + self.inner.llmp_clients.remove(idx); } } // log::trace!("{:#?}", self.llmp_clients); } - self.state.clients_to_remove.clear(); + self.inner.clients_to_remove.clear(); Ok(new_messages) } @@ -2423,18 +2423,18 @@ where loop { // log::trace!("{:#?}", self.llmp_clients); let msg = { - let pos = if (client_id.0 as usize) < self.state.llmp_clients.len() - && self.state.llmp_clients[client_id.0 as usize].id == client_id + let pos = if (client_id.0 as usize) < self.inner.llmp_clients.len() + && self.inner.llmp_clients[client_id.0 as usize].id == client_id { // Fast path when no client before this one was removed client_id.0 as usize } else { - self.state + self.inner .llmp_clients .binary_search_by_key(&client_id, |x| x.id) .expect("Fatal error, client ID {client_id} not found in llmp_clients.") }; - let client = &mut self.state.llmp_clients[pos]; + let client = &mut self.inner.llmp_clients[pos]; match client.recv()? { None => { // We're done handling this client @@ -2442,7 +2442,7 @@ where if new_messages { // set the recv time // We don't do that in recv() to keep calls to `current_time` to a minimum. - self.state.llmp_clients[pos].last_msg_time = current_time(); + self.inner.llmp_clients[pos].last_msg_time = current_time(); } return Ok(new_messages); } @@ -2474,7 +2474,7 @@ where let client_id = ClientId((*exitinfo).client_id); log::info!("Client exit message received!, we are removing clients whose client_group_id is {:#?}", client_id); - self.state.clients_to_remove.push(client_id); + self.inner.clients_to_remove.push(client_id); } LLMP_TAG_NEW_SHM_CLIENT => { /* This client informs us about yet another new client @@ -2492,7 +2492,7 @@ where ))); } let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; - match self.state.shmem_provider.shmem_from_id_and_size( + match self.inner.shmem_provider.shmem_from_id_and_size( ShMemId::from_array(&(*pageinfo).shm_str), (*pageinfo).map_size, ) { @@ -2500,11 +2500,11 @@ where let mut new_page = LlmpSharedMap::existing(new_shmem); new_page.mark_safe_to_unmap(); - let _new_client = self.state.add_client(LlmpReceiver { + let _new_client = self.inner.add_client(LlmpReceiver { id: ClientId(0), // will be auto-filled current_recv_shmem: new_page, last_msg_recvd: ptr::null_mut(), - shmem_provider: self.state.shmem_provider.clone(), + shmem_provider: self.inner.shmem_provider.clone(), highest_msg_id: MessageId(0), // We don't know the last received time, just assume the current time. #[cfg(feature = "std")] @@ -2525,22 +2525,22 @@ where // The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary. let mut should_forward_msg = true; - let pos = if (client_id.0 as usize) < self.state.llmp_clients.len() - && self.state.llmp_clients[client_id.0 as usize].id == client_id + let pos = if (client_id.0 as usize) < self.inner.llmp_clients.len() + && self.inner.llmp_clients[client_id.0 as usize].id == client_id { // Fast path when no client before this one was removed client_id.0 as usize } else { - self.state + self.inner .llmp_clients .binary_search_by_key(&client_id, |x| x.id) .expect("Fatal error, client ID {client_id} not found in llmp_clients.") }; - let map = &mut self.state.llmp_clients[pos].current_recv_shmem; + let map = &mut self.inner.llmp_clients[pos].current_recv_shmem; let msg_buf = (*msg).try_as_slice_mut(map)?; if let LlmpMsgHookResult::Handled = self.hooks.on_new_message_all( - &mut self.state, + &mut self.inner, client_id, &mut (*msg).tag, &mut (*msg).flags, @@ -2550,7 +2550,7 @@ where } if should_forward_msg { - self.state_mut().forward_msg(msg)?; + self.inner_mut().forward_msg(msg)?; } } } @@ -2582,21 +2582,21 @@ where /// The broker forwards all messages to its own bus-like broadcast map. /// It may intercept messages passing through. -impl LlmpBrokerState +impl LlmpBrokerInner where SP: ShMemProvider, { - /// Create and initialize a new [`LlmpBrokerState`], associated with some hooks. + /// Create and initialize a new [`LlmpBrokerInner`], associated with some hooks. pub fn new(shmem_provider: SP) -> Result { Self::with_keep_pages(shmem_provider, true) } - /// Create and initialize a new [`LlmpBrokerState`] telling if it has to keep pages forever + /// Create and initialize a new [`LlmpBrokerInner`] telling if it has to keep pages forever pub fn with_keep_pages( mut shmem_provider: SP, keep_pages_forever: bool, ) -> Result { - Ok(LlmpBrokerState { + Ok(LlmpBrokerInner { llmp_out: LlmpSender { id: ClientId(0), last_msg_sent: ptr::null_mut(), @@ -2632,13 +2632,13 @@ where ) } - /// Create a new [`LlmpBrokerState`] attaching to a TCP port + /// Create a new [`LlmpBrokerInner`] attaching to a TCP port #[cfg(feature = "std")] pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result { Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true) } - /// Create a new [`LlmpBrokerState`] attaching to a TCP port and telling if it has to keep pages forever + /// Create a new [`LlmpBrokerInner`] attaching to a TCP port and telling if it has to keep pages forever #[cfg(feature = "std")] pub fn with_keep_pages_attach_to_tcp( shmem_provider: SP, @@ -2648,7 +2648,7 @@ where match tcp_bind(port) { Ok(listener) => { let mut broker = - LlmpBrokerState::with_keep_pages(shmem_provider, keep_pages_forever)?; + LlmpBrokerInner::with_keep_pages(shmem_provider, keep_pages_forever)?; let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; Ok(broker) } @@ -3587,6 +3587,6 @@ mod tests { assert_eq!(arr[0], arr2[0]); // We want at least the tcp and sender clients. - assert_eq!(broker.state.llmp_clients.len(), 2); + assert_eq!(broker.inner.llmp_clients.len(), 2); } }