Fix LLMP map reuse overflow for slow brokers (#1679)

* Attempt to fix llmp refcounting

* readers->readers_count

* Removed refcounting for now, fixed

* fixes

* fixes

* add extra debug assert, remove duplicate line

* semicolons are pain :)

* Add comment

* oooooops, bugfix

* fmt

* Fix calculation in in_shmem

---------

Co-authored-by: Dominik Maier <dmnk@google.com>
Co-authored-by: Dominik Maier <domenukk@gmail.com>
This commit is contained in:
s1341 2023-11-21 17:48:17 +02:00 committed by GitHub
parent 379e2ae89b
commit ba394c4acc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 80 additions and 49 deletions

View File

@ -548,8 +548,8 @@ unsafe fn llmp_page_init<SHM: ShMem>(shmem: &mut SHM, sender_id: ClientId, allow
(*page).size_used = 0; (*page).size_used = 0;
(*(*page).messages.as_mut_ptr()).message_id = MessageId(0); (*(*page).messages.as_mut_ptr()).message_id = MessageId(0);
(*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET; (*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET;
(*page).safe_to_unmap.store(0, Ordering::Release); (*page).receivers_joined_count.store(0, Ordering::Release);
(*page).sender_dead.store(0, Ordering::Relaxed); (*page).receivers_left_count.store(0, Ordering::Relaxed);
assert!((*page).size_total != 0); assert!((*page).size_total != 0);
} }
@ -664,16 +664,11 @@ impl LlmpMsg {
pub fn in_shmem<SHM: ShMem>(&self, map: &mut LlmpSharedMap<SHM>) -> bool { pub fn in_shmem<SHM: ShMem>(&self, map: &mut LlmpSharedMap<SHM>) -> bool {
let map_size = map.shmem.as_slice().len(); let map_size = map.shmem.as_slice().len();
let buf_ptr = self.buf.as_ptr(); let buf_ptr = self.buf.as_ptr();
let len = self.buf_len_padded as usize + size_of::<LlmpMsg>();
unsafe { unsafe {
if buf_ptr > (map.page_mut() as *const u8).add(size_of::<LlmpPage>()) buf_ptr > (map.page_mut() as *const u8).add(size_of::<LlmpPage>())
&& buf_ptr <= (map.page_mut() as *const u8).add(map_size - size_of::<LlmpMsg>()) && buf_ptr.add(len).sub(size_of::<LlmpPage>())
{ <= (map.page_mut() as *const u8).add(map_size)
// The message header is in the page. Continue with checking the body.
let len = self.buf_len_padded as usize + size_of::<LlmpMsg>();
buf_ptr <= (map.page_mut() as *const u8).add(map_size - len)
} else {
false
}
} }
} }
} }
@ -788,9 +783,10 @@ pub struct LlmpPage {
/// Set to != 1 by the receiver, once it got mapped. /// Set to != 1 by the receiver, once it got mapped.
/// It's not safe for the sender to unmap this page before /// It's not safe for the sender to unmap this page before
/// (The os may have tidied up the memory when the receiver starts to map) /// (The os may have tidied up the memory when the receiver starts to map)
pub safe_to_unmap: AtomicU16, pub receivers_joined_count: AtomicU16,
/// Not used at the moment (would indicate that the sender is no longer there) /// Set to != 1 by the receiver, once it left again after joining.
pub sender_dead: AtomicU16, /// It's not safe for the sender to re-map this page before this is equal to receivers_joined_count
pub receivers_left_count: AtomicU16,
#[cfg(target_pointer_width = "64")] #[cfg(target_pointer_width = "64")]
/// The current message ID /// The current message ID
pub current_msg_id: AtomicU64, pub current_msg_id: AtomicU64,
@ -808,6 +804,22 @@ pub struct LlmpPage {
pub messages: [LlmpMsg; 0], pub messages: [LlmpMsg; 0],
} }
impl LlmpPage {
#[inline]
fn receiver_joined(&mut self) {
let receivers_joined_count = &mut self.receivers_joined_count;
//receivers_joined_count.fetch_add(1, Ordering::Relaxed);
receivers_joined_count.store(1, Ordering::Relaxed);
}
#[inline]
fn receiver_left(&mut self) {
let receivers_joined_count = &mut self.receivers_joined_count;
//receivers_joined_count.fetch_add(1, Ordering::Relaxed);
receivers_joined_count.store(1, Ordering::Relaxed);
}
}
/// Message payload when a client got added */ /// Message payload when a client got added */
/// This is an internal message! /// This is an internal message!
/// [`LLMP_TAG_END_OF_PAGE_V1`] /// [`LLMP_TAG_END_OF_PAGE_V1`]
@ -834,7 +846,7 @@ where
/// A vec of page wrappers, each containing an initialized [`ShMem`] /// A vec of page wrappers, each containing an initialized [`ShMem`]
out_shmems: Vec<LlmpSharedMap<SP::ShMem>>, out_shmems: Vec<LlmpSharedMap<SP::ShMem>>,
/// A vec of pages that we previously used, but that have served its purpose /// A vec of pages that we previously used, but that have served its purpose
/// (no potential readers are left). /// (no potential receivers are left).
/// Instead of freeing them, we keep them around to potentially reuse them later, /// Instead of freeing them, we keep them around to potentially reuse them later,
/// if they are still large enough. /// if they are still large enough.
/// This way, the OS doesn't have to spend time zeroing pages, and getting rid of our old pages /// This way, the OS doesn't have to spend time zeroing pages, and getting rid of our old pages
@ -969,9 +981,9 @@ where
unsafe { unsafe {
// log::info!("Reading safe_to_unmap from {:?}", current_out_shmem.page() as *const _); // log::info!("Reading safe_to_unmap from {:?}", current_out_shmem.page() as *const _);
(*current_out_shmem.page()) (*current_out_shmem.page())
.safe_to_unmap .receivers_joined_count
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
!= 0 >= 1
} }
} }
@ -979,9 +991,7 @@ where
/// # Safety /// # Safety
/// If this method is called, the page may be unmapped before it is read by any receiver. /// If this method is called, the page may be unmapped before it is read by any receiver.
pub unsafe fn mark_safe_to_unmap(&mut self) { pub unsafe fn mark_safe_to_unmap(&mut self) {
(*self.out_shmems.last_mut().unwrap().page_mut()) (*self.out_shmems.last_mut().unwrap().page_mut()).receiver_joined();
.safe_to_unmap
.store(1, Ordering::Relaxed);
} }
/// Reattach to a vacant `out_shmem`. /// Reattach to a vacant `out_shmem`.
@ -1017,7 +1027,7 @@ where
// Exclude the current page by splitting of the last element for this iter // Exclude the current page by splitting of the last element for this iter
let mut unmap_until_excl = 0; let mut unmap_until_excl = 0;
for map in self.out_shmems.split_last_mut().unwrap().1 { for map in self.out_shmems.split_last_mut().unwrap().1 {
if (*map.page()).safe_to_unmap.load(Ordering::Acquire) == 0 { if (*map.page()).receivers_joined_count.load(Ordering::Acquire) == 0 {
// The broker didn't read this page yet, no more pages to unmap. // The broker didn't read this page yet, no more pages to unmap.
break; break;
} }
@ -1046,7 +1056,7 @@ where
(*page).magic = PAGE_DEINITIALIZED_MAGIC; (*page).magic = PAGE_DEINITIALIZED_MAGIC;
#[cfg(feature = "llmp_debug")] #[cfg(feature = "llmp_debug")]
log::debug!("Moving unused map to cache: {map:?}"); log::debug!("Moving unused map to cache: {map:?} {:x?}", map.page());
self.unused_shmem_cache self.unused_shmem_cache
.insert(self.unused_shmem_cache.len(), map); .insert(self.unused_shmem_cache.len(), map);
} }
@ -1106,7 +1116,7 @@ where
"Allocating {} bytes on page {:?} / map {:?} (last msg: {:?})", "Allocating {} bytes on page {:?} / map {:?} (last msg: {:?})",
buf_len, buf_len,
page, page,
&map, &map.shmem.id().as_str(),
last_msg last_msg
); );
@ -1213,18 +1223,32 @@ where
sender_id: ClientId, sender_id: ClientId,
next_min_shmem_size: usize, next_min_shmem_size: usize,
) -> Result<LlmpSharedMap<<SP>::ShMem>, Error> { ) -> Result<LlmpSharedMap<<SP>::ShMem>, Error> {
if self.unused_shmem_cache.is_empty() { // Find a shared map that has been released to reuse, from which all receivers left / finished reading.
// No cached maps that fit our need, let's allocate a new one. let cached_shmem = self
Ok(LlmpSharedMap::new( .unused_shmem_cache
sender_id, .iter()
self.shmem_provider.new_shmem(next_min_shmem_size)?, .position(|cached_shmem| {
)) let page = &(*shmem2page(&cached_shmem.shmem));
} else { let receivers_joined_count = page.receivers_joined_count.load(Ordering::Relaxed);
// We got cached shmems laying around, hand it out, if they are large enough. debug_assert_ne!(receivers_joined_count, 0);
let mut cached_shmem = self let receivers_left_count = page.receivers_left_count.load(Ordering::Relaxed);
.unused_shmem_cache debug_assert!(receivers_joined_count >= receivers_left_count);
.remove(self.unused_shmem_cache.len() - 1);
let ret = receivers_joined_count == receivers_left_count;
// For proper refcounts, double check that nobody joined in the meantime.
debug_assert_eq!(
receivers_joined_count,
page.receivers_joined_count.load(Ordering::Relaxed),
"Oops, some receiver joined while re-using the page!"
);
ret
})
.map(|e| self.unused_shmem_cache.remove(e));
if let Some(mut cached_shmem) = cached_shmem {
// We got cached shmems laying around, hand it out, if they are large enough.
if cached_shmem.shmem.len() < next_min_shmem_size { if cached_shmem.shmem.len() < next_min_shmem_size {
// This map is too small, we will never need it again (llmp allocation sizes always increase). Drop it, then call this function again.. // This map is too small, we will never need it again (llmp allocation sizes always increase). Drop it, then call this function again..
#[cfg(feature = "llmp_debug")] #[cfg(feature = "llmp_debug")]
@ -1239,6 +1263,12 @@ where
} }
Ok(cached_shmem) Ok(cached_shmem)
} }
} else {
// No cached maps that fit our need, let's allocate a new one.
Ok(LlmpSharedMap::new(
sender_id,
self.shmem_provider.new_shmem(next_min_shmem_size)?,
))
} }
} }
@ -1548,7 +1578,7 @@ where
#[inline(never)] #[inline(never)]
unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> { unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> {
/* DBG("recv %p %p\n", page, last_msg); */ /* DBG("recv %p %p\n", page, last_msg); */
let mut page = self.current_recv_shmem.page_mut(); let page = self.current_recv_shmem.page_mut();
let last_msg = self.last_msg_recvd; let last_msg = self.last_msg_recvd;
let (current_msg_id, loaded) = let (current_msg_id, loaded) =
@ -1625,8 +1655,8 @@ where
self.last_msg_recvd = ptr::null(); self.last_msg_recvd = ptr::null();
self.highest_msg_id = MessageId(0); self.highest_msg_id = MessageId(0);
// Mark the old page save to unmap, in case we didn't do so earlier. // Mark the old page save to remap.
(*page).safe_to_unmap.store(1, Ordering::Relaxed); (*page).receiver_left();
// Map the new page. The old one should be unmapped by Drop // Map the new page. The old one should be unmapped by Drop
self.current_recv_shmem = self.current_recv_shmem =
@ -1634,9 +1664,10 @@ where
ShMemId::from_array(&pageinfo_cpy.shm_str), ShMemId::from_array(&pageinfo_cpy.shm_str),
pageinfo_cpy.map_size, pageinfo_cpy.map_size,
)?); )?);
page = self.current_recv_shmem.page_mut(); let new_page = self.current_recv_shmem.page_mut();
// Mark the new page save to unmap also (it's mapped by us, the broker now)
(*page).safe_to_unmap.store(1, Ordering::Relaxed); // Mark the old page as save to remap (it's mapped by us, the receiver, now)
(*new_page).receiver_joined();
#[cfg(feature = "llmp_debug")] #[cfg(feature = "llmp_debug")]
log::info!( log::info!(
@ -1819,7 +1850,7 @@ where
/// This indicates, that the page may safely be unmapped by the sender. /// This indicates, that the page may safely be unmapped by the sender.
pub fn mark_safe_to_unmap(&mut self) { pub fn mark_safe_to_unmap(&mut self) {
unsafe { unsafe {
(*self.page_mut()).safe_to_unmap.store(1, Ordering::Relaxed); (*self.page_mut()).receiver_joined();
} }
} }
@ -2274,13 +2305,13 @@ where
} }
if let Some(exit_after_count) = self.exit_cleanly_after { if let Some(exit_after_count) = self.exit_cleanly_after {
log::trace!( // log::trace!(
"Clients connected: {} && > {} - {} >= {}", // "Clients connected: {} && > {} - {} >= {}",
self.has_clients(), // self.has_clients(),
self.num_clients_total, // self.num_clients_total,
self.listeners.len(), // self.listeners.len(),
exit_after_count // exit_after_count
); // );
if !self.has_clients() if !self.has_clients()
&& (self.num_clients_total - self.listeners.len()) >= exit_after_count.into() && (self.num_clients_total - self.listeners.len()) >= exit_after_count.into()
{ {

View File

@ -26,7 +26,7 @@ fn dll_extension<'a>() -> &'a str {
return "dylib"; return "dylib";
} }
} }
let family = env::var("CARGO_CFG_TARGET_FAMILY").unwrap(); let family = env::var("CARGO_CFG_TARGET_FAMILY").unwrap_or_else(|_| "unknown".into());
match family.as_str() { match family.as_str() {
"windows" => "dll", "windows" => "dll",
"unix" => "so", "unix" => "so",