Fix LLMP eop race, introduce LLMP ShMem cache (#1091)

* Fix llmp eop race, introduce llmp shmem cache

* initialize cached page, clippy

* fix llmp_debug strings

* add error handling

* nicer error output

* More error handling convenience

* clippy

* fix macos example

* nits

* trying to add a logger

* no_std

* inline logger enabled

* clippy
Dominik Maier 2023-02-24 10:28:21 +01:00 committed by GitHub
parent ff4e2f4192
commit 92842c8b04
5 changed files with 205 additions and 52 deletions
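
In short: `handle_out_eop` previously copied the old page's `current_msg_id` onto the freshly mapped page and pruned old pages only after the EOP message had been sent; in the diff below, new pages always start at message id 0 and pruning happens before the new page is picked. Pages whose readers are all done now go into a new `unused_shmem_cache` and are re-initialized via `llmp_page_init` on reuse, instead of being unmapped and freshly allocated.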

==== File 1 of 5 ====

@@ -3,13 +3,13 @@ This shows how llmp can be used directly, without libafl abstractions
 */
 extern crate alloc;
-#[cfg(all(unix, feature = "std"))]
+#[cfg(all(any(unix, windows), feature = "std"))]
 use core::time::Duration;
-#[cfg(all(unix, feature = "std"))]
+#[cfg(all(any(unix, windows), feature = "std"))]
 use std::{thread, time};
-use libafl::bolts::llmp::Tag;
-#[cfg(all(unix, feature = "std"))]
+use libafl::{bolts::llmp::Tag, prelude::SimpleStdErrLogger};
+#[cfg(all(any(unix, windows), feature = "std"))]
 use libafl::{
     bolts::{
         llmp,
@@ -18,11 +18,13 @@ use libafl::{
     Error,
 };
+static LOGGER: SimpleStdErrLogger = SimpleStdErrLogger::debug();
 const _TAG_SIMPLE_U32_V1: Tag = 0x5130_0321;
 const _TAG_MATH_RESULT_V1: Tag = 0x7747_4331;
 const _TAG_1MEG_V1: Tag = 0xB111_1161;
-#[cfg(all(unix, feature = "std"))]
+#[cfg(all(any(unix, windows), feature = "std"))]
 fn adder_loop(port: u16) -> ! {
     let shmem_provider = StdShMemProvider::new().unwrap();
     let mut client = llmp::LlmpClient::create_attach_to_tcp(shmem_provider, port).unwrap();
@@ -60,23 +62,28 @@
     }
 }
-#[cfg(all(unix, feature = "std"))]
+#[cfg(all(any(unix, windows), feature = "std"))]
 fn large_msg_loop(port: u16) -> ! {
     let mut client =
         llmp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new().unwrap(), port).unwrap();
-    #[allow(clippy::large_stack_arrays)]
-    let meg_buf = [1u8; 1 << 20];
+    #[cfg(not(target_vendor = "apple"))]
+    let meg_buf = vec![1u8; 1 << 20];
+    #[cfg(target_vendor = "apple")]
+    let meg_buf = vec![1u8; 1 << 19];
     loop {
         client.send_buf(_TAG_1MEG_V1, &meg_buf).unwrap();
+        #[cfg(not(target_vendor = "apple"))]
         println!("Sending the next megabyte");
+        #[cfg(target_vendor = "apple")]
+        println!("Sending the next half megabyte (Apple had issues with >1 meg)");
         thread::sleep(time::Duration::from_millis(100));
     }
 }
 #[allow(clippy::unnecessary_wraps)]
-#[cfg(all(unix, feature = "std"))]
+#[cfg(all(any(unix, windows), feature = "std"))]
 fn broker_message_hook(
     client_id: u32,
     tag: llmp::Tag,
@@ -106,12 +113,12 @@ fn broker_message_hook(
     }
 }
-#[cfg(not(unix))]
+#[cfg(not(any(unix, windows)))]
 fn main() {
     todo!("LLMP is not yet supported on this platform.");
 }
-#[cfg(unix)]
+#[cfg(any(unix, windows))]
 fn main() {
     /* The main node has a broker, and a few worker threads */
@@ -129,6 +136,9 @@ fn main() {
         .unwrap_or_else(|| "4242".into())
         .parse::<u16>()
         .unwrap();
+    log::set_logger(&LOGGER).unwrap();
     println!("Launching in mode {mode} on port {port}");
     match mode.as_str() {

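Note on the example above: `log::set_logger` installs the logger, but the `log` facade also filters every record through a global maximum level that defaults to `LevelFilter::Off`; raising it via `log::set_max_level` (standard `log` crate API, not part of this diff) is what lets records reach the logger — see the usage sketch after the `SimpleStdErrLogger` definition below.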
==== File 2 of 5 ====

@@ -455,13 +455,13 @@ fn next_shmem_size(max_alloc: usize) -> usize {
 /// Initialize a new `llmp_page`. The size should be relative to
 /// `llmp_page->messages`
-unsafe fn _llmp_page_init<SHM: ShMem>(shmem: &mut SHM, sender_id: ClientId, allow_reinit: bool) {
+unsafe fn llmp_page_init<SHM: ShMem>(shmem: &mut SHM, sender_id: ClientId, allow_reinit: bool) {
     #[cfg(feature = "llmp_debug")]
-    log::trace!("_llmp_page_init: shmem {:?}", &shmem);
+    log::trace!("llmp_page_init: shmem {:?}", &shmem);
     let map_size = shmem.len();
     let page = shmem2page_mut(shmem);
     #[cfg(feature = "llmp_debug")]
-    log::trace!("_llmp_page_init: page {:?}", &(*page));
+    log::trace!("llmp_page_init: page {:?}", &(*page));
     if !allow_reinit {
         assert!(
@@ -758,6 +758,12 @@ where
     pub last_msg_sent: *const LlmpMsg,
     /// A vec of page wrappers, each containing an initialized [`ShMem`]
    pub out_shmems: Vec<LlmpSharedMap<SP::ShMem>>,
+    /// A vec of pages that we previously used, but that have served their purpose
+    /// (no potential readers are left).
+    /// Instead of freeing them, we keep them around to potentially reuse them later,
+    /// if they are still large enough.
+    /// This way, the OS doesn't have to spend time zeroing new pages or tearing down our old ones.
+    unused_shmem_cache: Vec<LlmpSharedMap<SP::ShMem>>,
     /// If true, pages will never be pruned.
     /// The broker uses this feature.
     /// By keeping the message history around,
@@ -793,6 +799,7 @@ where
             keep_pages_forever,
             has_unsent_message: false,
             shmem_provider,
+            unused_shmem_cache: vec![],
         })
     }
@@ -803,7 +810,7 @@ where
     /// Only safe if you really really restart the page on everything connected
     /// No receiver should read from this page at a different location.
     pub unsafe fn reset(&mut self) {
-        _llmp_page_init(
+        llmp_page_init(
             &mut self.out_shmems.last_mut().unwrap().shmem,
             self.id,
             true,
@@ -917,11 +924,12 @@ where
             keep_pages_forever: false,
             has_unsent_message: false,
             shmem_provider,
+            unused_shmem_cache: vec![],
         })
     }
     /// For non zero-copy, we want to get rid of old pages with duplicate messages in the client
-    /// eventually. This function This funtion sees if we can unallocate older pages.
+    /// eventually. This function sees if we can deallocate older pages.
     /// The broker would have informed us by setting the safe_to_unmap-flag.
     unsafe fn prune_old_pages(&mut self) {
         // Exclude the current page by splitting of the last element for this iter
@@ -940,9 +948,15 @@ where
            panic!("The receiver/broker could not process our sent llmp messages in time. Either we're sending too many messages too fast, the broker got stuck, or it crashed. Giving up.");
        }
-        // Remove all maps that the broker already mapped
-        // simply removing them from the vec should then call drop and unmap them.
-        self.out_shmems.drain(0..unmap_until_excl);
+        // Remove all maps that the broker already mapped, move them to our unused pages cache
+        self.out_shmems.reserve(unmap_until_excl);
+        for _ in 0..unmap_until_excl {
+            let shmem = self.out_shmems.remove(0);
+            #[cfg(feature = "llmp_debug")]
+            log::debug!("Moving unused map to cache: {shmem:?}");
+            self.unused_shmem_cache
+                .insert(self.unused_shmem_cache.len(), shmem);
+        }
    }
     /// Intern: Special allocation function for `EOP` messages (and nothing else!)
@@ -1099,6 +1113,42 @@ where
        Ok(())
    }
+    /// Grab an unused `LlmpSharedMap` from `unused_shmem_cache` or allocate a new map,
+    /// if no suitable maps could be found.
+    unsafe fn new_or_unused_shmem(
+        &mut self,
+        sender_id: ClientId,
+        next_min_shmem_size: usize,
+    ) -> Result<LlmpSharedMap<<SP>::ShMem>, Error> {
+        if self.unused_shmem_cache.is_empty() {
+            // No cached maps that fit our need, let's allocate a new one.
+            Ok(LlmpSharedMap::new(
+                sender_id,
+                self.shmem_provider.new_shmem(next_min_shmem_size)?,
+            ))
+        } else {
+            // We have cached shmems lying around; hand one out, if it is large enough.
+            let mut cached_shmem = self
+                .unused_shmem_cache
+                .remove(self.unused_shmem_cache.len() - 1);
+            if cached_shmem.shmem.len() < next_min_shmem_size {
+                // This map is too small, we will never need it again (llmp allocation sizes always increase). Drop it, then call this function again.
+                #[cfg(feature = "llmp_debug")]
+                log::info!("Dropping too small shmem {cached_shmem:?}");
+                drop(cached_shmem);
+                self.new_or_unused_shmem(sender_id, next_min_shmem_size)
+            } else {
+                #[cfg(feature = "llmp_debug")]
+                log::info!("Returning cached shmem {cached_shmem:?}");
+                unsafe {
+                    llmp_page_init(&mut cached_shmem.shmem, sender_id, true);
+                }
+                Ok(cached_shmem)
+            }
+        }
+    }
     /// listener about it using a EOP message.
     unsafe fn handle_out_eop(&mut self) -> Result<(), Error> {
        #[cfg(all(feature = "llmp_debug", feature = "std"))]
@@ -1116,33 +1166,37 @@ where
            );
        }
+        // If we want to get rid of old pages (client to broker), do that now
+        if !self.keep_pages_forever {
+            #[cfg(feature = "llmp_debug")]
+            log::info!("pruning");
+            self.prune_old_pages();
+        }
        let old_map = self.out_shmems.last_mut().unwrap().page_mut();
-        #[cfg(feature = "llmp_debug")]
-        log::info!(
-            "Next ShMem Size {}",
-            next_shmem_size((*old_map).max_alloc_size)
-        );
+        let next_min_shmem_size = next_shmem_size((*old_map).max_alloc_size);
+        #[cfg(feature = "llmp_debug")]
+        log::info!("Next min ShMem Size {next_min_shmem_size}",);
-        // Create a new shard page.
-        let mut new_map_shmem = LlmpSharedMap::new(
-            (*old_map).sender_id,
-            self.shmem_provider
-                .new_shmem(next_shmem_size((*old_map).max_alloc_size))?,
-        );
+        // Get a new shared page, or reuse an old one, if available.
+        let mut new_map_shmem =
+            self.new_or_unused_shmem((*old_map).sender_id, next_min_shmem_size)?;
        let mut new_map = new_map_shmem.page_mut();
        #[cfg(feature = "llmp_debug")]
        log::info!("got new map at: {new_map:?}");
-        (*new_map).current_msg_id.store(
-            (*old_map).current_msg_id.load(Ordering::Relaxed),
-            Ordering::Relaxed,
-        );
+        // New maps always start with 0 as message id -> No messages yet.
+        (*new_map).current_msg_id.store(0, Ordering::Relaxed);
        #[cfg(feature = "llmp_debug")]
        log::info!("Setting max alloc size: {:?}", (*old_map).max_alloc_size);
+        // Allocations may never shrink:
+        // keep track of the max message size we allocated across maps.
        (*new_map).max_alloc_size = (*old_map).max_alloc_size;
        /* On the old map, place a last message linking to the new map for the clients
@@ -1154,7 +1208,7 @@ where
        (*end_of_page_msg).map_size = new_map_shmem.shmem.len();
        (*end_of_page_msg).shm_str = *new_map_shmem.shmem.id().as_array();
-        /* Send the last msg on the old buf */
+        /* Send the last msg (the EOP message) on the old buf */
        self.send(out, true)?;
        // Set the new page as current page.
@@ -1162,13 +1216,6 @@ where
        // We never sent a msg on the new buf */
        self.last_msg_sent = ptr::null_mut();
-        // If we want to get red if old pages, (client to broker), do that now
-        if !self.keep_pages_forever {
-            #[cfg(feature = "llmp_debug")]
-            log::info!("pruning");
-            self.prune_old_pages();
-        }
        Ok(())
    }
@@ -1469,7 +1516,7 @@ where
        self.last_msg_recvd = ptr::null();
        self.highest_msg_id = 0;
-        // Mark the old page save to unmap, in case we didn't so earlier.
+        // Mark the old page safe to unmap, in case we didn't do so earlier.
        (*page).safe_to_unmap.store(1, Ordering::Relaxed);
        // Map the new page. The old one should be unmapped by Drop
@@ -1623,7 +1670,7 @@ where
        );
        unsafe {
-            _llmp_page_init(&mut new_shmem, sender, false);
+            llmp_page_init(&mut new_shmem, sender, false);
        }
        Self { shmem: new_shmem }
    }
@@ -1803,6 +1850,7 @@ where
                keep_pages_forever: true,
                has_unsent_message: false,
                shmem_provider: shmem_provider.clone(),
+                unused_shmem_cache: vec![],
            },
            llmp_clients: vec![],
            shmem_provider,
@@ -2345,6 +2393,7 @@ where
            keep_pages_forever: false,
            has_unsent_message: false,
            shmem_provider: shmem_provider_bg.clone(),
+            unused_shmem_cache: vec![],
        };
        loop {
@@ -2636,6 +2685,7 @@ where
                keep_pages_forever: false,
                has_unsent_message: false,
                shmem_provider: shmem_provider.clone(),
+                unused_shmem_cache: vec![],
            },
            receiver: LlmpReceiver {

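The heart of the new cache, stripped of LibAFL's types: retired pages are parked rather than unmapped, and handed back out on the next end-of-page if they are still large enough; since llmp allocation sizes only ever grow, a too-small cached page can be dropped for good. A minimal, self-contained sketch of that reuse pattern follows — `Page`, `PageCache`, and their methods are illustrative stand-ins, not LibAFL API:

    // Illustrative stand-ins: `Page` plays the role of LlmpSharedMap<ShMem>.
    struct Page {
        buf: Vec<u8>, // stand-in for the shared-memory mapping
    }

    struct PageCache {
        unused: Vec<Page>,
    }

    impl PageCache {
        /// Hand out a cached page of at least `min_size` bytes, or allocate a fresh one.
        fn new_or_unused(&mut self, min_size: usize) -> Page {
            while let Some(page) = self.unused.pop() {
                if page.buf.len() >= min_size {
                    // Large enough: reuse it in place. The real code re-initializes
                    // the page header here via `llmp_page_init`.
                    return page;
                }
                // Too small, and required sizes only grow, so this page can
                // never be reused: let it drop here.
            }
            // Cache exhausted: allocate (the OS zeroes fresh mappings).
            Page { buf: vec![0; min_size] }
        }

        /// Park a page whose readers are all done, instead of freeing it.
        fn retire(&mut self, page: Page) {
            self.unused.push(page);
        }
    }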
==== File 3 of 5 ====

@@ -34,6 +34,9 @@ use core::{iter::Iterator, time};
 #[cfg(feature = "std")]
 use std::time::{SystemTime, UNIX_EPOCH};
+#[cfg(feature = "std")]
+use log::{Level, Metadata, Record};
 /// Can be converted to a slice
 pub trait AsSlice {
     /// Type of the entries in this slice
@@ -204,6 +207,73 @@ pub fn format_duration_hms(duration: &time::Duration) -> String {
     format!("{}h-{}m-{}s", (secs / 60) / 60, (secs / 60) % 60, secs % 60)
 }
+/// A simple logger struct that logs to stderr when used with [`log::set_logger`].
+#[derive(Debug)]
+#[cfg(feature = "std")]
+pub struct SimpleStdErrLogger {
+    /// The min log level for which this logger will write messages.
+    pub log_level: Level,
+}
+#[cfg(feature = "std")]
+impl SimpleStdErrLogger {
+    /// Create a new [`log::Log`] logger that will log [`Level::Trace`] and above
+    #[must_use]
+    pub const fn trace() -> Self {
+        Self {
+            log_level: Level::Trace,
+        }
+    }
+    /// Create a new [`log::Log`] logger that will log [`Level::Debug`] and above
+    #[must_use]
+    pub const fn debug() -> Self {
+        Self {
+            log_level: Level::Debug,
+        }
+    }
+    /// Create a new [`log::Log`] logger that will log [`Level::Info`] and above
+    #[must_use]
+    pub const fn info() -> Self {
+        Self {
+            log_level: Level::Info,
+        }
+    }
+    /// Create a new [`log::Log`] logger that will log [`Level::Warn`] and above
+    #[must_use]
+    pub const fn warn() -> Self {
+        Self {
+            log_level: Level::Warn,
+        }
+    }
+    /// Create a new [`log::Log`] logger that will log [`Level::Error`]
+    #[must_use]
+    pub const fn error() -> Self {
+        Self {
+            log_level: Level::Error,
+        }
+    }
+}
+#[cfg(feature = "std")]
+impl log::Log for SimpleStdErrLogger {
+    #[inline]
+    fn enabled(&self, metadata: &Metadata) -> bool {
+        metadata.level() <= self.log_level
+    }
+    fn log(&self, record: &Record) {
+        if self.enabled(record.metadata()) {
+            eprintln!("{}: {}", record.level(), record.args());
+        }
+    }
+    fn flush(&self) {}
+}
 /// The purpose of this module is to alleviate imports of the bolts by adding a glob import.
 #[cfg(feature = "prelude")]
 pub mod bolts_prelude {

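For context, here is one way to wire the new logger up end to end (a sketch; the `log::set_max_level` call is standard `log` crate API and does not appear in this diff, but without it the facade's default max level of `Off` filters records before they reach `SimpleStdErrLogger::log`):

    use libafl::prelude::SimpleStdErrLogger;

    // `set_logger` needs a &'static logger, hence a static item.
    static LOGGER: SimpleStdErrLogger = SimpleStdErrLogger::debug();

    fn main() {
        // Install the logger; this may only succeed once per process.
        log::set_logger(&LOGGER).unwrap();
        // Raise the facade's global filter (it defaults to LevelFilter::Off).
        log::set_max_level(log::LevelFilter::Debug);

        log::debug!("printed: Debug passes both the global and the logger's filter");
        log::trace!("dropped: Trace is below the Debug threshold");
    }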
==== File 4 of 5 ====

@@ -130,15 +130,16 @@ where
        let mut message = header.to_vec();
        message.extend(body);
-        self.stream
-            .write_all(&message)
-            .expect("Failed to send message");
+        self.stream.write_all(&message)?;
+        //.expect("Failed to send message");
        let mut shm_slice = [0_u8; 20];
        let mut fd_buf = [-1; 1];
-        self.stream
-            .recv_fds(&mut shm_slice, &mut fd_buf)
-            .expect("Did not receive a response");
+        let (slice_size, fd_count) = self.stream.recv_fds(&mut shm_slice, &mut fd_buf)?;
+        //.expect("Did not receive a response");
+        if slice_size == 0 && fd_count == 0 {
+            return Err(Error::illegal_state(format!("Tried to receive 20 bytes and one fd via unix shmem socket, but got {slice_size} bytes and {fd_count} fds.")));
+        }
        let server_id = ShMemId::from_array(&shm_slice);
        let server_fd: i32 = server_id.into();
@@ -179,7 +180,11 @@ where
        let service = ShMemService::<SP>::start();
        let mut res = Self {
-            stream: UnixStream::connect_to_unix_addr(&UnixSocketAddr::new(UNIX_SERVER_NAME)?)?,
+            stream: UnixStream::connect_to_unix_addr(&UnixSocketAddr::new(UNIX_SERVER_NAME)?).map_err(|err| Error::illegal_state(if cfg!(target_vendor = "apple") {
+                format!("The ServedShMemProvider was not started or is no longer running. You may need to remove the './libafl_unix_shmem_server' file and retry. Error details: {err:?}")
+            } else {
+                format!("The ServedShMemProvider was not started or is no longer running. Error details: {err:?}")
+            }))?,
            inner: SP::new()?,
            id: -1,
            service,
@@ -536,7 +541,18 @@ where
            }
            ServedShMemRequest::ExistingMap(description) => {
                let client = self.clients.get_mut(&client_id).unwrap();
-                let description_id: i32 = description.id.into();
+                if description.id.is_empty() {
+                    return Err(Error::illegal_state("Received empty ShMemId from unix shmem client. Are the shmem limits set correctly? Did a client crash?"));
+                }
+                let description_id: i32 = description.id.try_into().unwrap();
+                if !self.all_shmems.contains_key(&description_id) {
+                    // We should never get here, but it may happen if the OS ran out of shmem pages at some point, or limits were reached.
+                    return Err(Error::illegal_state(format!("Client wanted to read from existing map with id {description_id}/{description:?}, but it was not allocated by this shmem server. Are the shmem limits set correctly? Did a client crash?")));
+                }
                if client.maps.contains_key(&description_id) {
                    // Using let else here as self needs to be accessed in the else branch.
                    #[allow(clippy::option_if_let_else)]

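The recurring pattern in this file — swapping `.expect(...)` panics for `?` plus a descriptive `Error::illegal_state(...)` — in isolation (a sketch with a hypothetical `connect` helper and a local `Error` stand-in; that `Error::illegal_state` accepts a string is taken from its usage in the diff above):

    use std::net::TcpStream;

    // Local stand-in for libafl's Error type, reduced to the one variant used here.
    #[derive(Debug)]
    enum Error {
        IllegalState(String),
    }

    impl Error {
        fn illegal_state(msg: impl Into<String>) -> Self {
            Error::IllegalState(msg.into())
        }
    }

    // Instead of panicking in a library path, attach context and bubble the error up.
    fn connect(addr: &str) -> Result<TcpStream, Error> {
        TcpStream::connect(addr).map_err(|err| {
            Error::illegal_state(format!(
                "The server at {addr} was not started or is no longer running. Error details: {err:?}"
            ))
        })
    }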
==== File 5 of 5 ====

@@ -123,6 +123,13 @@ impl ShMemId {
        Self { id: slice }
    }
+    /// Returns `true` if this `ShMemId` has an empty backing slice.
+    /// If this is the case something went wrong, and this `ShMemId` may not be read from.
+    #[must_use]
+    pub fn is_empty(&self) -> bool {
+        self.id[0] == 0
+    }
    /// Get the id as a fixed-length slice
    #[must_use]
    pub fn as_array(&self) -> &[u8; 20] {