diff --git a/libafl/src/bolts/llmp.rs b/libafl/src/bolts/llmp.rs index 8be287c193..de7b75a189 100644 --- a/libafl/src/bolts/llmp.rs +++ b/libafl/src/bolts/llmp.rs @@ -63,9 +63,10 @@ use alloc::{string::String, vec::Vec}; use core::{ cmp::max, fmt::Debug, + hint, mem::size_of, ptr, slice, - sync::atomic::{compiler_fence, Ordering}, + sync::atomic::{fence, AtomicU16, AtomicU64, Ordering}, time::Duration, }; use serde::{Deserialize, Serialize}; @@ -281,7 +282,7 @@ impl Listener { Listener::Tcp(inner) => match inner.accept() { Ok(res) => ListenerStream::Tcp(res.0, res.1), Err(err) => { - dbg!("Ignoring failed accept", err); + println!("Ignoring failed accept: {:?}", err); ListenerStream::Empty() } }, @@ -422,11 +423,11 @@ fn new_map_size(max_alloc: usize) -> usize { /// `llmp_page->messages` unsafe fn _llmp_page_init(shmem: &mut SHM, sender: u32, allow_reinit: bool) { #[cfg(all(feature = "llmp_debug", feature = "std"))] - dbg!("_llmp_page_init: shmem {}", &shmem); + println!("_llmp_page_init: shmem {:?}", &shmem); let map_size = shmem.len(); let page = shmem2page_mut(shmem); #[cfg(all(feature = "llmp_debug", feature = "std"))] - dbg!("_llmp_page_init: page {}", *page); + println!("_llmp_page_init: page {:?}", &(*page)); if !allow_reinit { assert!( @@ -439,15 +440,15 @@ unsafe fn _llmp_page_init(shmem: &mut SHM, sender: u32, allow_reinit (*page).magic = PAGE_INITIALIZED_MAGIC; (*page).sender = sender; - ptr::write_volatile(ptr::addr_of_mut!((*page).current_msg_id), 0); + (*page).current_msg_id.store(0, Ordering::Relaxed); (*page).max_alloc_size = 0; // Don't forget to subtract our own header size (*page).size_total = map_size - LLMP_PAGE_HEADER_LEN; (*page).size_used = 0; (*(*page).messages.as_mut_ptr()).message_id = 0; (*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET; - ptr::write_volatile(ptr::addr_of_mut!((*page).safe_to_unmap), 0); - ptr::write_volatile(ptr::addr_of_mut!((*page).sender_dead), 0); + (*page).safe_to_unmap.store(0, Ordering::Relaxed); + (*page).sender_dead.store(0, Ordering::Relaxed); assert!((*page).size_total != 0); } @@ -598,7 +599,7 @@ where match tcp_bind(port) { Ok(listener) => { // We got the port. We are the broker! :) - dbg!("We're the broker"); + println!("We're the broker"); let mut broker = LlmpBroker::new(shmem_provider)?; let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; @@ -670,7 +671,7 @@ where } /// Contents of the share mem pages, used by llmp internally -#[derive(Copy, Clone, Debug)] +#[derive(Debug)] #[repr(C)] pub struct LlmpPage { /// to check if this page got initialized properly @@ -680,11 +681,11 @@ pub struct LlmpPage { /// Set to != 1 by the receiver, once it got mapped /// It's not safe for the sender to unmap this page before /// (The os may have tidied up the memory when the receiver starts to map) - pub safe_to_unmap: u16, + pub safe_to_unmap: AtomicU16, /// Not used at the moment (would indicate that the sender is no longer there) - pub sender_dead: u16, + pub sender_dead: AtomicU16, /// The current message ID - pub current_msg_id: u64, + pub current_msg_id: AtomicU64, /// How much space is available on this page in bytes pub size_total: usize, /// How much space is used on this page in bytes @@ -816,6 +817,7 @@ where if self.safe_to_unmap() { return; } + hint::spin_loop(); // We log that we're looping -> see when we're blocking. #[cfg(feature = "std")] { @@ -831,9 +833,11 @@ where pub fn safe_to_unmap(&self) -> bool { let current_out_map = self.out_maps.last().unwrap(); unsafe { - compiler_fence(Ordering::SeqCst); // println!("Reading safe_to_unmap from {:?}", current_out_map.page() as *const _); - ptr::read_volatile(ptr::addr_of!((*current_out_map.page()).safe_to_unmap)) != 0 + (*current_out_map.page()) + .safe_to_unmap + .load(Ordering::Relaxed) + != 0 } } @@ -841,8 +845,9 @@ where /// # Safety /// If this method is called, the page may be unmapped before it is read by any receiver. pub unsafe fn mark_safe_to_unmap(&mut self) { - // No need to do this volatile, as we should be the same thread in this scenario. - (*self.out_maps.last_mut().unwrap().page_mut()).safe_to_unmap = 1; + (*self.out_maps.last_mut().unwrap().page_mut()) + .safe_to_unmap + .store(1, Ordering::Relaxed); } /// Reattach to a vacant `out_map`. @@ -877,7 +882,7 @@ where // Exclude the current page by splitting of the last element for this iter let mut unmap_until_excl = 0; for map in self.out_maps.split_last_mut().unwrap().1 { - if (*map.page_mut()).safe_to_unmap == 0 { + if (*map.page()).safe_to_unmap.load(Ordering::Relaxed) == 0 { // The broker didn't read this page yet, no more pages to unmap. break; } @@ -960,7 +965,7 @@ where #[cfg(all(feature = "llmp_debug", feature = "std"))] dbg!( page, - *page, + &(*page), (*page).size_used, buf_len_padded, EOP_MSG_SIZE, @@ -984,7 +989,7 @@ where * with 0... */ (*ret).message_id = if last_msg.is_null() { 1 - } else if (*page).current_msg_id == (*last_msg).message_id { + } else if (*page).current_msg_id.load(Ordering::Relaxed) == (*last_msg).message_id { (*last_msg).message_id + 1 } else { /* Oops, wrong usage! */ @@ -1034,10 +1039,14 @@ where msg ))); } - (*msg).message_id = (*page).current_msg_id + 1; - compiler_fence(Ordering::SeqCst); - ptr::write_volatile(ptr::addr_of_mut!((*page).current_msg_id), (*msg).message_id); - compiler_fence(Ordering::SeqCst); + + (*msg).message_id = (*page).current_msg_id.load(Ordering::Relaxed) + 1; + + // Make sure all things have been written to the page, and commit the message to the page + (*page) + .current_msg_id + .store((*msg).message_id, Ordering::Release); + self.last_msg_sent = msg; self.has_unsent_message = false; Ok(()) @@ -1076,9 +1085,9 @@ where #[cfg(all(feature = "llmp_debug", feature = "std"))] println!("got new map at: {:?}", new_map); - ptr::write_volatile( - ptr::addr_of_mut!((*new_map).current_msg_id), - (*old_map).current_msg_id, + (*new_map).current_msg_id.store( + (*old_map).current_msg_id.load(Ordering::Relaxed), + Ordering::Relaxed, ); #[cfg(all(feature = "llmp_debug", feature = "std"))] @@ -1283,6 +1292,8 @@ where pub shmem_provider: SP, /// current page. After EOP, this gets replaced with the new one pub current_recv_map: LlmpSharedMap, + /// Caches the highest msg id we've seen so far + highest_msg_id: u64, } /// Receiving end of an llmp channel @@ -1328,6 +1339,7 @@ where current_recv_map, last_msg_recvd, shmem_provider, + highest_msg_id: 0, }) } @@ -1336,10 +1348,19 @@ where #[inline(never)] unsafe fn recv(&mut self) -> Result, Error> { /* DBG("recv %p %p\n", page, last_msg); */ - compiler_fence(Ordering::SeqCst); let mut page = self.current_recv_map.page_mut(); let last_msg = self.last_msg_recvd; - let current_msg_id = ptr::read_volatile(ptr::addr_of!((*page).current_msg_id)); + + let (current_msg_id, loaded) = + if !last_msg.is_null() && self.highest_msg_id > (*last_msg).message_id { + // read the msg_id from cache + (self.highest_msg_id, false) + } else { + // read the msg_id from shared map + let current_msg_id = (*page).current_msg_id.load(Ordering::Relaxed); + self.highest_msg_id = current_msg_id; + (current_msg_id, true) + }; // Read the message from the page let ret = if current_msg_id == 0 { @@ -1347,11 +1368,16 @@ where None } else if last_msg.is_null() { /* We never read a message from this queue. Return first. */ + fence(Ordering::Acquire); Some((*page).messages.as_mut_ptr()) } else if (*last_msg).message_id == current_msg_id { /* Oops! No new message! */ None } else { + if loaded { + // we read a higher id from this page, fetch. + fence(Ordering::Acquire); + } // We don't know how big the msg wants to be, assert at least the header has space. Some(llmp_next_msg_ptr_checked( &mut self.current_recv_map, @@ -1360,14 +1386,18 @@ where )?) }; - // Let's see what we go here. + // Let's see what we got. if let Some(msg) = ret { if !(*msg).in_map(&mut self.current_recv_map) { return Err(Error::IllegalState("Unexpected message in map (out of map bounds) - bugy client or tampered shared map detedted!".into())); } // Handle special, LLMP internal, messages. match (*msg).tag { - LLMP_TAG_UNSET => panic!("BUG: Read unallocated msg"), + LLMP_TAG_UNSET => panic!( + "BUG: Read unallocated msg (tag was {:x} - msg header: {:?}", + LLMP_TAG_UNSET, + &(*msg) + ), LLMP_TAG_EXITING => { // The other side is done. assert_eq!((*msg).buf_len, 0); @@ -1394,9 +1424,10 @@ where // Set last msg we received to null (as the map may no longer exist) self.last_msg_recvd = ptr::null(); + self.highest_msg_id = 0; // Mark the old page save to unmap, in case we didn't so earlier. - ptr::write_volatile(ptr::addr_of_mut!((*page).safe_to_unmap), 1); + (*page).safe_to_unmap.store(1, Ordering::Relaxed); // Map the new page. The old one should be unmapped by Drop self.current_recv_map = @@ -1406,7 +1437,7 @@ where )?); page = self.current_recv_map.page_mut(); // Mark the new page save to unmap also (it's mapped by us, the broker now) - ptr::write_volatile(ptr::addr_of_mut!((*page).safe_to_unmap), 1); + (*page).safe_to_unmap.store(1, Ordering::Relaxed); #[cfg(all(feature = "llmp_debug", feature = "std"))] println!( @@ -1443,13 +1474,13 @@ where current_msg_id = (*last_msg).message_id; } loop { - compiler_fence(Ordering::SeqCst); - if ptr::read_volatile(ptr::addr_of!((*page).current_msg_id)) != current_msg_id { + if (*page).current_msg_id.load(Ordering::Relaxed) != current_msg_id { return match self.recv()? { Some(msg) => Ok(msg), None => panic!("BUG: blocking llmp message should never be NULL"), }; } + hint::spin_loop(); } } @@ -1562,7 +1593,7 @@ where //let bt = Backtrace::new(); //#[cfg(not(debug_assertions))] //let bt = ""; - dbg!( + println!( "LLMP_DEBUG: Using existing map {} with size {}", existing_map.id(), existing_map.len(), @@ -1580,7 +1611,7 @@ where &ret.shmem ); #[cfg(all(feature = "llmp_debug", feature = "std"))] - dbg!("PAGE: {}", *ret.page()); + println!("PAGE: {:?}", &(*ret.page())); } ret } @@ -1589,7 +1620,7 @@ where /// This indicates, that the page may safely be unmapped by the sender. pub fn mark_safe_to_unmap(&mut self) { unsafe { - ptr::write_volatile(ptr::addr_of_mut!((*self.page_mut()).safe_to_unmap), 1); + (*self.page_mut()).safe_to_unmap.store(1, Ordering::Relaxed); } } @@ -1767,6 +1798,7 @@ where current_recv_map: client_page, last_msg_recvd: ptr::null_mut(), shmem_provider: self.shmem_provider.clone(), + highest_msg_id: 0, }); } @@ -1858,7 +1890,6 @@ where where F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result, { - compiler_fence(Ordering::SeqCst); for i in 0..self.llmp_clients.len() { unsafe { self.handle_new_msgs(i as u32, on_new_msg)?; @@ -1898,7 +1929,6 @@ where } while !self.is_shutting_down() { - compiler_fence(Ordering::SeqCst); self.once(on_new_msg) .expect("An error occurred when brokering. Exiting."); @@ -2010,7 +2040,7 @@ where .expect("Failed to map local page in broker 2 broker thread!"); #[cfg(all(feature = "llmp_debug", feature = "std"))] - dbg!("B2B: Starting proxy loop :)"); + println!("B2B: Starting proxy loop :)"); loop { // first, forward all data we have. @@ -2019,13 +2049,16 @@ where .expect("Error reading from local page!") { if client_id == b2b_client_id { - dbg!("Ignored message we probably sent earlier (same id)", tag); + println!( + "Ignored message we probably sent earlier (same id), TAG: {:x}", + tag + ); continue; } #[cfg(all(feature = "llmp_debug", feature = "std"))] - dbg!( - "Fowarding message via broker2broker connection", + println!( + "Fowarding message ({} bytes) via broker2broker connection", payload.len() ); // We got a new message! Forward... @@ -2053,8 +2086,8 @@ where ); #[cfg(all(feature = "llmp_debug", feature = "std"))] - dbg!( - "Fowarding incoming message from broker2broker connection", + println!( + "Fowarding incoming message ({} bytes) from broker2broker connection", msg.payload.len() ); @@ -2065,7 +2098,7 @@ where .expect("B2B: Error forwarding message. Exiting."); } else { #[cfg(all(feature = "llmp_debug", feature = "std"))] - dbg!("Received no input, timeout or closed. Looping back up :)"); + println!("Received no input, timeout or closed. Looping back up :)"); } } }); @@ -2075,7 +2108,7 @@ where }); #[cfg(all(feature = "llmp_debug", feature = "std"))] - dbg!("B2B: returning from loop. Success: {}", ret.is_ok()); + println!("B2B: returning from loop. Success: {}", ret.is_ok()); ret } @@ -2186,14 +2219,18 @@ where loop { match listener.accept() { ListenerStream::Tcp(mut stream, addr) => { - dbg!("New connection", addr, stream.peer_addr().unwrap()); + eprintln!( + "New connection: {:?}/{:?}", + addr, + stream.peer_addr().unwrap() + ); // Send initial information, without anyone asking. // This makes it a tiny bit easier to map the broker map for new Clients. match send_tcp_msg(&mut stream, &broker_hello) { Ok(()) => {} Err(e) => { - dbg!("Error sending initial hello: {:?}", e); + eprintln!("Error sending initial hello: {:?}", e); continue; } } @@ -2201,14 +2238,14 @@ where let buf = match recv_tcp_msg(&mut stream) { Ok(buf) => buf, Err(e) => { - dbg!("Error receving from tcp", e); + eprintln!("Error receving from tcp: {:?}", e); continue; } }; let req = match (&buf).try_into() { Ok(req) => req, Err(e) => { - dbg!("Could not deserialize tcp message", e); + eprintln!("Could not deserialize tcp message: {:?}", e); continue; } }; @@ -2290,6 +2327,7 @@ where current_recv_map: new_page, last_msg_recvd: ptr::null_mut(), shmem_provider: self.shmem_provider.clone(), + highest_msg_id: 0, }); } Err(e) => { @@ -2469,6 +2507,7 @@ where current_recv_map: initial_broker_map, last_msg_recvd: ptr::null_mut(), shmem_provider, + highest_msg_id: 0, }, }) } @@ -2576,7 +2615,7 @@ where match TcpStream::connect((_LLMP_CONNECT_ADDR, port)) { Ok(stream) => break stream, Err(_) => { - dbg!("Connection Refused.. Retrying"); + println!("Connection Refused.. Retrying"); } } } @@ -2670,7 +2709,7 @@ mod tests { .unwrap(); let tag: Tag = 0x1337; - let arr: [u8; 1] = [1u8]; + let arr: [u8; 1] = [1_u8]; // Send stuff client.send_buf(tag, &arr).unwrap();