diff --git a/libafl/examples/llmp_test/main.rs b/libafl/examples/llmp_test/main.rs index 0324dace4f..43a73ea375 100644 --- a/libafl/examples/llmp_test/main.rs +++ b/libafl/examples/llmp_test/main.rs @@ -31,7 +31,7 @@ fn adder_loop(port: u16) -> ! { loop { let mut msg_counter = 0; loop { - let (_sender, tag, buf) = match client.recv_buf().unwrap() { + let (sender, tag, buf) = match client.recv_buf().unwrap() { None => break, Some(msg) => msg, }; @@ -42,8 +42,9 @@ fn adder_loop(port: u16) -> ! { current_result.wrapping_add(u32::from_le_bytes(buf.try_into().unwrap())); } _ => println!( - "Adder Client ignored unknown message {} with {} bytes", + "Adder Client ignored unknown message {:#x} from client {} with {} bytes", tag, + sender, buf.len() ), }; diff --git a/libafl/src/bolts/llmp.rs b/libafl/src/bolts/llmp.rs index 5a5f1ce0ed..981c860b76 100644 --- a/libafl/src/bolts/llmp.rs +++ b/libafl/src/bolts/llmp.rs @@ -697,7 +697,7 @@ pub struct LlmpSender where SP: ShMemProvider, { - /// ID of this sender. Only used in the broker. + /// ID of this sender. pub id: u32, /// Ref to the last message this sender sent on the last page. /// If null, a new page (just) started. @@ -967,8 +967,9 @@ where /// Commit the message last allocated by [`alloc_next`] to the queue. /// After commiting, the msg shall no longer be altered! /// It will be read by the consuming threads (`broker->clients` or `client->broker`) + /// If `overwrite_client_id` is `false`, the message's `sender` won't be touched (for broker forwarding) #[inline(never)] // Not inlined to make cpu-level reodering (hopefully?) improbable - unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> { + unsafe fn send(&mut self, msg: *mut LlmpMsg, overwrite_client_id: bool) -> Result<(), Error> { // dbg!("Sending msg {:?}", msg); if self.last_msg_sent == msg { @@ -977,6 +978,10 @@ where if (*msg).tag == LLMP_TAG_UNSET { panic!("No tag set on message with id {}", (*msg).message_id); } + // A client gets the sender id assigned to by the broker during the initial handshake. + if overwrite_client_id { + (*msg).sender = self.id; + } let page = self.out_maps.last_mut().unwrap().page_mut(); if msg.is_null() || !llmp_msg_in_page(page, msg) { return Err(Error::Unknown(format!( @@ -1034,17 +1039,18 @@ where println!("Setting max alloc size: {:?}", (*old_map).max_alloc_size); (*new_map).max_alloc_size = (*old_map).max_alloc_size; + (*new_map).sender = self.id; + /* On the old map, place a last message linking to the new map for the clients * to consume */ - let mut out: *mut LlmpMsg = self.alloc_eop()?; - (*out).sender = (*old_map).sender; + let out = self.alloc_eop()?; let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; (*end_of_page_msg).map_size = new_map_shmem.shmem.len(); (*end_of_page_msg).shm_str = *new_map_shmem.shmem.id().as_slice(); /* Send the last msg on the old buf */ - self.send(out)?; + self.send(out, true)?; // Set the new page as current page. self.out_maps.push(new_map_shmem); @@ -1115,7 +1121,7 @@ where (*msg).flags = LLMP_FLAG_INITIALIZED; buf.as_ptr() .copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len()); - self.send(msg) + self.send(msg, true) } } @@ -1139,7 +1145,7 @@ where (*msg).flags = flags; buf.as_ptr() .copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len()); - self.send(msg) + self.send(msg, true) } } @@ -1355,7 +1361,7 @@ where /// Returns the next message, tag, buf, if avaliable, else None #[allow(clippy::type_complexity)] #[inline] - pub fn recv_buf(&mut self) -> Result, Error> { + pub fn recv_buf(&mut self) -> Result, Error> { if let Some((sender, tag, _flags, buf)) = self.recv_buf_with_flags()? { Ok(Some((sender, tag, buf))) } else { @@ -1744,7 +1750,7 @@ where (msg as *const u8).copy_to_nonoverlapping(out as *mut u8, complete_size); (*out).buf_len_padded = actual_size; /* We need to replace the message ID with our own */ - if let Err(e) = self.llmp_out.send(out) { + if let Err(e) = self.llmp_out.send(out, false) { panic!("Error sending msg: {:?}", e) }; self.llmp_out.last_msg_sent = out; @@ -1855,7 +1861,7 @@ where let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; (*pageinfo).shm_str = *shmem_description.id.as_slice(); (*pageinfo).map_size = shmem_description.size; - sender.send(msg) + sender.send(msg, true) } } @@ -2377,7 +2383,7 @@ where /// # Safety /// Needs to be called with a proper msg pointer pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> { - self.sender.send(msg) + self.sender.send(msg, true) } /// Allocates a message of the given size, tags it, and sends it off. @@ -2439,13 +2445,13 @@ where /// Returns the next message, tag, buf, if avaliable, else None #[allow(clippy::type_complexity)] #[inline] - pub fn recv_buf(&mut self) -> Result, Error> { + pub fn recv_buf(&mut self) -> Result, Error> { self.receiver.recv_buf() } /// Receives a buf from the broker, looping until a messages becomes avaliable #[inline] - pub fn recv_buf_blocking(&mut self) -> Result<(u32, Tag, &[u8]), Error> { + pub fn recv_buf_blocking(&mut self) -> Result<(ClientId, Tag, &[u8]), Error> { self.receiver.recv_buf_blocking() }