From ba394c4acca2d085851cee831bbf7872bec58277 Mon Sep 17 00:00:00 2001
From: s1341
Date: Tue, 21 Nov 2023 17:48:17 +0200
Subject: [PATCH] Fix LLMP map reuse overflow for slow brokers (#1679)

* Attempt to fix llmp refcounting

* readers->readers_count

* Removed refcounting for now, fixed

* fixes

* fixes

* add extra debug assert, remove duplicate line

* semicolons are pain :)

* Add comment

* oooooops, bugfix

* fmt

* Fix calcualtion in in_shmem

---------

Co-authored-by: Dominik Maier
Co-authored-by: Dominik Maier
---
 libafl_bolts/src/llmp.rs | 127 ++++++++++++++++++++++++---------------
 libafl_cc/build.rs       |   2 +-
 2 files changed, 80 insertions(+), 49 deletions(-)

diff --git a/libafl_bolts/src/llmp.rs b/libafl_bolts/src/llmp.rs
index ef22424cd9..f2a720d9fc 100644
--- a/libafl_bolts/src/llmp.rs
+++ b/libafl_bolts/src/llmp.rs
@@ -548,8 +548,8 @@ unsafe fn llmp_page_init<SHM: ShMem>(shmem: &mut SHM, sender_id: ClientId, allow_reinit: bool) {
     (*page).size_used = 0;
     (*(*page).messages.as_mut_ptr()).message_id = MessageId(0);
     (*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET;
-    (*page).safe_to_unmap.store(0, Ordering::Release);
-    (*page).sender_dead.store(0, Ordering::Relaxed);
+    (*page).receivers_joined_count.store(0, Ordering::Release);
+    (*page).receivers_left_count.store(0, Ordering::Relaxed);
     assert!((*page).size_total != 0);
 }
 
@@ -664,16 +664,11 @@ impl LlmpMsg {
     pub fn in_shmem<SHM: ShMem>(&self, map: &mut LlmpSharedMap<SHM>) -> bool {
         let map_size = map.shmem.as_slice().len();
         let buf_ptr = self.buf.as_ptr();
+        let len = self.buf_len_padded as usize + size_of::<LlmpMsg>();
         unsafe {
-            if buf_ptr > (map.page_mut() as *const u8).add(size_of::<LlmpPage>())
-                && buf_ptr <= (map.page_mut() as *const u8).add(map_size - size_of::<LlmpMsg>())
-            {
-                // The message header is in the page. Continue with checking the body.
-                let len = self.buf_len_padded as usize + size_of::<LlmpMsg>();
-                buf_ptr <= (map.page_mut() as *const u8).add(map_size - len)
-            } else {
-                false
-            }
+            buf_ptr > (map.page_mut() as *const u8).add(size_of::<LlmpPage>())
+                && buf_ptr.add(len).sub(size_of::<LlmpPage>())
+                    <= (map.page_mut() as *const u8).add(map_size)
         }
     }
 }
@@ -788,9 +783,10 @@ pub struct LlmpPage {
     /// Set to != 1 by the receiver, once it got mapped.
     /// It's not safe for the sender to unmap this page before
     /// (The os may have tidied up the memory when the receiver starts to map)
-    pub safe_to_unmap: AtomicU16,
-    /// Not used at the moment (would indicate that the sender is no longer there)
-    pub sender_dead: AtomicU16,
+    pub receivers_joined_count: AtomicU16,
+    /// Set to != 1 by the receiver, once it left again after joining.
+    /// It's not safe for the sender to re-map this page before this is equal to receivers_joined_count
+    pub receivers_left_count: AtomicU16,
     #[cfg(target_pointer_width = "64")]
     /// The current message ID
     pub current_msg_id: AtomicU64,
@@ -808,6 +804,22 @@ pub struct LlmpPage {
     pub messages: [LlmpMsg; 0],
 }
 
+impl LlmpPage {
+    #[inline]
+    fn receiver_joined(&mut self) {
+        let receivers_joined_count = &mut self.receivers_joined_count;
+        //receivers_joined_count.fetch_add(1, Ordering::Relaxed);
+        receivers_joined_count.store(1, Ordering::Relaxed);
+    }
+
+    #[inline]
+    fn receiver_left(&mut self) {
+        let receivers_left_count = &mut self.receivers_left_count;
+        //receivers_left_count.fetch_add(1, Ordering::Relaxed);
+        receivers_left_count.store(1, Ordering::Relaxed);
+    }
+}
+
 /// Message payload when a client got added */
 /// This is an internal message!
 /// [`LLMP_TAG_END_OF_PAGE_V1`]
@@ -834,7 +846,7 @@ where
     /// A vec of page wrappers, each containing an initialized [`ShMem`]
     out_shmems: Vec<LlmpSharedMap<SP::ShMem>>,
     /// A vec of pages that we previously used, but that have served its purpose
-    /// (no potential readers are left).
+    /// (no potential receivers are left).
     /// Instead of freeing them, we keep them around to potentially reuse them later,
     /// if they are still large enough.
     /// This way, the OS doesn't have to spend time zeroing pages, and getting rid of our old pages
@@ -969,9 +981,9 @@ where
         unsafe {
             // log::info!("Reading safe_to_unmap from {:?}", current_out_shmem.page() as *const _);
             (*current_out_shmem.page())
-                .safe_to_unmap
+                .receivers_joined_count
                 .load(Ordering::Relaxed)
-                != 0
+                >= 1
         }
     }
 
@@ -979,9 +991,7 @@ where
     /// # Safety
     /// If this method is called, the page may be unmapped before it is read by any receiver.
     pub unsafe fn mark_safe_to_unmap(&mut self) {
-        (*self.out_shmems.last_mut().unwrap().page_mut())
-            .safe_to_unmap
-            .store(1, Ordering::Relaxed);
+        (*self.out_shmems.last_mut().unwrap().page_mut()).receiver_joined();
     }
 
     /// Reattach to a vacant `out_shmem`.
@@ -1017,7 +1027,7 @@ where
         // Exclude the current page by splitting of the last element for this iter
         let mut unmap_until_excl = 0;
         for map in self.out_shmems.split_last_mut().unwrap().1 {
-            if (*map.page()).safe_to_unmap.load(Ordering::Acquire) == 0 {
+            if (*map.page()).receivers_joined_count.load(Ordering::Acquire) == 0 {
                 // The broker didn't read this page yet, no more pages to unmap.
                 break;
             }
@@ -1046,7 +1056,7 @@ where
             (*page).magic = PAGE_DEINITIALIZED_MAGIC;
 
             #[cfg(feature = "llmp_debug")]
-            log::debug!("Moving unused map to cache: {map:?}");
+            log::debug!("Moving unused map to cache: {map:?} {:x?}", map.page());
             self.unused_shmem_cache
                 .insert(self.unused_shmem_cache.len(), map);
         }
@@ -1106,7 +1116,7 @@ where
             "Allocating {} bytes on page {:?} / map {:?} (last msg: {:?})",
             buf_len,
             page,
-            &map,
+            &map.shmem.id().as_str(),
             last_msg
         );
 
@@ -1213,18 +1223,32 @@ where
         sender_id: ClientId,
         next_min_shmem_size: usize,
     ) -> Result<LlmpSharedMap<<SP as ShMemProvider>::ShMem>, Error> {
-        if self.unused_shmem_cache.is_empty() {
-            // No cached maps that fit our need, let's allocate a new one.
-            Ok(LlmpSharedMap::new(
-                sender_id,
-                self.shmem_provider.new_shmem(next_min_shmem_size)?,
-            ))
-        } else {
-            // We got cached shmems laying around, hand it out, if they are large enough.
-            let mut cached_shmem = self
-                .unused_shmem_cache
-                .remove(self.unused_shmem_cache.len() - 1);
+        // Find a shared map that has been released to reuse, from which all receivers left / finished reading.
+        let cached_shmem = self
+            .unused_shmem_cache
+            .iter()
+            .position(|cached_shmem| {
+                let page = &(*shmem2page(&cached_shmem.shmem));
+                let receivers_joined_count = page.receivers_joined_count.load(Ordering::Relaxed);
+                debug_assert_ne!(receivers_joined_count, 0);
+                let receivers_left_count = page.receivers_left_count.load(Ordering::Relaxed);
+                debug_assert!(receivers_joined_count >= receivers_left_count);
+                let ret = receivers_joined_count == receivers_left_count;
+
+                // For proper refcounts, double check that nobody joined in the meantime.
+                debug_assert_eq!(
+                    receivers_joined_count,
+                    page.receivers_joined_count.load(Ordering::Relaxed),
+                    "Oops, some receiver joined while re-using the page!"
+                );
+
+                ret
+            })
+            .map(|e| self.unused_shmem_cache.remove(e));
+
+        if let Some(mut cached_shmem) = cached_shmem {
+            // We got cached shmems laying around, hand it out, if they are large enough.
             if cached_shmem.shmem.len() < next_min_shmem_size {
                 // This map is too small, we will never need it again (llmp allocation sizes always increase). Drop it, then call this function again..
                 #[cfg(feature = "llmp_debug")]
@@ -1239,6 +1263,12 @@ where
                 }
                 Ok(cached_shmem)
             }
-        }
+        } else {
+            // No cached maps that fit our need, let's allocate a new one.
+            Ok(LlmpSharedMap::new(
+                sender_id,
+                self.shmem_provider.new_shmem(next_min_shmem_size)?,
+            ))
+        }
     }
 }
@@ -1548,7 +1578,7 @@ where
     #[inline(never)]
     unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> {
         /* DBG("recv %p %p\n", page, last_msg); */
-        let mut page = self.current_recv_shmem.page_mut();
+        let page = self.current_recv_shmem.page_mut();
         let last_msg = self.last_msg_recvd;
 
         let (current_msg_id, loaded) =
@@ -1625,8 +1655,8 @@ where
                     self.last_msg_recvd = ptr::null();
                     self.highest_msg_id = MessageId(0);
 
-                    // Mark the old page save to unmap, in case we didn't do so earlier.
-                    (*page).safe_to_unmap.store(1, Ordering::Relaxed);
+                    // Mark the old page save to remap.
+                    (*page).receiver_left();
 
                     // Map the new page. The old one should be unmapped by Drop
                     self.current_recv_shmem =
@@ -1634,9 +1664,10 @@ where
                         ShMemId::from_array(&pageinfo_cpy.shm_str),
                         pageinfo_cpy.map_size,
                     )?);
-                    page = self.current_recv_shmem.page_mut();
-                    // Mark the new page save to unmap also (it's mapped by us, the broker now)
-                    (*page).safe_to_unmap.store(1, Ordering::Relaxed);
+                    let new_page = self.current_recv_shmem.page_mut();
+
+                    // Mark the old page as save to remap (it's mapped by us, the receiver, now)
+                    (*new_page).receiver_joined();
 
                     #[cfg(feature = "llmp_debug")]
                     log::info!(
@@ -1819,7 +1850,7 @@ where
     /// This indicates, that the page may safely be unmapped by the sender.
     pub fn mark_safe_to_unmap(&mut self) {
         unsafe {
-            (*self.page_mut()).safe_to_unmap.store(1, Ordering::Relaxed);
+            (*self.page_mut()).receiver_joined();
         }
     }
 
@@ -2274,13 +2305,13 @@ where
             }
             if let Some(exit_after_count) = self.exit_cleanly_after {
-                log::trace!(
-                    "Clients connected: {} && > {} - {} >= {}",
-                    self.has_clients(),
-                    self.num_clients_total,
-                    self.listeners.len(),
-                    exit_after_count
-                );
+                // log::trace!(
+                //     "Clients connected: {} && > {} - {} >= {}",
+                //     self.has_clients(),
+                //     self.num_clients_total,
+                //     self.listeners.len(),
+                //     exit_after_count
+                // );
                 if !self.has_clients()
                     && (self.num_clients_total - self.listeners.len())
                         >= exit_after_count.into()
                 {
diff --git a/libafl_cc/build.rs b/libafl_cc/build.rs
index 3674f74406..8e321d26e2 100644
--- a/libafl_cc/build.rs
+++ b/libafl_cc/build.rs
@@ -26,7 +26,7 @@ fn dll_extension<'a>() -> &'a str {
             return "dylib";
         }
     }
-    let family = env::var("CARGO_CFG_TARGET_FAMILY").unwrap();
+    let family = env::var("CARGO_CFG_TARGET_FAMILY").unwrap_or_else(|_| "unknown".into());
     match family.as_str() {
        "windows" => "dll",
        "unix" => "so",
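
The core of this change is the reuse gate in new_or_unused_shmem: a cached shared map may only be handed out again once at least one receiver has mapped ("joined") it and every receiver that joined has left again. The snippet below is a minimal, self-contained sketch of that condition for reference; PageCounters, safe_to_reuse, and the main driver are illustrative names, not LibAFL API, and only the two counter field names mirror the diff above.

use std::sync::atomic::{AtomicU16, Ordering};

/// Illustrative stand-in for the two counters this patch adds to the page header.
struct PageCounters {
    receivers_joined_count: AtomicU16,
    receivers_left_count: AtomicU16,
}

impl PageCounters {
    /// A page may only be recycled once at least one receiver joined
    /// and every receiver that joined has left again.
    fn safe_to_reuse(&self) -> bool {
        let joined = self.receivers_joined_count.load(Ordering::Relaxed);
        let left = self.receivers_left_count.load(Ordering::Relaxed);
        joined != 0 && joined == left
    }
}

fn main() {
    let page = PageCounters {
        receivers_joined_count: AtomicU16::new(0),
        receivers_left_count: AtomicU16::new(0),
    };
    assert!(!page.safe_to_reuse()); // no receiver has mapped the page yet
    page.receivers_joined_count.store(1, Ordering::Relaxed); // like receiver_joined()
    assert!(!page.safe_to_reuse()); // the receiver may still be reading
    page.receivers_left_count.store(1, Ordering::Relaxed); // like receiver_left()
    assert!(page.safe_to_reuse()); // now the sender may recycle the map
}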