From 5a14b870e2da201928023281e5a131a07f926d0d Mon Sep 17 00:00:00 2001 From: Dominik Maier Date: Thu, 15 Jul 2021 13:13:07 +0200 Subject: [PATCH] Added staterestore to restarting mgrs (#225) * added staterestore to simple restarting mgr * reworked launcher * ? instead of unwrap * no_std fixes * windows * fixed save fn * added llvm to dockerfile --- Dockerfile | 2 +- libafl/src/bolts/compress.rs | 1 - libafl/src/bolts/mod.rs | 2 + libafl/src/bolts/shmem.rs | 8 +- libafl/src/bolts/staterestore.rs | 179 +++++++++++++++++++++++ libafl/src/events/llmp.rs | 235 +++++++++++++------------------ libafl/src/events/simple.rs | 93 +++++------- 7 files changed, 316 insertions(+), 204 deletions(-) create mode 100644 libafl/src/bolts/staterestore.rs diff --git a/Dockerfile b/Dockerfile index a05fea51e9..d6712ecd4c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,7 +19,7 @@ RUN sh -c 'echo set encoding=utf-8 > /root/.vimrc' \ RUN rustup component add rustfmt clippy # Install clang 11, common build tools -RUN apt update && apt install -y build-essential gdb git wget clang clang-tools libc++-11-dev libc++abi-11-dev +RUN apt update && apt install -y build-essential gdb git wget clang clang-tools libc++-11-dev libc++abi-11-dev llvm # Copy a dummy.rs and Cargo.toml first, so that dependencies are cached WORKDIR /libafl diff --git a/libafl/src/bolts/compress.rs b/libafl/src/bolts/compress.rs index cd4c4e3cc4..d05a4ab7dc 100644 --- a/libafl/src/bolts/compress.rs +++ b/libafl/src/bolts/compress.rs @@ -1,7 +1,6 @@ //! Compression of events passed between a broker and clients. //! Currently we use the gzip compression algorithm for its fast decompression performance. -#[cfg(feature = "llmp_compression")] use crate::Error; use alloc::vec::Vec; use core::fmt::Debug; diff --git a/libafl/src/bolts/mod.rs b/libafl/src/bolts/mod.rs index 77a6f9218d..e30ba00d12 100644 --- a/libafl/src/bolts/mod.rs +++ b/libafl/src/bolts/mod.rs @@ -9,6 +9,8 @@ pub mod ownedref; pub mod rands; pub mod serdeany; pub mod shmem; +#[cfg(feature = "std")] +pub mod staterestore; pub mod tuples; #[cfg(feature = "llmp_compression")] diff --git a/libafl/src/bolts/shmem.rs b/libafl/src/bolts/shmem.rs index 0ee13fe3a4..d2b832ffcc 100644 --- a/libafl/src/bolts/shmem.rs +++ b/libafl/src/bolts/shmem.rs @@ -159,7 +159,7 @@ pub trait ShMem: Sized + Debug + Clone { /// The actual shared map, mutable fn map_mut(&mut self) -> &mut [u8]; - /// + /// Write this map's config to env #[cfg(feature = "std")] fn write_to_env(&self, env_name: &str) -> Result<(), Error> { @@ -959,7 +959,7 @@ pub mod win32_shmem { } } - /// Deinit sharedmaps on drop + /// Deinit sharedmaps on [`Drop`] impl Drop for Win32ShMem { fn drop(&mut self) { unsafe { @@ -969,7 +969,7 @@ pub mod win32_shmem { } } - /// A ShMemProvider which uses win32 functions to provide shared memory mappings. + /// A [`ShMemProvider`] which uses `win32` functions to provide shared memory mappings. #[derive(Clone, Debug)] pub struct Win32ShMemProvider {} @@ -979,7 +979,7 @@ pub mod win32_shmem { } } - /// Implement ShMemProvider for Win32ShMemProvider + /// Implement [`ShMemProvider`] for [`Win32ShMemProvider`] impl ShMemProvider for Win32ShMemProvider { type Mem = Win32ShMem; diff --git a/libafl/src/bolts/staterestore.rs b/libafl/src/bolts/staterestore.rs new file mode 100644 index 0000000000..f874eb8464 --- /dev/null +++ b/libafl/src/bolts/staterestore.rs @@ -0,0 +1,179 @@ +/// Stores and restores state when a client needs to relaunch. +/// Uses a [`ShMem`] up to a threshold, then write to disk. +use ahash::AHasher; +use core::{hash::Hasher, marker::PhantomData, mem::size_of, ptr, slice}; +use postcard; +use serde::{de::DeserializeOwned, Serialize}; +use std::{ + env::temp_dir, + fs::File, + io::{Read, Write}, +}; + +use crate::{ + bolts::shmem::{ShMem, ShMemProvider}, + Error, +}; + +/// A [`StateRestorer`] saves and restores bytes to a shared map. +/// If the state gets larger than the preallocated [`ShMem`] shared map, +/// it will instead write to disk, and store the file name into the map. +/// Writing to [`StateRestorer`] multiple times is not allowed. +#[derive(Debug, Clone)] +pub struct StateRestorer +where + SP: ShMemProvider, +{ + shmem: SP::Mem, + phantom: PhantomData<*const SP>, +} + +#[repr(C)] +struct StateShMemContent { + is_disk: bool, + buf_len: usize, + buf: [u8; 0], +} + +impl StateRestorer +where + SP: ShMemProvider, +{ + /// Writes this [`StateRestorer`] to env variable, to be restored later + pub fn write_to_env(&self, env_name: &str) -> Result<(), Error> { + self.shmem.write_to_env(env_name) + } + + /// Create a [`StateRrestore`] from `env` variable name + pub fn from_env(shmem_provider: &mut SP, env_name: &str) -> Result { + Ok(Self::new(shmem_provider.existing_from_env(env_name)?)) + } + + /// Create a new [`StateRestorer`]. + pub fn new(shmem: SP::Mem) -> Self { + let mut ret = Self { + shmem, + phantom: PhantomData, + }; + ret.reset(); + ret + } + + /// Saves a state to the connected [`ShMem`], or a tmpfile, if its serialized size get too large. + pub fn save(&mut self, state: &S) -> Result<(), Error> + where + S: Serialize, + { + if self.has_content() { + return Err(Error::IllegalState( + "Trying to save state to a non-empty state map".to_string(), + )); + } + + let serialized = postcard::to_allocvec(state)?; + + if size_of::() + serialized.len() > self.shmem.len() { + // generate a filename + let mut hasher = AHasher::new_with_keys(0, 0); + hasher.write(&serialized[serialized.len() - 1024..]); + + let filename = format!("{:016x}.libafl_state", hasher.finish()); + let tmpfile = temp_dir().join(&filename); + File::open(tmpfile)?.write_all(&serialized)?; + + // write the filename to shmem + let filename_buf = postcard::to_allocvec(&filename)?; + let len = filename_buf.len(); + + /*println!( + "Storing {} bytes to tmpfile {} (larger than map of {} bytes)", + serialized.len(), + &filename, + self.shmem.len() + );*/ + + let shmem_content = self.content_mut(); + unsafe { + ptr::copy_nonoverlapping( + filename_buf.as_ptr() as *const u8, + shmem_content.buf.as_mut_ptr(), + len, + ); + } + shmem_content.buf_len = len; + } else { + // write to shmem directly + let len = serialized.len(); + let shmem_content = self.content_mut(); + unsafe { + ptr::copy_nonoverlapping( + serialized.as_ptr() as *const u8, + shmem_content.buf.as_mut_ptr(), + len, + ); + } + shmem_content.buf_len = len; + }; + Ok(()) + } + + /// Reset this [`StateRestorer`] to an empty state. + pub fn reset(&mut self) { + let content_mut = self.content_mut(); + content_mut.is_disk = false; + content_mut.buf_len = 0; + } + + fn content_mut(&mut self) -> &mut StateShMemContent { + let ptr = self.shmem.map().as_ptr(); + #[allow(clippy::cast_ptr_alignment)] // Beginning of the page will always be aligned + unsafe { + &mut *(ptr as *mut StateShMemContent) + } + } + + fn content(&self) -> &StateShMemContent { + #[allow(clippy::cast_ptr_alignment)] // Beginning of the page will always be aligned + let ptr = self.shmem.map().as_ptr() as *const StateShMemContent; + unsafe { &*(ptr) } + } + + pub fn has_content(&self) -> bool { + self.content().buf_len > 0 + } + + pub fn restore(&self) -> Result, Error> + where + S: DeserializeOwned, + { + if self.has_content() { + return Ok(Option::None); + } + let state_shmem_content = self.content(); + let bytes = unsafe { + slice::from_raw_parts( + state_shmem_content.buf.as_ptr(), + state_shmem_content.buf_len, + ) + }; + let mut state = bytes; + let mut file_content; + if state_shmem_content.buf_len == 0 { + return Ok(Option::None); + } else if state_shmem_content.is_disk { + let filename: String = postcard::from_bytes(bytes)?; + let tmpfile = temp_dir().join(&filename); + file_content = vec![]; + File::open(tmpfile)?.read_to_end(&mut file_content)?; + if file_content.is_empty() { + return Err(Error::IllegalState(format!( + "Colud not restore state from file {}", + &filename + ))); + } + state = &file_content + } + let deserialized = postcard::from_bytes(state)?; + Ok(Some(deserialized)) + } +} diff --git a/libafl/src/events/llmp.rs b/libafl/src/events/llmp.rs index 95b09a388b..f8cd765f0b 100644 --- a/libafl/src/events/llmp.rs +++ b/libafl/src/events/llmp.rs @@ -1,30 +1,27 @@ //! LLMP-backed event manager for scalable multi-processed fuzzing -use alloc::{ - string::{String, ToString}, - vec::Vec, -}; +use alloc::string::{String, ToString}; use core::{marker::PhantomData, time::Duration}; -use serde::{de::DeserializeOwned, Serialize}; +#[cfg(feature = "std")] +use core::sync::atomic::{compiler_fence, Ordering}; #[cfg(feature = "std")] use core_affinity::CoreId; - #[cfg(feature = "std")] -use core::ptr::{addr_of, read_volatile}; - -#[cfg(feature = "std")] -use crate::bolts::{ - llmp::{LlmpClient, LlmpConnection, LlmpReceiver}, - shmem::StdShMemProvider, -}; - +use serde::{de::DeserializeOwned, Serialize}; #[cfg(feature = "std")] use std::net::{SocketAddr, ToSocketAddrs}; +#[cfg(feature = "std")] +use crate::bolts::{ + llmp::{LlmpClient, LlmpConnection}, + shmem::StdShMemProvider, + staterestore::StateRestorer, +}; + use crate::{ bolts::{ - llmp::{self, Flags, LlmpClientDescription, LlmpSender, Tag}, + llmp::{self, Flags, LlmpClientDescription, Tag}, shmem::ShMemProvider, }, events::{ @@ -130,7 +127,7 @@ where #[cfg(feature = "llmp_compression")] let compressor = &self.compressor; self.llmp.loop_forever( - &mut |sender_id: u32, tag: Tag, _flags: Flags, msg: &[u8]| { + &mut |client_id: u32, tag: Tag, _flags: Flags, msg: &[u8]| { if tag == LLMP_TAG_EVENT_TO_BOTH { #[cfg(not(feature = "llmp_compression"))] let event_bytes = msg; @@ -144,7 +141,7 @@ where msg }; let event: Event = postcard::from_bytes(event_bytes)?; - match Self::handle_in_broker(stats, sender_id, &event)? { + match Self::handle_in_broker(stats, client_id, &event)? { BrokerEventResult::Forward => Ok(llmp::LlmpMsgHookResult::ForwardToClients), BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled), } @@ -162,7 +159,7 @@ where #[allow(clippy::unnecessary_wraps)] fn handle_in_broker( stats: &mut ST, - sender_id: u32, + client_id: u32, event: &Event, ) -> Result { match &event { @@ -175,10 +172,10 @@ where time, executions, } => { - let client = stats.client_stats_mut_for(sender_id); + let client = stats.client_stats_mut_for(client_id); client.update_corpus_size(*corpus_size as u64); client.update_executions(*executions as u64, *time); - stats.display(event.name().to_string(), sender_id); + stats.display(event.name().to_string(), client_id); Ok(BrokerEventResult::Forward) } Event::UpdateStats { @@ -187,9 +184,9 @@ where phantom: _, } => { // TODO: The stats buffer should be added on client add. - let client = stats.client_stats_mut_for(sender_id); + let client = stats.client_stats_mut_for(client_id); client.update_executions(*executions as u64, *time); - stats.display(event.name().to_string(), sender_id); + stats.display(event.name().to_string(), client_id); Ok(BrokerEventResult::Handled) } Event::UpdateUserStats { @@ -197,9 +194,9 @@ where value, phantom: _, } => { - let client = stats.client_stats_mut_for(sender_id); + let client = stats.client_stats_mut_for(client_id); client.update_user_stats(name.clone(), value.clone()); - stats.display(event.name().to_string(), sender_id); + stats.display(event.name().to_string(), client_id); Ok(BrokerEventResult::Handled) } #[cfg(feature = "introspection")] @@ -211,8 +208,8 @@ where } => { // TODO: The stats buffer should be added on client add. - // Get the client for the sender ID - let client = stats.client_stats_mut_for(sender_id); + // Get the client for the staterestorer ID + let client = stats.client_stats_mut_for(client_id); // Update the normal stats for this client client.update_executions(*executions as u64, *time); @@ -221,15 +218,15 @@ where client.update_introspection_stats((**introspection_stats).clone()); // Display the stats via `.display` only on core #1 - stats.display(event.name().to_string(), sender_id); + stats.display(event.name().to_string(), client_id); // Correctly handled the event Ok(BrokerEventResult::Handled) } Event::Objective { objective_size } => { - let client = stats.client_stats_mut_for(sender_id); + let client = stats.client_stats_mut_for(client_id); client.update_objective_size(*objective_size as u64); - stats.display(event.name().to_string(), sender_id); + stats.display(event.name().to_string(), client_id); Ok(BrokerEventResult::Handled) } Event::Log { @@ -360,7 +357,7 @@ where fuzzer: &mut Z, executor: &mut E, state: &mut S, - _sender_id: u32, + _client_id: u32, event: Event, ) -> Result<(), Error> where @@ -381,7 +378,7 @@ where #[cfg(feature = "std")] println!( "Received new Testcase from {} ({}) {:?}", - _sender_id, client_config, input + _client_id, client_config, input ); let _res = if client_config == self.configuration { @@ -470,11 +467,11 @@ where // TODO: Get around local event copy by moving handle_in_client let mut events = vec![]; let self_id = self.llmp.sender.id; - while let Some((sender_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? { + while let Some((client_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? { if tag == _LLMP_TAG_EVENT_TO_BROKER { panic!("EVENT_TO_BROKER parcel should not have arrived in the client!"); } - if sender_id == self_id { + if client_id == self_id { continue; } #[cfg(not(feature = "llmp_compression"))] @@ -489,11 +486,11 @@ where msg }; let event: Event = postcard::from_bytes(event_bytes)?; - events.push((sender_id, event)); + events.push((client_id, event)); } let count = events.len(); - events.drain(..).try_for_each(|(sender_id, event)| { - self.handle_in_client(fuzzer, executor, state, sender_id, event) + events.drain(..).try_for_each(|(client_id, event)| { + self.handle_in_client(fuzzer, executor, state, client_id, event) })?; Ok(count) } @@ -515,7 +512,7 @@ where OT: ObserversTuple + serde::de::DeserializeOwned, SP: ShMemProvider, { - /// Gets the id assigned to this sender. + /// Gets the id assigned to this staterestorer. fn mgr_id(&self) -> EventManagerId { EventManagerId { id: self.llmp.sender.id as usize, @@ -523,47 +520,8 @@ where } } -/// Serialize the current state and corpus during an executiont to bytes. -/// On top, add the current llmp event manager instance to be restored -/// This method is needed when the fuzzer run crashes and has to restart. -pub fn serialize_state_mgr( - state: &S, - mgr: &LlmpEventManager, -) -> Result, Error> -where - I: Input, - OT: ObserversTuple, - S: Serialize, - SP: ShMemProvider, -{ - Ok(postcard::to_allocvec(&(&state, &mgr.describe()?))?) -} - -/// Deserialize the state and corpus tuple, previously serialized with `serialize_state_corpus(...)` -#[allow(clippy::type_complexity)] -pub fn deserialize_state_mgr( - shmem_provider: SP, - state_corpus_serialized: &[u8], - configuration: String, -) -> Result<(S, LlmpEventManager), Error> -where - I: Input, - OT: ObserversTuple, - S: DeserializeOwned, - SP: ShMemProvider, -{ - let tuple: (S, _) = postcard::from_bytes(state_corpus_serialized)?; - Ok(( - tuple.0, - LlmpEventManager::existing_client_from_description( - shmem_provider, - &tuple.1, - configuration, - )?, - )) -} - /// A manager that can restart on the fly, storing states in-between (in `on_resatrt`) +#[cfg(feature = "std")] #[derive(Debug)] pub struct LlmpRestartingEventManager where @@ -574,10 +532,11 @@ where { /// The embedded llmp event manager llmp_mgr: LlmpEventManager, - /// The sender to serialize the state for the next runner - sender: LlmpSender, + /// The staterestorer to serialize the state for the next runner + staterestorer: StateRestorer, } +#[cfg(feature = "std")] impl EventFirer for LlmpRestartingEventManager where I: Input, @@ -596,6 +555,7 @@ where } } +#[cfg(feature = "std")] impl EventRestarter for LlmpRestartingEventManager where I: Input, @@ -614,15 +574,13 @@ where /// Reset the single page (we reuse it over and over from pos 0), then send the current state to the next runner. fn on_restart(&mut self, state: &mut S) -> Result<(), Error> { // First, reset the page to 0 so the next iteration can read read from the beginning of this page - unsafe { - self.sender.reset(); - } - let state_corpus_serialized = serialize_state_mgr(state, &self.llmp_mgr)?; - self.sender - .send_buf(_LLMP_TAG_RESTART, &state_corpus_serialized) + self.staterestorer.reset(); + self.staterestorer + .save(&(state, &self.llmp_mgr.describe()?)) } } +#[cfg(feature = "std")] impl EventProcessor for LlmpRestartingEventManager where E: Executor, I, S, Z> + HasObservers, @@ -637,6 +595,7 @@ where } } +#[cfg(feature = "std")] impl EventManager for LlmpRestartingEventManager where E: Executor, I, S, Z> + HasObservers, @@ -649,6 +608,7 @@ where { } +#[cfg(feature = "std")] impl HasEventManagerId for LlmpRestartingEventManager where I: Input, @@ -667,6 +627,7 @@ const _ENV_FUZZER_RECEIVER: &str = "_AFL_ENV_FUZZER_RECEIVER"; /// The llmp (2 way) connection from a fuzzer to the broker (broadcasting all other fuzzer messages) const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT"; +#[cfg(feature = "std")] impl LlmpRestartingEventManager where I: Input, @@ -675,18 +636,21 @@ where //CE: CustomEvent, { /// Create a new runner, the executed child doing the actual fuzzing. - pub fn new(llmp_mgr: LlmpEventManager, sender: LlmpSender) -> Self { - Self { llmp_mgr, sender } + pub fn new(llmp_mgr: LlmpEventManager, staterestorer: StateRestorer) -> Self { + Self { + llmp_mgr, + staterestorer, + } } - /// Get the sender - pub fn sender(&self) -> &LlmpSender { - &self.sender + /// Get the staterestorer + pub fn staterestorer(&self) -> &StateRestorer { + &self.staterestorer } - /// Get the sender (mut) - pub fn sender_mut(&mut self) -> &mut LlmpSender { - &mut self.sender + /// Get the staterestorer (mut) + pub fn staterestorer_mut(&mut self) -> &mut StateRestorer { + &mut self.staterestorer } } @@ -788,10 +752,8 @@ where &mut self, ) -> Result<(Option, LlmpRestartingEventManager), Error> { // We start ourself as child process to actually fuzz - let (sender, mut receiver, new_shmem_provider, core_id) = if std::env::var( - _ENV_FUZZER_SENDER, - ) - .is_err() + let (staterestorer, new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER) + .is_err() { let broker_things = |mut broker: LlmpEventBroker, remote_broker_addr| { if let Some(remote_broker_addr) = remote_broker_addr { @@ -863,17 +825,11 @@ where // We are the fuzzer respawner in a llmp client mgr.to_env(_ENV_FUZZER_BROKER_CLIENT_INITIAL); - // First, create a channel from the fuzzer (sender) to us (receiver) to report its state for restarts. - let sender = { LlmpSender::new(self.shmem_provider.clone(), 0, false)? }; - - let map = { - self.shmem_provider - .clone_ref(&sender.out_maps.last().unwrap().shmem)? - }; - let receiver = LlmpReceiver::on_existing_map(self.shmem_provider.clone(), map, None)?; + // First, create a channel from the current fuzzer to the next to store state between restarts. + let staterestorer: StateRestorer = + StateRestorer::new(self.shmem_provider.new_map(256 * 1024 * 1024)?); // Store the information to a map. - sender.to_env(_ENV_FUZZER_SENDER)?; - receiver.to_env(_ENV_FUZZER_RECEIVER)?; + staterestorer.write_to_env(_ENV_FUZZER_SENDER)?; let mut ctr: u64 = 0; // Client->parent loop @@ -891,7 +847,7 @@ where } ForkResult::Child => { self.shmem_provider.post_fork(true)?; - break (sender, receiver, self.shmem_provider.clone(), core_id); + break (staterestorer, self.shmem_provider.clone(), core_id); } } }; @@ -900,9 +856,9 @@ where #[cfg(windows)] let child_status = startable_self()?.status()?; - if unsafe { read_volatile(addr_of!((*receiver.current_recv_map.page()).size_used)) } - == 0 - { + compiler_fence(Ordering::SeqCst); + + if !staterestorer.has_content() { #[cfg(unix)] if child_status == 137 { // Out of Memory, see https://tldp.org/LDP/abs/html/exitcodes.html @@ -919,13 +875,9 @@ where } else { // We are the newly started fuzzing instance (i.e. on Windows), first, connect to our own restore map. // We get here *only on Windows*, if we were started by a restarting fuzzer. - // A sender and a receiver for single communication + // A staterestorer and a receiver for single communication ( - LlmpSender::on_existing_from_env(self.shmem_provider.clone(), _ENV_FUZZER_SENDER)?, - LlmpReceiver::on_existing_from_env( - self.shmem_provider.clone(), - _ENV_FUZZER_RECEIVER, - )?, + StateRestorer::from_env(&mut self.shmem_provider, _ENV_FUZZER_SENDER)?, self.shmem_provider.clone(), None, ) @@ -938,35 +890,36 @@ where println!("We're a client, let's fuzz :)"); // If we're restarting, deserialize the old state. - let (state, mut mgr) = match receiver.recv_buf()? { - None => { - println!("First run. Let's set it all up"); - // Mgr to send and receive msgs from/to all other fuzzer instances - let mgr = LlmpEventManager::::existing_client_from_env( - new_shmem_provider, - _ENV_FUZZER_BROKER_CLIENT_INITIAL, - self.configuration.clone(), - )?; + let (state, mut mgr) = if let Some((state, mgr_description)) = staterestorer.restore()? { + ( + state, + LlmpRestartingEventManager::new( + LlmpEventManager::existing_client_from_description( + new_shmem_provider, + &mgr_description, + self.configuration.clone(), + )?, + staterestorer, + ), + ) + } else { + println!("First run. Let's set it all up"); + // Mgr to send and receive msgs from/to all other fuzzer instances + let mgr = LlmpEventManager::::existing_client_from_env( + new_shmem_provider, + _ENV_FUZZER_BROKER_CLIENT_INITIAL, + self.configuration.clone(), + )?; - (None, LlmpRestartingEventManager::new(mgr, sender)) - } - // Restoring from a previous run, deserialize state and corpus. - Some((_sender, _tag, msg)) => { - println!("Subsequent run. Let's load all data from shmem (received {} bytes from previous instance)", msg.len()); - let (state, mgr): (S, LlmpEventManager) = - deserialize_state_mgr(new_shmem_provider, msg, self.configuration.clone())?; - - (Some(state), LlmpRestartingEventManager::new(mgr, sender)) - } + (None, LlmpRestartingEventManager::new(mgr, staterestorer)) }; - // We reset the sender, the next sender and receiver (after crash) will reuse the page from the initial message. - unsafe { - mgr.sender_mut().reset(); - } + // We reset the staterestorer, the next staterestorer and receiver (after crash) will reuse the page from the initial message. + mgr.staterestorer.reset(); + /* TODO: Not sure if this is needed // We commit an empty NO_RESTART message to this buf, against infinite loops, // in case something crashes in the fuzzer. - sender.send_buf(_LLMP_TAG_NO_RESTART, []); + staterestorer.send_buf(_LLMP_TAG_NO_RESTART, []); */ Ok((state, mgr)) diff --git a/libafl/src/events/simple.rs b/libafl/src/events/simple.rs index 7fa5799e91..384aef0039 100644 --- a/libafl/src/events/simple.rs +++ b/libafl/src/events/simple.rs @@ -1,18 +1,6 @@ //! A very simple event manager, that just supports log outputs, but no multiprocessing -use alloc::{string::ToString, vec::Vec}; -#[cfg(feature = "std")] -use core::{ - marker::PhantomData, - ptr::{addr_of, read_volatile}, -}; -#[cfg(feature = "std")] -use serde::{de::DeserializeOwned, Serialize}; -#[cfg(feature = "std")] -use std::convert::TryInto; - use crate::{ - bolts::llmp, events::{ BrokerEventResult, Event, EventFirer, EventManager, EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, @@ -21,6 +9,15 @@ use crate::{ stats::Stats, Error, }; +use alloc::{string::ToString, vec::Vec}; +#[cfg(feature = "std")] +use core::{ + convert::TryInto, + marker::PhantomData, + sync::atomic::{compiler_fence, Ordering}, +}; +#[cfg(feature = "std")] +use serde::{de::DeserializeOwned, Serialize}; #[cfg(all(feature = "std", windows))] use crate::bolts::os::startable_self; @@ -28,10 +25,7 @@ use crate::bolts::os::startable_self; use crate::bolts::os::{fork, ForkResult}; #[cfg(feature = "std")] use crate::{ - bolts::{ - llmp::{LlmpReceiver, LlmpSender}, - shmem::ShMemProvider, - }, + bolts::{shmem::ShMemProvider, staterestore::StateRestorer}, corpus::Corpus, state::{HasCorpus, HasSolutions}, }; @@ -42,9 +36,6 @@ const _ENV_FUZZER_RECEIVER: &str = "_AFL_ENV_FUZZER_RECEIVER"; /// The llmp (2 way) connection from a fuzzer to the broker (broadcasting all other fuzzer messages) const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT"; -/// We're restarting right now. -const _LLMP_TAG_RESTART: llmp::Tag = 0x8357A87; - /// A simple, single-threaded event manager that just logs #[derive(Clone, Debug)] pub struct SimpleEventManager @@ -233,8 +224,8 @@ where { /// The actual simple event mgr simple_event_mgr: SimpleEventManager, - /// [`LlmpSender`] for restarts - sender: LlmpSender, + /// [`StateRestorer`] for restarts + staterestorer: StateRestorer, /// Phantom data _phantom: PhantomData<&'a (C, I, S)>, } @@ -265,11 +256,8 @@ where /// Reset the single page (we reuse it over and over from pos 0), then send the current state to the next runner. fn on_restart(&mut self, state: &mut S) -> Result<(), Error> { // First, reset the page to 0 so the next iteration can read read from the beginning of this page - unsafe { - self.sender.reset(); - } - self.sender - .send_buf(_LLMP_TAG_RESTART, &postcard::to_allocvec(state)?) + self.staterestorer.reset(); + self.staterestorer.save(state) } } @@ -325,9 +313,9 @@ where ST: Stats, //TODO CE: CustomEvent, { /// Creates a new [`SimpleEventManager`]. - fn new_launched(stats: ST, sender: LlmpSender) -> Self { + fn new_launched(stats: ST, staterestorer: StateRestorer) -> Self { Self { - sender, + staterestorer, simple_event_mgr: SimpleEventManager::new(stats), _phantom: PhantomData {}, } @@ -339,15 +327,12 @@ where #[allow(clippy::similar_names)] pub fn launch(mut stats: ST, shmem_provider: &mut SP) -> Result<(Option, Self), Error> { // We start ourself as child process to actually fuzz - let (mut sender, mut receiver) = if std::env::var(_ENV_FUZZER_SENDER).is_err() { - // First, create a channel from the fuzzer (sender) to us (receiver) to report its state for restarts. - let sender = { LlmpSender::new(shmem_provider.clone(), 0, false)? }; - - let map = { shmem_provider.clone_ref(&sender.out_maps.last().unwrap().shmem)? }; - let receiver = LlmpReceiver::on_existing_map(shmem_provider.clone(), map, None)?; - // Store the information to a map. - sender.to_env(_ENV_FUZZER_SENDER)?; - receiver.to_env(_ENV_FUZZER_RECEIVER)?; + let mut staterestorer = if std::env::var(_ENV_FUZZER_SENDER).is_err() { + // First, create a place to store state in, for restarts. + let staterestorer: StateRestorer = + StateRestorer::new(shmem_provider.new_map(256 * 1024 * 1024)?); + //let staterestorer = { LlmpSender::new(shmem_provider.clone(), 0, false)? }; + staterestorer.write_to_env(_ENV_FUZZER_SENDER)?; let mut ctr: u64 = 0; // Client->parent loop @@ -365,7 +350,7 @@ where } ForkResult::Child => { shmem_provider.post_fork(true)?; - break (sender, receiver); + break staterestorer; } } }; @@ -374,9 +359,9 @@ where #[cfg(windows)] let child_status = startable_self()?.status()?; - if unsafe { read_volatile(addr_of!((*receiver.current_recv_map.page()).size_used)) } - == 0 - { + compiler_fence(Ordering::SeqCst); + + if !staterestorer.has_content() { #[cfg(unix)] if child_status == 137 { // Out of Memory, see https://tldp.org/LDP/abs/html/exitcodes.html @@ -393,33 +378,27 @@ where } else { // We are the newly started fuzzing instance (i.e. on Windows), first, connect to our own restore map. // We get here *only on Windows*, if we were started by a restarting fuzzer. - // A sender and a receiver for single communication - ( - LlmpSender::on_existing_from_env(shmem_provider.clone(), _ENV_FUZZER_SENDER)?, - LlmpReceiver::on_existing_from_env(shmem_provider.clone(), _ENV_FUZZER_RECEIVER)?, - ) + // A staterestorer and a receiver for single communication + StateRestorer::from_env(shmem_provider, _ENV_FUZZER_SENDER)? }; println!("We're a client, let's fuzz :)"); // If we're restarting, deserialize the old state. - let (state, mgr) = match receiver.recv_buf()? { + let (state, mgr) = match staterestorer.restore::()? { None => { println!("First run. Let's set it all up"); // Mgr to send and receive msgs from/to all other fuzzer instances ( None, - SimpleRestartingEventManager::new_launched(stats, sender), + SimpleRestartingEventManager::new_launched(stats, staterestorer), ) } // Restoring from a previous run, deserialize state and corpus. - Some((_sender, _tag, msg)) => { - println!("Subsequent run. Let's load all data from shmem (received {} bytes from previous instance)", msg.len()); - let state: S = postcard::from_bytes(msg)?; - // We reset the sender, the next sender and receiver (after crash) will reuse the page from the initial message. - unsafe { - sender.reset(); - } + Some(state) => { + println!("Subsequent run. Loaded previous state."); + // We reset the staterestorer, the next staterestorer and receiver (after crash) will reuse the page from the initial message. + staterestorer.reset(); // load the corpus size into stats to still display the correct numbers after restart. let client_stats = stats.client_stats_mut_for(0); @@ -428,7 +407,7 @@ where ( Some(state), - SimpleRestartingEventManager::new_launched(stats, sender), + SimpleRestartingEventManager::new_launched(stats, staterestorer), ) } }; @@ -436,7 +415,7 @@ where /* TODO: Not sure if this is needed // We commit an empty NO_RESTART message to this buf, against infinite loops, // in case something crashes in the fuzzer. - sender.send_buf(_LLMP_TAG_NO_RESTART, []); + staterestorer.send_buf(_LLMP_TAG_NO_RESTART, []); */ Ok((state, mgr))