Safer EoP handling (#1128)
This commit is contained in:
parent
3ac439b345
commit
e8838ebebe
@ -168,6 +168,9 @@ const _NULL_ENV_STR: &str = "_NULL";
|
|||||||
/// Magic indicating that a got initialized correctly
|
/// Magic indicating that a got initialized correctly
|
||||||
const PAGE_INITIALIZED_MAGIC: u64 = 0x1A1A1A1A1A1A1AF1;
|
const PAGE_INITIALIZED_MAGIC: u64 = 0x1A1A1A1A1A1A1AF1;
|
||||||
|
|
||||||
|
/// Magic indicating that a got deinitialized correctly, after use
|
||||||
|
const PAGE_DEINITIALIZED_MAGIC: u64 = 0xDEADC0FEAF1BEEF1;
|
||||||
|
|
||||||
/// Size of a new page message, header, payload, and alignment
|
/// Size of a new page message, header, payload, and alignment
|
||||||
const EOP_MSG_SIZE: usize =
|
const EOP_MSG_SIZE: usize =
|
||||||
llmp_align(size_of::<LlmpMsg>() + size_of::<LlmpPayloadSharedMapInfo>());
|
llmp_align(size_of::<LlmpMsg>() + size_of::<LlmpPayloadSharedMapInfo>());
|
||||||
@ -546,7 +549,7 @@ unsafe fn llmp_page_init<SHM: ShMem>(shmem: &mut SHM, sender_id: ClientId, allow
|
|||||||
(*page).size_used = 0;
|
(*page).size_used = 0;
|
||||||
(*(*page).messages.as_mut_ptr()).message_id = MessageId(0);
|
(*(*page).messages.as_mut_ptr()).message_id = MessageId(0);
|
||||||
(*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET;
|
(*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET;
|
||||||
(*page).safe_to_unmap.store(0, Ordering::Relaxed);
|
(*page).safe_to_unmap.store(0, Ordering::Release);
|
||||||
(*page).sender_dead.store(0, Ordering::Relaxed);
|
(*page).sender_dead.store(0, Ordering::Relaxed);
|
||||||
assert!((*page).size_total != 0);
|
assert!((*page).size_total != 0);
|
||||||
}
|
}
|
||||||
@ -873,6 +876,7 @@ where
|
|||||||
/// Completely reset the current sender map.
|
/// Completely reset the current sender map.
|
||||||
/// Afterwards, no receiver should read from it at a different location.
|
/// Afterwards, no receiver should read from it at a different location.
|
||||||
/// This is only useful if all connected llmp parties start over, for example after a crash.
|
/// This is only useful if all connected llmp parties start over, for example after a crash.
|
||||||
|
///
|
||||||
/// # Safety
|
/// # Safety
|
||||||
/// Only safe if you really really restart the page on everything connected
|
/// Only safe if you really really restart the page on everything connected
|
||||||
/// No receiver should read from this page at a different location.
|
/// No receiver should read from this page at a different location.
|
||||||
@ -1002,7 +1006,7 @@ where
|
|||||||
// Exclude the current page by splitting of the last element for this iter
|
// Exclude the current page by splitting of the last element for this iter
|
||||||
let mut unmap_until_excl = 0;
|
let mut unmap_until_excl = 0;
|
||||||
for map in self.out_shmems.split_last_mut().unwrap().1 {
|
for map in self.out_shmems.split_last_mut().unwrap().1 {
|
||||||
if (*map.page()).safe_to_unmap.load(Ordering::Relaxed) == 0 {
|
if (*map.page()).safe_to_unmap.load(Ordering::Acquire) == 0 {
|
||||||
// The broker didn't read this page yet, no more pages to unmap.
|
// The broker didn't read this page yet, no more pages to unmap.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1010,6 +1014,8 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
if unmap_until_excl == 0 && self.out_shmems.len() > LLMP_CFG_MAX_PENDING_UNREAD_PAGES {
|
if unmap_until_excl == 0 && self.out_shmems.len() > LLMP_CFG_MAX_PENDING_UNREAD_PAGES {
|
||||||
|
// Looks like nobody is listening to our pages anymore! :/
|
||||||
|
// The n old pages have not been touched yet.
|
||||||
// We send one last information to the broker before quitting.
|
// We send one last information to the broker before quitting.
|
||||||
self.send_buf(LLMP_SLOW_RECEIVER_PANIC, &[]).unwrap();
|
self.send_buf(LLMP_SLOW_RECEIVER_PANIC, &[]).unwrap();
|
||||||
panic!("The receiver/broker could not process our sent llmp messages in time. Either we're sending too many messages too fast, the broker got stuck, or it crashed. Giving up.");
|
panic!("The receiver/broker could not process our sent llmp messages in time. Either we're sending too many messages too fast, the broker got stuck, or it crashed. Giving up.");
|
||||||
@ -1018,11 +1024,20 @@ where
|
|||||||
// Remove all maps that the broker already mapped, move them to our unused pages cache
|
// Remove all maps that the broker already mapped, move them to our unused pages cache
|
||||||
self.out_shmems.reserve(unmap_until_excl);
|
self.out_shmems.reserve(unmap_until_excl);
|
||||||
for _ in 0..unmap_until_excl {
|
for _ in 0..unmap_until_excl {
|
||||||
let shmem = self.out_shmems.remove(0);
|
let mut map = self.out_shmems.remove(0);
|
||||||
|
|
||||||
|
let page = shmem2page_mut(&mut map.shmem);
|
||||||
|
assert!(
|
||||||
|
(*page).magic == PAGE_INITIALIZED_MAGIC,
|
||||||
|
"LLMP: Tried to free uninitialized shared map at addr {:#}!",
|
||||||
|
page as usize
|
||||||
|
);
|
||||||
|
(*page).magic = PAGE_DEINITIALIZED_MAGIC;
|
||||||
|
|
||||||
#[cfg(feature = "llmp_debug")]
|
#[cfg(feature = "llmp_debug")]
|
||||||
log::debug!("Moving unused map to cache: {shmem:?}");
|
log::debug!("Moving unused map to cache: {map:?}");
|
||||||
self.unused_shmem_cache
|
self.unused_shmem_cache
|
||||||
.insert(self.unused_shmem_cache.len(), shmem);
|
.insert(self.unused_shmem_cache.len(), map);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1209,7 +1224,7 @@ where
|
|||||||
#[cfg(feature = "llmp_debug")]
|
#[cfg(feature = "llmp_debug")]
|
||||||
log::info!("Returning cached shmem {cached_shmem:?}");
|
log::info!("Returning cached shmem {cached_shmem:?}");
|
||||||
unsafe {
|
unsafe {
|
||||||
llmp_page_init(&mut cached_shmem.shmem, sender_id, true);
|
llmp_page_init(&mut cached_shmem.shmem, sender_id, false);
|
||||||
}
|
}
|
||||||
Ok(cached_shmem)
|
Ok(cached_shmem)
|
||||||
}
|
}
|
||||||
@ -1257,7 +1272,7 @@ where
|
|||||||
log::info!("got new map at: {new_map:?}");
|
log::info!("got new map at: {new_map:?}");
|
||||||
|
|
||||||
// New maps always start with 0 as message id -> No messages yet.
|
// New maps always start with 0 as message id -> No messages yet.
|
||||||
(*new_map).current_msg_id.store(0, Ordering::Relaxed);
|
(*new_map).current_msg_id.store(0, Ordering::Release);
|
||||||
|
|
||||||
#[cfg(feature = "llmp_debug")]
|
#[cfg(feature = "llmp_debug")]
|
||||||
log::info!("Setting max alloc size: {:?}", (*old_map).max_alloc_size);
|
log::info!("Setting max alloc size: {:?}", (*old_map).max_alloc_size);
|
||||||
@ -1563,7 +1578,7 @@ where
|
|||||||
// Let's see what we got.
|
// Let's see what we got.
|
||||||
if let Some(msg) = ret {
|
if let Some(msg) = ret {
|
||||||
if !(*msg).in_shmem(&mut self.current_recv_shmem) {
|
if !(*msg).in_shmem(&mut self.current_recv_shmem) {
|
||||||
return Err(Error::illegal_state("Unexpected message in map (out of map bounds) - bugy client or tampered shared map detedted!"));
|
return Err(Error::illegal_state("Unexpected message in map (out of map bounds) - buggy client or tampered shared map detected!"));
|
||||||
}
|
}
|
||||||
// Handle special, LLMP internal, messages.
|
// Handle special, LLMP internal, messages.
|
||||||
match (*msg).tag {
|
match (*msg).tag {
|
||||||
|
@ -24,6 +24,8 @@ use nix::{
|
|||||||
unistd::Pid,
|
unistd::Pid,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[cfg(feature = "regex")]
|
||||||
|
use crate::observers::{get_asan_runtime_flags_with_log_path, AsanBacktraceObserver};
|
||||||
use crate::{
|
use crate::{
|
||||||
bolts::{
|
bolts::{
|
||||||
fs::{get_unique_std_input_file, InputFile},
|
fs::{get_unique_std_input_file, InputFile},
|
||||||
@ -40,9 +42,6 @@ use crate::{
|
|||||||
Error,
|
Error,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(feature = "regex")]
|
|
||||||
use crate::observers::{get_asan_runtime_flags_with_log_path, AsanBacktraceObserver};
|
|
||||||
|
|
||||||
const FORKSRV_FD: i32 = 198;
|
const FORKSRV_FD: i32 = 198;
|
||||||
#[allow(clippy::cast_possible_wrap)]
|
#[allow(clippy::cast_possible_wrap)]
|
||||||
const FS_OPT_ENABLED: i32 = 0x80000001_u32 as i32;
|
const FS_OPT_ENABLED: i32 = 0x80000001_u32 as i32;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user