From 92842c8b04c9aeb0f9df32ea30f58799d4a20805 Mon Sep 17 00:00:00 2001
From: Dominik Maier
Date: Fri, 24 Feb 2023 10:28:21 +0100
Subject: [PATCH] Fix LLMP eop race, introduce LLMP ShMem cache (#1091)

* Fix llmp eop race, introduce llmp shmem cache

* initialize cached page, clippy

* fix llmp_debug strings

* add error handling

* nicer error output

* More error handling convenience

* clippy

* fix macos example

* nits

* trying to add a logger

* no_std

* inline logger enabled

* clippy
---
 libafl/examples/llmp_test/main.rs        |  32 ++++---
 libafl/src/bolts/llmp.rs                 | 116 ++++++++++++++++-------
 libafl/src/bolts/mod.rs                  |  70 ++++++++++++++
 libafl/src/bolts/os/unix_shmem_server.rs |  32 +++++--
 libafl/src/bolts/shmem.rs                |   7 ++
 5 files changed, 205 insertions(+), 52 deletions(-)

diff --git a/libafl/examples/llmp_test/main.rs b/libafl/examples/llmp_test/main.rs
index dbdf8d7d93..3c676103a9 100644
--- a/libafl/examples/llmp_test/main.rs
+++ b/libafl/examples/llmp_test/main.rs
@@ -3,13 +3,13 @@ This shows how llmp can be used directly, without libafl abstractions
 */
 extern crate alloc;
 
-#[cfg(all(unix, feature = "std"))]
+#[cfg(all(any(unix, windows), feature = "std"))]
 use core::time::Duration;
-#[cfg(all(unix, feature = "std"))]
+#[cfg(all(any(unix, windows), feature = "std"))]
 use std::{thread, time};
 
-use libafl::bolts::llmp::Tag;
-#[cfg(all(unix, feature = "std"))]
+use libafl::{bolts::llmp::Tag, prelude::SimpleStdErrLogger};
+#[cfg(all(any(unix, windows), feature = "std"))]
 use libafl::{
     bolts::{
         llmp,
@@ -18,11 +18,13 @@ use libafl::{
     Error,
 };
 
+static LOGGER: SimpleStdErrLogger = SimpleStdErrLogger::debug();
+
 const _TAG_SIMPLE_U32_V1: Tag = 0x5130_0321;
 const _TAG_MATH_RESULT_V1: Tag = 0x7747_4331;
 const _TAG_1MEG_V1: Tag = 0xB111_1161;
 
-#[cfg(all(unix, feature = "std"))]
+#[cfg(all(any(unix, windows), feature = "std"))]
 fn adder_loop(port: u16) -> ! {
     let shmem_provider = StdShMemProvider::new().unwrap();
     let mut client = llmp::LlmpClient::create_attach_to_tcp(shmem_provider, port).unwrap();
@@ -60,23 +62,28 @@
     }
 }
 
-#[cfg(all(unix, feature = "std"))]
+#[cfg(all(any(unix, windows), feature = "std"))]
 fn large_msg_loop(port: u16) -> ! {
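The hunk below swaps a 1 MiB stack array for a heap-allocated `vec!`, with a smaller size on Apple targets. A minimal, self-contained sketch of the same cfg-gating pattern, for reference (the constant and function names here are illustrative, not part of the patch):

    // Illustrative only: the cfg-gating pattern from large_msg_loop, expressed
    // as a compile-time constant. `MSG_BUF_SIZE` is a hypothetical name.
    #[cfg(not(target_vendor = "apple"))]
    const MSG_BUF_SIZE: usize = 1 << 20; // 1 MiB on most targets
    #[cfg(target_vendor = "apple")]
    const MSG_BUF_SIZE: usize = 1 << 19; // 512 KiB; Apple had issues with >1 MiB

    fn make_message_buf() -> Vec<u8> {
        // Heap allocation: no large stack frame, unlike the removed stack array.
        vec![1u8; MSG_BUF_SIZE]
    }

Exactly one branch of the `cfg` pair is compiled in, so the rest of the code can stay target-agnostic.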
     let mut client =
         llmp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new().unwrap(), port).unwrap();
 
-    #[allow(clippy::large_stack_arrays)]
-    let meg_buf = [1u8; 1 << 20];
+    #[cfg(not(target_vendor = "apple"))]
+    let meg_buf = vec![1u8; 1 << 20];
+    #[cfg(target_vendor = "apple")]
+    let meg_buf = vec![1u8; 1 << 19];
 
     loop {
         client.send_buf(_TAG_1MEG_V1, &meg_buf).unwrap();
+        #[cfg(not(target_vendor = "apple"))]
         println!("Sending the next megabyte");
+        #[cfg(target_vendor = "apple")]
+        println!("Sending the next half megabyte (Apple had issues with >1 meg)");
         thread::sleep(time::Duration::from_millis(100));
     }
 }
 
 #[allow(clippy::unnecessary_wraps)]
-#[cfg(all(unix, feature = "std"))]
+#[cfg(all(any(unix, windows), feature = "std"))]
 fn broker_message_hook(
     client_id: u32,
     tag: llmp::Tag,
@@ -106,12 +113,12 @@
     }
 }
 
-#[cfg(not(unix))]
+#[cfg(not(any(unix, windows)))]
 fn main() {
     todo!("LLMP is not yet supported on this platform.");
 }
 
-#[cfg(unix)]
+#[cfg(any(unix, windows))]
 fn main() {
     /* The main node has a broker, and a few worker threads */
@@ -129,6 +136,9 @@ fn main() {
         .unwrap_or_else(|| "4242".into())
         .parse::<u16>()
         .unwrap();
+
+    log::set_logger(&LOGGER).unwrap();
+
     println!("Launching in mode {mode} on port {port}");
 
     match mode.as_str() {
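With `log::set_logger` wired up above, note that the `log` crate also gates records on a global max level, which defaults to `Off`. A minimal sketch of a complete setup, assuming the `SimpleStdErrLogger` type added in `bolts/mod.rs` further below:

    // Hypothetical setup snippet; `SimpleStdErrLogger` is added by this patch.
    use libafl::prelude::SimpleStdErrLogger;
    use log::LevelFilter;

    static LOGGER: SimpleStdErrLogger = SimpleStdErrLogger::debug();

    fn setup_logging() {
        // `set_logger` can only succeed once per process.
        log::set_logger(&LOGGER).expect("logger already set");
        // Without this, the default max level of `Off` filters everything out.
        log::set_max_level(LevelFilter::Debug);
        log::debug!("logging is live");
    }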
diff --git a/libafl/src/bolts/llmp.rs b/libafl/src/bolts/llmp.rs
index a71fcbc8f0..2ca0dd8912 100644
--- a/libafl/src/bolts/llmp.rs
+++ b/libafl/src/bolts/llmp.rs
@@ -455,13 +455,13 @@ fn next_shmem_size(max_alloc: usize) -> usize {
 
 /// Initialize a new `llmp_page`. The size should be relative to
 /// `llmp_page->messages`
-unsafe fn _llmp_page_init<SHM: ShMem>(shmem: &mut SHM, sender_id: ClientId, allow_reinit: bool) {
+unsafe fn llmp_page_init<SHM: ShMem>(shmem: &mut SHM, sender_id: ClientId, allow_reinit: bool) {
     #[cfg(feature = "llmp_debug")]
-    log::trace!("_llmp_page_init: shmem {:?}", &shmem);
+    log::trace!("llmp_page_init: shmem {:?}", &shmem);
     let map_size = shmem.len();
     let page = shmem2page_mut(shmem);
     #[cfg(feature = "llmp_debug")]
-    log::trace!("_llmp_page_init: page {:?}", &(*page));
+    log::trace!("llmp_page_init: page {:?}", &(*page));
 
     if !allow_reinit {
         assert!(
@@ -758,6 +758,12 @@ where
     pub last_msg_sent: *const LlmpMsg,
     /// A vec of page wrappers, each containing an initialized [`ShMem`]
     pub out_shmems: Vec<LlmpSharedMap<SP::ShMem>>,
+    /// A vec of pages that we previously used, but that have served their purpose
+    /// (no potential readers are left).
+    /// Instead of freeing them, we keep them around to potentially reuse them later,
+    /// if they are still large enough.
+    /// This way, the OS doesn't have to spend time zeroing new pages, and we save the cost of getting rid of our old pages.
+    unused_shmem_cache: Vec<LlmpSharedMap<SP::ShMem>>,
     /// If true, pages will never be pruned.
     /// The broker uses this feature.
     /// By keeping the message history around,
@@ -793,6 +799,7 @@ where
             keep_pages_forever,
             has_unsent_message: false,
             shmem_provider,
+            unused_shmem_cache: vec![],
         })
     }
 
@@ -803,7 +810,7 @@
     /// Only safe if you really really restart the page on everything connected
     /// No receiver should read from this page at a different location.
     pub unsafe fn reset(&mut self) {
-        _llmp_page_init(
+        llmp_page_init(
             &mut self.out_shmems.last_mut().unwrap().shmem,
             self.id,
             true,
@@ -917,11 +924,12 @@
             keep_pages_forever: false,
             has_unsent_message: false,
             shmem_provider,
+            unused_shmem_cache: vec![],
         })
     }
 
     /// For non zero-copy, we want to get rid of old pages with duplicate messages in the client
-    /// eventually. This function This funtion sees if we can unallocate older pages.
+    /// eventually. This function sees if we can deallocate older pages.
     /// The broker would have informed us by setting the safe_to_unmap-flag.
     unsafe fn prune_old_pages(&mut self) {
         // Exclude the current page by splitting of the last element for this iter
@@ -940,9 +948,15 @@
             panic!("The receiver/broker could not process our sent llmp messages in time. Either we're sending too many messages too fast, the broker got stuck, or it crashed. Giving up.");
         }
 
-        // Remove all maps that the broker already mapped
-        // simply removing them from the vec should then call drop and unmap them.
-        self.out_shmems.drain(0..unmap_until_excl);
+        // Remove all maps that the broker already mapped; move them to our unused-pages cache.
+        self.out_shmems.reserve(unmap_until_excl);
+        for _ in 0..unmap_until_excl {
+            let shmem = self.out_shmems.remove(0);
+            #[cfg(feature = "llmp_debug")]
+            log::debug!("Moving unused map to cache: {shmem:?}");
+            self.unused_shmem_cache
+                .insert(self.unused_shmem_cache.len(), shmem);
+        }
     }
 
     /// Intern: Special allocation function for `EOP` messages (and nothing else!)
@@ -1099,6 +1113,42 @@
         Ok(())
     }
 
+    /// Grab an unused `LlmpSharedMap` from `unused_shmem_cache`, or allocate a new map
+    /// if no suitable cached map could be found.
+    unsafe fn new_or_unused_shmem(
+        &mut self,
+        sender_id: ClientId,
+        next_min_shmem_size: usize,
+    ) -> Result<LlmpSharedMap<<SP as ShMemProvider>::ShMem>, Error> {
+        if self.unused_shmem_cache.is_empty() {
+            // No cached maps that fit our need, let's allocate a new one.
+            Ok(LlmpSharedMap::new(
+                sender_id,
+                self.shmem_provider.new_shmem(next_min_shmem_size)?,
+            ))
+        } else {
+            // We have cached shmems lying around; hand one out, if it is large enough.
+            let mut cached_shmem = self
+                .unused_shmem_cache
+                .remove(self.unused_shmem_cache.len() - 1);
+
+            if cached_shmem.shmem.len() < next_min_shmem_size {
+                // This map is too small, we will never need it again (llmp allocation sizes always increase). Drop it, then call this function again.
+                #[cfg(feature = "llmp_debug")]
+                log::info!("Dropping too small shmem {cached_shmem:?}");
+                drop(cached_shmem);
+                self.new_or_unused_shmem(sender_id, next_min_shmem_size)
+            } else {
+                #[cfg(feature = "llmp_debug")]
+                log::info!("Returning cached shmem {cached_shmem:?}");
+                unsafe {
+                    llmp_page_init(&mut cached_shmem.shmem, sender_id, true);
+                }
+                Ok(cached_shmem)
+            }
+        }
+    }
+
     /// listener about it using a EOP message.
     unsafe fn handle_out_eop(&mut self) -> Result<(), Error> {
         #[cfg(all(feature = "llmp_debug", feature = "std"))]
@@ -1116,33 +1166,37 @@
             );
         }
 
+        // If we want to get rid of old pages (client to broker), do that now.
+        if !self.keep_pages_forever {
+            #[cfg(feature = "llmp_debug")]
+            log::info!("pruning");
+            self.prune_old_pages();
+        }
+
         let old_map = self.out_shmems.last_mut().unwrap().page_mut();
 
-        #[cfg(feature = "llmp_debug")]
-        log::info!(
-            "Next ShMem Size {}",
-            next_shmem_size((*old_map).max_alloc_size)
-        );
+        let next_min_shmem_size = next_shmem_size((*old_map).max_alloc_size);
+
+        #[cfg(feature = "llmp_debug")]
+        log::info!("Next min ShMem Size {next_min_shmem_size}");
+
+        // Get a new shared page, or reuse an old one, if available.
+        let mut new_map_shmem =
+            self.new_or_unused_shmem((*old_map).sender_id, next_min_shmem_size)?;
 
-        // Create a new shard page.
-        let mut new_map_shmem = LlmpSharedMap::new(
-            (*old_map).sender_id,
-            self.shmem_provider
-                .new_shmem(next_shmem_size((*old_map).max_alloc_size))?,
-        );
 
         let mut new_map = new_map_shmem.page_mut();
 
         #[cfg(feature = "llmp_debug")]
         log::info!("got new map at: {new_map:?}");
 
-        (*new_map).current_msg_id.store(
-            (*old_map).current_msg_id.load(Ordering::Relaxed),
-            Ordering::Relaxed,
-        );
+        // New maps always start with 0 as message id -> No messages yet.
+        (*new_map).current_msg_id.store(0, Ordering::Relaxed);
 
         #[cfg(feature = "llmp_debug")]
         log::info!("Setting max alloc size: {:?}", (*old_map).max_alloc_size);
 
+        // Allocations may never shrink:
+        // keep track of the max message size we allocated across maps.
         (*new_map).max_alloc_size = (*old_map).max_alloc_size;
 
         /* On the old map, place a last message linking to the new map for the clients
@@ -1154,7 +1208,7 @@
         (*end_of_page_msg).map_size = new_map_shmem.shmem.len();
         (*end_of_page_msg).shm_str = *new_map_shmem.shmem.id().as_array();
 
-        /* Send the last msg on the old buf */
+        /* Send the last msg (the EOP message) on the old buf */
         self.send(out, true)?;
 
         // Set the new page as current page.
@@ -1162,13 +1216,6 @@
         // We never sent a msg on the new buf */
         self.last_msg_sent = ptr::null_mut();
 
-        // If we want to get red if old pages, (client to broker), do that now
-        if !self.keep_pages_forever {
-            #[cfg(feature = "llmp_debug")]
-            log::info!("pruning");
-            self.prune_old_pages();
-        }
-
         Ok(())
     }
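Together, `prune_old_pages`, `new_or_unused_shmem`, and the message-id reset above implement one policy: a page may only be handed out again after it was re-initialized with its counter restarted at 0, and a cached page that is too small can be dropped for good because allocation sizes only grow. A toy model of that flow, with hypothetical types standing in for the real shmem wrappers:

    // Toy model (hypothetical types) of the reuse-plus-reset flow:
    struct Page {
        buf: Vec<u8>,
        current_msg_id: u64,
    }

    struct Sender {
        unused_cache: Vec<Page>,
    }

    impl Sender {
        fn new_or_unused_page(&mut self, min_size: usize) -> Page {
            while let Some(mut page) = self.unused_cache.pop() {
                if page.buf.len() >= min_size {
                    // Re-init before reuse: a stale message id on a recycled
                    // page is exactly the kind of race this commit fixes.
                    page.current_msg_id = 0;
                    return page;
                }
                // Too small; allocation sizes only grow, so drop it for good.
            }
            // Nothing suitable cached: allocate fresh (the id starts at 0 anyway).
            Page { buf: vec![0; min_size], current_msg_id: 0 }
        }
    }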
@@ -1469,7 +1516,7 @@
         self.last_msg_recvd = ptr::null();
         self.highest_msg_id = 0;
 
-        // Mark the old page save to unmap, in case we didn't so earlier.
+        // Mark the old page safe to unmap, in case we didn't do so earlier.
         (*page).safe_to_unmap.store(1, Ordering::Relaxed);
 
         // Map the new page. The old one should be unmapped by Drop
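The `safe_to_unmap` flag touched above is the receiver's half of the page-recycling handshake: the sender only prunes or reuses pages the receiver has marked. Sketched in isolation (the wrapper type is hypothetical; the atomics mirror the patch):

    // Hypothetical wrapper around the handshake; the patch uses the same
    // relaxed atomic store/load on the real page header.
    use core::sync::atomic::{AtomicU8, Ordering};

    struct PageHeader {
        safe_to_unmap: AtomicU8,
    }

    impl PageHeader {
        // Receiver side: "I am done with this page."
        fn mark_safe_to_unmap(&self) {
            self.safe_to_unmap.store(1, Ordering::Relaxed);
        }

        // Sender side: only pages marked safe may be pruned or recycled.
        fn can_recycle(&self) -> bool {
            self.safe_to_unmap.load(Ordering::Relaxed) != 0
        }
    }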
@@ -1623,7 +1670,7 @@
         );
 
         unsafe {
-            _llmp_page_init(&mut new_shmem, sender, false);
+            llmp_page_init(&mut new_shmem, sender, false);
         }
         Self { shmem: new_shmem }
     }
@@ -1803,6 +1850,7 @@
                 keep_pages_forever: true,
                 has_unsent_message: false,
                 shmem_provider: shmem_provider.clone(),
+                unused_shmem_cache: vec![],
             },
             llmp_clients: vec![],
             shmem_provider,
@@ -2345,6 +2393,7 @@
             keep_pages_forever: false,
             has_unsent_message: false,
             shmem_provider: shmem_provider_bg.clone(),
+            unused_shmem_cache: vec![],
         };
 
         loop {
@@ -2636,6 +2685,7 @@
                 keep_pages_forever: false,
                 has_unsent_message: false,
                 shmem_provider: shmem_provider.clone(),
+                unused_shmem_cache: vec![],
             },
 
             receiver: LlmpReceiver {
diff --git a/libafl/src/bolts/mod.rs b/libafl/src/bolts/mod.rs
index b584d374bc..ae09d6fcb6 100644
--- a/libafl/src/bolts/mod.rs
+++ b/libafl/src/bolts/mod.rs
@@ -34,6 +34,9 @@ use core::{iter::Iterator, time};
 #[cfg(feature = "std")]
 use std::time::{SystemTime, UNIX_EPOCH};
 
+#[cfg(feature = "std")]
+use log::{Level, Metadata, Record};
+
 /// Can be converted to a slice
 pub trait AsSlice {
     /// Type of the entries in this slice
@@ -204,6 +207,73 @@ pub fn format_duration_hms(duration: &time::Duration) -> String {
     format!("{}h-{}m-{}s", (secs / 60) / 60, (secs / 60) % 60, secs % 60)
 }
 
+/// A simple logger struct that logs to stderr when used with [`log::set_logger`].
+#[derive(Debug)]
+#[cfg(feature = "std")]
+pub struct SimpleStdErrLogger {
+    /// The min log level for which this logger will write messages.
+    pub log_level: Level,
+}
+
+#[cfg(feature = "std")]
+impl SimpleStdErrLogger {
+    /// Create a new [`log::Log`] logger that will log [`Level::Trace`] and above
+    #[must_use]
+    pub const fn trace() -> Self {
+        Self {
+            log_level: Level::Trace,
+        }
+    }
+
+    /// Create a new [`log::Log`] logger that will log [`Level::Debug`] and above
+    #[must_use]
+    pub const fn debug() -> Self {
+        Self {
+            log_level: Level::Debug,
+        }
+    }
+
+    /// Create a new [`log::Log`] logger that will log [`Level::Info`] and above
+    #[must_use]
+    pub const fn info() -> Self {
+        Self {
+            log_level: Level::Info,
+        }
+    }
+
+    /// Create a new [`log::Log`] logger that will log [`Level::Warn`] and above
+    #[must_use]
+    pub const fn warn() -> Self {
+        Self {
+            log_level: Level::Warn,
+        }
+    }
+
+    /// Create a new [`log::Log`] logger that will log [`Level::Error`]
+    #[must_use]
+    pub const fn error() -> Self {
+        Self {
+            log_level: Level::Error,
+        }
+    }
+}
+
+#[cfg(feature = "std")]
+impl log::Log for SimpleStdErrLogger {
+    #[inline]
+    fn enabled(&self, metadata: &Metadata) -> bool {
+        metadata.level() <= self.log_level
+    }
+
+    fn log(&self, record: &Record) {
+        if self.enabled(record.metadata()) {
+            eprintln!("{}: {}", record.level(), record.args());
+        }
+    }
+
+    fn flush(&self) {}
+}
+
 /// The purpose of this module is to alleviate imports of the bolts by adding a glob import.
 #[cfg(feature = "prelude")]
 pub mod bolts_prelude {
diff --git a/libafl/src/bolts/os/unix_shmem_server.rs b/libafl/src/bolts/os/unix_shmem_server.rs
index 71957b475a..9142e12bc0 100644
--- a/libafl/src/bolts/os/unix_shmem_server.rs
+++ b/libafl/src/bolts/os/unix_shmem_server.rs
@@ -130,15 +130,16 @@ where
         let mut message = header.to_vec();
         message.extend(body);
 
-        self.stream
-            .write_all(&message)
-            .expect("Failed to send message");
+        self.stream.write_all(&message)?;
 
         let mut shm_slice = [0_u8; 20];
         let mut fd_buf = [-1; 1];
-        self.stream
-            .recv_fds(&mut shm_slice, &mut fd_buf)
-            .expect("Did not receive a response");
+        let (slice_size, fd_count) = self.stream.recv_fds(&mut shm_slice, &mut fd_buf)?;
+        if slice_size == 0 && fd_count == 0 {
+            return Err(Error::illegal_state(format!("Tried to receive 20 bytes and one fd via unix shmem socket, but got {slice_size} bytes and {fd_count} fds.")));
+        }
 
         let server_id = ShMemId::from_array(&shm_slice);
         let server_fd: i32 = server_id.into();
@@ -179,7 +180,11 @@ where
         let service = ShMemService::<SP>::start();
 
         let mut res = Self {
-            stream: UnixStream::connect_to_unix_addr(&UnixSocketAddr::new(UNIX_SERVER_NAME)?)?,
+            stream: UnixStream::connect_to_unix_addr(&UnixSocketAddr::new(UNIX_SERVER_NAME)?)
+                .map_err(|err| {
+                    Error::illegal_state(if cfg!(target_vendor = "apple") {
+                        format!("The ServedShMemProvider was not started or is no longer running. You may need to remove the './libafl_unix_shmem_server' file and retry. Error details: {err:?}")
+                    } else {
+                        format!("The ServedShMemProvider was not started or is no longer running. Error details: {err:?}")
+                    })
+                })?,
             inner: SP::new()?,
             id: -1,
             service,
@@ -536,7 +541,18 @@ where
             }
             ServedShMemRequest::ExistingMap(description) => {
                 let client = self.clients.get_mut(&client_id).unwrap();
-                let description_id: i32 = description.id.into();
+
+                if description.id.is_empty() {
+                    return Err(Error::illegal_state("Received empty ShMemId from unix shmem client. Are the shmem limits set correctly? Did a client crash?"));
+                }
+
+                let description_id: i32 = description.id.try_into().unwrap();
+
+                if !self.all_shmems.contains_key(&description_id) {
+                    // We should never get here, but it may happen if the OS ran out of shmem pages at some point, or we reached its limits.
+                    return Err(Error::illegal_state(format!("Client wanted to read from existing map with id {description_id}/{description:?}, but it was not allocated by this shmem server. Are the shmem limits set correctly? Did a client crash?")));
+                }
+
                 if client.maps.contains_key(&description_id) {
                     // Using let else here as self needs to be accessed in the else branch.
                     #[allow(clippy::option_if_let_else)]
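The error-handling hunks above follow one pattern: propagate I/O failures with `?` and turn empty reads into descriptive errors instead of panicking inside library code. Roughly, using only std types (the helper name is hypothetical, and a production version would loop or use `read_exact`):

    use std::io::{self, Read};

    // Illustrative only: read a fixed-size response, treating a zero-length
    // read (the peer hung up) as a hard error instead of calling `.expect()`.
    fn recv_shm_id(stream: &mut impl Read) -> io::Result<[u8; 20]> {
        let mut buf = [0u8; 20];
        let n = stream.read(&mut buf)?;
        if n == 0 {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "Tried to receive 20 bytes via unix shmem socket, but got none",
            ));
        }
        Ok(buf)
    }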
diff --git a/libafl/src/bolts/shmem.rs b/libafl/src/bolts/shmem.rs
index eb9ab4d856..0960933afe 100644
--- a/libafl/src/bolts/shmem.rs
+++ b/libafl/src/bolts/shmem.rs
@@ -123,6 +123,13 @@ impl ShMemId {
         Self { id: slice }
     }
 
+    /// Returns `true` if this `ShMemId` is empty (i.e., its backing bytes start with `0`).
+    /// If this is the case, something went wrong, and this `ShMemId` may not be read from.
+    #[must_use]
+    pub fn is_empty(&self) -> bool {
+        self.id[0] == 0
+    }
+
     /// Get the id as a fixed-length slice
     #[must_use]
     pub fn as_array(&self) -> &[u8; 20] {
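A short usage sketch for the new guard, mirroring the server-side check in `unix_shmem_server.rs` above (`parse_shm_id` is a hypothetical helper; `Error::illegal_state` and the `try_into` conversion are the ones this patch uses):

    // Hypothetical helper: reject an all-zero id before interpreting it
    // as a numeric shm id.
    fn parse_shm_id(shm_slice: &[u8; 20]) -> Result<i32, Error> {
        let id = ShMemId::from_array(shm_slice);
        if id.is_empty() {
            // A leading 0 byte means no id was written (or the peer crashed mid-write).
            return Err(Error::illegal_state("Received empty ShMemId"));
        }
        // The id is non-empty here, so the numeric conversion is expected to succeed.
        Ok(id.try_into().unwrap())
    }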