diff --git a/libafl/src/events/centralized.rs b/libafl/src/events/centralized.rs index 2e0d7ad48d..1258439842 100644 --- a/libafl/src/events/centralized.rs +++ b/libafl/src/events/centralized.rs @@ -377,7 +377,7 @@ where } fn send_exiting(&mut self) -> Result<(), Error> { - self.client.sender.send_exiting()?; + self.client.sender_mut().send_exiting()?; self.inner.send_exiting() } @@ -594,7 +594,7 @@ where Z: ExecutionProcessor + EvaluatorObservers, { // TODO: Get around local event copy by moving handle_in_client - let self_id = self.client.sender.id; + let self_id = self.client.sender().id(); let mut count = 0; while let Some((client_id, tag, _flags, msg)) = self.client.recv_buf_with_flags()? { assert!( diff --git a/libafl/src/events/llmp.rs b/libafl/src/events/llmp.rs index 1866610870..53d0718ca0 100644 --- a/libafl/src/events/llmp.rs +++ b/libafl/src/events/llmp.rs @@ -624,7 +624,7 @@ impl LlmpEventManager { /// The other side may free up all allocated memory. /// We are no longer allowed to send anything afterwards. pub fn send_exiting(&mut self) -> Result<(), Error> { - self.llmp.sender.send_exiting() + self.llmp.sender_mut().send_exiting() } } @@ -758,7 +758,7 @@ where executor: &mut E, ) -> Result { // TODO: Get around local event copy by moving handle_in_client - let self_id = self.llmp.sender.id; + let self_id = self.llmp.sender().id(); let mut count = 0; while let Some((client_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? { assert!( @@ -825,7 +825,7 @@ where { /// Gets the id assigned to this staterestorer. fn mgr_id(&self) -> EventManagerId { - EventManagerId(self.llmp.sender.id.0 as usize) + EventManagerId(self.llmp.sender().id().0 as usize) } } @@ -1572,7 +1572,7 @@ where Z: ExecutionProcessor + EvaluatorObservers, { // TODO: Get around local event copy by moving handle_in_client - let self_id = self.llmp.sender.id; + let self_id = self.llmp.sender().id(); let mut count = 0; while let Some((client_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? { assert!( diff --git a/libafl_bolts/examples/llmp_test/main.rs b/libafl_bolts/examples/llmp_test/main.rs index 2bbcc27af3..a5265aebf8 100644 --- a/libafl_bolts/examples/llmp_test/main.rs +++ b/libafl_bolts/examples/llmp_test/main.rs @@ -202,7 +202,7 @@ fn main() -> Result<(), Box> { thread::sleep(Duration::from_millis(10)); } log::info!("Exiting Client exits"); - client.sender.send_exiting()?; + client.sender_mut().send_exiting()?; } _ => { println!("No valid mode supplied"); diff --git a/libafl_bolts/src/llmp.rs b/libafl_bolts/src/llmp.rs index b3ebf1abb8..9c310f419c 100644 --- a/libafl_bolts/src/llmp.rs +++ b/libafl_bolts/src/llmp.rs @@ -827,12 +827,12 @@ where SP: ShMemProvider, { /// ID of this sender. - pub id: ClientId, + id: ClientId, /// Ref to the last message this sender sent on the last page. /// If null, a new page (just) started. - pub last_msg_sent: *const LlmpMsg, + last_msg_sent: *const LlmpMsg, /// A vec of page wrappers, each containing an initialized [`ShMem`] - pub out_shmems: Vec>, + out_shmems: Vec>, /// A vec of pages that we previously used, but that have served its purpose /// (no potential readers are left). /// Instead of freeing them, we keep them around to potentially reuse them later, @@ -843,7 +843,7 @@ where /// The broker uses this feature. /// By keeping the message history around, /// new clients may join at any time in the future. - pub keep_pages_forever: bool, + keep_pages_forever: bool, /// True, if we allocatd a message, but didn't call [`Self::send()`] yet has_unsent_message: bool, /// The sharedmem provider to get new sharaed maps if we're full @@ -878,6 +878,12 @@ where }) } + /// ID of this sender. + #[must_use] + pub fn id(&self) -> ClientId { + self.id + } + /// Completely reset the current sender map. /// Afterwards, no receiver should read from it at a different location. /// This is only useful if all connected llmp parties start over, for example after a crash. @@ -1470,16 +1476,16 @@ where SP: ShMemProvider, { /// Client Id of this receiver - pub id: ClientId, + id: ClientId, /// Pointer to the last message received - pub last_msg_recvd: *const LlmpMsg, + last_msg_recvd: *const LlmpMsg, /// Time we received the last message from this receiver #[cfg(feature = "std")] last_msg_time: Duration, /// The shmem provider - pub shmem_provider: SP, + shmem_provider: SP, /// current page. After EOP, this gets replaced with the new one - pub current_recv_shmem: LlmpSharedMap, + current_recv_shmem: LlmpSharedMap, /// Caches the highest msg id we've seen so far highest_msg_id: MessageId, } @@ -1753,7 +1759,7 @@ where { /// Shmem containg the actual (unsafe) page, /// shared between one LlmpSender and one LlmpReceiver - pub shmem: SHM, + shmem: SHM, } // TODO: May be obsolete @@ -1903,13 +1909,13 @@ where SP: ShMemProvider + 'static, { /// Broadcast map from broker to all clients - pub llmp_out: LlmpSender, + llmp_out: LlmpSender, /// Users of Llmp can add message handlers in the broker. /// This allows us to intercept messages right in the broker. /// This keeps the out map clean. /// The backing values of `llmp_clients` [`ClientId`]s will always be sorted (but not gapless) /// Make sure to always increase `num_clients_total` when pushing a new [`LlmpReceiver`] to `llmp_clients`! - pub llmp_clients: Vec>, + llmp_clients: Vec>, /// The own listeners we spawned via `launch_listener` or `crate_attach_to_tcp`. /// Listeners will be ignored for `exit_cleanly_after` and they are never considered to have timed out. listeners: Vec, @@ -2030,6 +2036,19 @@ where self.exit_cleanly_after = Some(n_clients); } + /// Add a client to this broker. + /// Will set an appropriate [`ClientId`] before pushing the client to the internal vec. + /// Will increase `num_clients_total`. + /// The backing values of `llmp_clients` [`ClientId`]s will always be sorted (but not gapless) + /// returns the [`ClientId`] of the new client. + pub fn add_client(&mut self, mut client_receiver: LlmpReceiver) -> ClientId { + let id = self.peek_next_client_id(); + client_receiver.id = id; + self.llmp_clients.push(client_receiver); + self.num_clients_total += 1; + id + } + /// Allocate the next message on the outgoing map unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error> { self.llmp_out.alloc_next(buf_len) @@ -2042,9 +2061,8 @@ where // Since we now have a handle to it, it won't be umapped too early (only after we also unmap it) client_page.mark_safe_to_unmap(); - let id = self.peek_next_client_id(); - self.llmp_clients.push(LlmpReceiver { - id, + self.add_client(LlmpReceiver { + id: ClientId(0), // Will be auto-filled current_recv_shmem: client_page, last_msg_recvd: ptr::null_mut(), shmem_provider: self.shmem_provider.clone(), @@ -2052,10 +2070,7 @@ where // We don't know the last received time, just assume the current time. #[cfg(feature = "std")] last_msg_time: current_time(), - }); - self.num_clients_total += 1; - - id + }) } /// Connects to a broker running on another machine. @@ -2753,9 +2768,8 @@ where let mut new_page = LlmpSharedMap::existing(new_shmem); new_page.mark_safe_to_unmap(); - let id = self.peek_next_client_id(); - self.llmp_clients.push(LlmpReceiver { - id, + self.add_client(LlmpReceiver { + id: ClientId(0), // will be auto-filled current_recv_shmem: new_page, last_msg_recvd: ptr::null_mut(), shmem_provider: self.shmem_provider.clone(), @@ -2764,7 +2778,6 @@ where #[cfg(feature = "std")] last_msg_time: current_time(), }); - self.num_clients_total += 1; } Err(e) => { log::info!("Error adding client! Ignoring: {e:?}"); @@ -2811,9 +2824,9 @@ where #[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub struct LlmpClientDescription { /// Description of the sender - pub sender: LlmpDescription, + sender: LlmpDescription, /// Description of the receiver - pub receiver: LlmpDescription, + receiver: LlmpDescription, } /// Client side of LLMP @@ -2823,9 +2836,9 @@ where SP: ShMemProvider, { /// Outgoing channel to the broker - pub sender: LlmpSender, + sender: LlmpSender, /// Incoming (broker) broadcast map - pub receiver: LlmpReceiver, + receiver: LlmpReceiver, } /// `n` clients connect to a broker. They share an outgoing map with the broker, @@ -2907,6 +2920,30 @@ where }) } + /// Outgoing channel to the broker + #[must_use] + pub fn sender(&self) -> &LlmpSender { + &self.sender + } + + /// Outgoing channel to the broker (mut) + #[must_use] + pub fn sender_mut(&mut self) -> &mut LlmpSender { + &mut self.sender + } + + /// Incoming (broker) broadcast map + #[must_use] + pub fn receiver(&self) -> &LlmpReceiver { + &self.receiver + } + + /// Incoming (broker) broadcast map (mut) + #[must_use] + pub fn receiver_mut(&mut self) -> &mut LlmpReceiver { + &mut self.receiver + } + /// Waits for the sender to be save to unmap. /// If a receiver is involved on the other side, this function should always be called. pub fn await_safe_to_unmap_blocking(&self) {