Merge LlmpEventManager and LlmpRestartingEventManager (#2891)

* add

* add 2

* feature

* fix nyx launcher

* a bit of doc

* addressing comments
Dongjia "toka" Zhang 2025-01-26 13:43:04 +01:00 committed by GitHub
parent 1addbd04b9
commit 133a0ffe7a
13 changed files with 520 additions and 782 deletions


@ -72,7 +72,7 @@ So the outgoing messages flow is like this over the outgoing broadcast `Shmem`:
[client0] [client1] ... [clientN]
```
To use `LLMP` in LibAFL, you usually want to use an `LlmpEventManager` or its restarting variant.
To use `LLMP` in LibAFL, you usually want to use an `LlmpRestartingEventManager`.
It is the default if using LibAFL's `Launcher`.
If you should want to use `LLMP` in its raw form, without any `LibAFL` abstractions, take a look at the `llmp_test` example in [./libafl/examples](https://github.com/AFLplusplus/LibAFL/blob/main/libafl_bolts/examples/llmp_test/main.rs).
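After this change, connecting a client to an existing broker goes through the new `LlmpEventManagerBuilder`. Here is a minimal, untested sketch mirroring the calls in the diff below; the port and the `()` type parameters are illustrative assumptions, and passing `None` for the `StateRestorer` yields a manager that does not restart:

```rust
use libafl::{
    events::{EventConfig, LlmpEventManagerBuilder},
    Error,
};
use libafl_bolts::shmem::StdShMemProvider;

fn connect_to_broker() -> Result<(), Error> {
    // Assumes a broker is already listening on this (illustrative) port.
    let shmem_provider = StdShMemProvider::new()?;
    let _mgr = LlmpEventManagerBuilder::builder().build_on_port::<(), (), _, _>(
        shmem_provider,
        1337,                      // broker port (illustrative)
        EventConfig::AlwaysUnique, // never reuse observers from other configs
        None,                      // no TimeObserver handle
        None,                      // no StateRestorer: this manager won't restart
    )?;
    Ok(())
}
```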


@ -58,6 +58,6 @@ For more examples, you can check out `qemu_launcher` and `libfuzzer_libpng_launc
## Other ways
The `LlmpEventManager` family is the easiest way to spawn instances, but for obscure targets, you may need to come up with other solutions.
The `LlmpRestartingEventManager` is the easiest way to spawn instances, but for obscure targets, you may need to come up with other solutions.
LLMP is, in theory, even `no_std` compatible, and completely different `EventManager`s can be used for message passing.
If you are in this situation, please read through the current implementations and/or reach out to us.
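
As one example of a completely different `EventManager`, LibAFL ships the single-process `SimpleEventManager`, which involves no message passing at all. A minimal sketch (the monitor closure is illustrative; the input and state types are inferred from later use):

```rust
use libafl::{events::SimpleEventManager, monitors::SimpleMonitor};

// A single-process event manager: events are handled locally and stats are
// simply printed; no broker, shared memory, or restarts are involved.
let monitor = SimpleMonitor::new(|s| println!("{s}"));
let mgr = SimpleEventManager::new(monitor);
```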


@ -1,8 +1,8 @@
# Fuzzbench Harness
This folder contains an example fuzzer tailored for fuzzbench.
It uses the best possible setting, with the exception of a SimpleRestartingEventManager instead of an LlmpEventManager - since fuzzbench is single threaded.
Real fuzz campaigns should consider using multithreaded LlmpEventManager, see the other examples.
It uses the best possible settings, with the exception of a SimpleRestartingEventManager instead of an LlmpRestartingEventManager, since fuzzbench is single-threaded.
Real fuzz campaigns should consider using the multithreaded LlmpRestartingEventManager; see the other examples.
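
For reference, the single-threaded restarting setup used here boils down to roughly this sketch (names follow LibAFL's API; the monitor closure and error handling are illustrative):

```rust
use libafl::{events::SimpleRestartingEventManager, monitors::SimpleMonitor, Error};
use libafl_bolts::shmem::StdShMemProvider;

// The restarting manager spawns the fuzzer in a child process and respawns
// it whenever it crashes, passing the serialized state via shared memory.
let monitor = SimpleMonitor::new(|s| println!("{s}"));
let mut shmem_provider = StdShMemProvider::new()?;
let (state, mut mgr) = match SimpleRestartingEventManager::launch(monitor, &mut shmem_provider) {
    // `state` is `None` on the first run and `Some(..)` after a restart
    Ok(res) => res,
    Err(Error::ShuttingDown) => return Ok(()),
    Err(err) => panic!("Failed to setup the restarter: {err}"),
};
```
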
## Build


@ -288,11 +288,6 @@ pub fn fuzz() {
.monitor(MultiMonitor::new(|s| println!("{s}")))
.run_client(&mut run_client)
.cores(&options.cores)
.stdout_file(if options.verbose {
None
} else {
Some("/dev/null")
})
.build()
.launch()
{


@ -10,7 +10,7 @@ use libafl::events::SimpleEventManager;
#[cfg(not(feature = "simplemgr"))]
use libafl::events::{EventConfig, Launcher, MonitorTypedEventManager};
use libafl::{
events::{ClientDescription, LlmpEventManager, LlmpRestartingEventManager},
events::{ClientDescription, LlmpEventManagerBuilder},
monitors::{tui::TuiMonitor, Monitor, MultiMonitor},
Error,
};
@ -114,17 +114,17 @@ impl Fuzzer {
// To rerun an input, instead of using a launcher, we create dummy parameters and run the client directly.
return client.run(
None,
MonitorTypedEventManager::<_, M>::new(LlmpRestartingEventManager::new(
LlmpEventManager::builder()
.build_on_port(
shmem_provider.clone(),
broker_port,
EventConfig::AlwaysUnique,
None,
)
.unwrap(),
StateRestorer::new(shmem_provider.new_shmem(0x1000).unwrap()),
)),
MonitorTypedEventManager::<_, M>::new(
LlmpEventManagerBuilder::builder().build_on_port(
shmem_provider.clone(),
broker_port,
EventConfig::AlwaysUnique,
None,
Some(StateRestorer::new(
shmem_provider.new_shmem(0x1000).unwrap(),
)),
)?,
),
ClientDescription::new(0, 0, CoreId(0)),
);
}


@ -7,8 +7,7 @@ use std::{
use clap::Parser;
use libafl::{
events::{
ClientDescription, EventConfig, Launcher, LlmpEventManager, LlmpRestartingEventManager,
MonitorTypedEventManager,
ClientDescription, EventConfig, Launcher, LlmpEventManagerBuilder, MonitorTypedEventManager,
},
monitors::{tui::TuiMonitor, Monitor, MultiMonitor},
Error,
@ -111,17 +110,17 @@ impl Fuzzer {
// To rerun an input, instead of using a launcher, we create dummy parameters and run the client directly.
return client.run(
None,
MonitorTypedEventManager::<_, M>::new(LlmpRestartingEventManager::new(
LlmpEventManager::builder()
.build_on_port(
shmem_provider.clone(),
broker_port,
EventConfig::AlwaysUnique,
None,
)
.unwrap(),
StateRestorer::new(shmem_provider.new_shmem(0x1000).unwrap()),
)),
MonitorTypedEventManager::<_, M>::new(
LlmpEventManagerBuilder::builder().build_on_port(
shmem_provider.clone(),
broker_port,
EventConfig::AlwaysUnique,
None,
Some(StateRestorer::new(
shmem_provider.new_shmem(0x1000).unwrap(),
)),
)?,
),
ClientDescription::new(0, 0, CoreId(0)),
);
}


@ -1,8 +1,8 @@
# Fuzzbench Harness
This folder contains an example fuzzer tailored for fuzzbench.
It uses the best possible setting, with the exception of a SimpleRestartingEventManager instead of an LlmpEventManager - since fuzzbench is single threaded.
Real fuzz campaigns should consider using multithreaded LlmpEventManager, see the other examples.
It uses the best possible settings, with the exception of a SimpleRestartingEventManager instead of an LlmpRestartingEventManager, since fuzzbench is single-threaded.
Real fuzz campaigns should consider using the multithreaded LlmpRestartingEventManager; see the other examples.
## Build


@ -1,8 +1,8 @@
# Fuzzbench Harness (text)
This folder contains an example fuzzer tailored for fuzzbench.
It uses the best possible setting, with the exception of a SimpleRestartingEventManager instead of an LlmpEventManager - since fuzzbench is single threaded.
Real fuzz campaigns should consider using multithreaded LlmpEventManager, see the other examples.
It uses the best possible settings, with the exception of a SimpleRestartingEventManager instead of an LlmpRestartingEventManager, since fuzzbench is single-threaded.
Real fuzz campaigns should consider using the multithreaded LlmpRestartingEventManager; see the other examples.
This fuzzer autodetects whether the passed-in tokens and the initial inputs are text or binary data, and enables Grimoire in the case of text.


@ -1,629 +0,0 @@
//! An event manager that forwards all events to other attached fuzzers on shared maps or via tcp,
//! using low-level message passing, [`libafl_bolts::llmp`].
#[cfg(feature = "std")]
use alloc::string::ToString;
use alloc::vec::Vec;
use core::{fmt::Debug, marker::PhantomData, time::Duration};
#[cfg(feature = "std")]
use std::net::TcpStream;
#[cfg(feature = "std")]
use libafl_bolts::tuples::MatchNameRef;
#[cfg(feature = "llmp_compression")]
use libafl_bolts::{
compress::GzipCompressor,
llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED},
};
use libafl_bolts::{
current_time,
llmp::{LlmpClient, LlmpClientDescription, LLMP_FLAG_FROM_MM},
shmem::{NopShMem, ShMem, ShMemProvider},
tuples::Handle,
ClientId,
};
#[cfg(feature = "std")]
use libafl_bolts::{
llmp::{recv_tcp_msg, send_tcp_msg, TcpRequest, TcpResponse},
IP_LOCALHOST,
};
use serde::{de::DeserializeOwned, Serialize};
#[cfg(feature = "share_objectives")]
use crate::corpus::{Corpus, Testcase};
#[cfg(feature = "llmp_compression")]
use crate::events::llmp::COMPRESS_THRESHOLD;
#[cfg(feature = "std")]
use crate::events::{serialize_observers_adaptive, CanSerializeObserver};
use crate::{
events::{
llmp::{LLMP_TAG_EVENT_TO_BOTH, _LLMP_TAG_EVENT_TO_BROKER},
std_maybe_report_progress, std_on_restart, std_report_progress, AdaptiveSerializer,
AwaitRestartSafe, Event, EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId,
EventProcessor, EventRestarter, HasEventManagerId, ProgressReporter, SendExiting,
},
executors::HasObservers,
fuzzer::{EvaluatorObservers, ExecutionProcessor},
inputs::Input,
observers::TimeObserver,
stages::HasCurrentStageId,
state::{
HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions,
MaybeHasClientPerfMonitor, Stoppable,
},
Error, HasMetadata,
};
/// Default initial capacity of the event buffer - 4KB
const INITIAL_EVENT_BUFFER_SIZE: usize = 1024 * 4;
/// An `EventManager` that forwards all events to other attached fuzzers on shared maps or via tcp,
/// using low-level message passing, `llmp`.
pub struct LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
/// We only send 1 testcase for every `throttle` second
pub(crate) throttle: Option<Duration>,
/// We sent last message at `last_sent`
last_sent: Duration,
hooks: EMH,
/// The LLMP client for inter process communication
llmp: LlmpClient<SHM, SP>,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor,
/// The configuration defines this specific fuzzer.
/// A node will not re-use the observer values sent over LLMP
/// from nodes with other configurations.
configuration: EventConfig,
serialization_time: Duration,
deserialization_time: Duration,
serializations_cnt: usize,
should_serialize_cnt: usize,
pub(crate) time_ref: Option<Handle<TimeObserver>>,
event_buffer: Vec<u8>,
phantom: PhantomData<(I, S)>,
}
impl LlmpEventManager<(), (), (), NopShMem, ()> {
/// Creates a builder for [`LlmpEventManager`]
#[must_use]
pub fn builder() -> LlmpEventManagerBuilder<()> {
LlmpEventManagerBuilder::new()
}
}
/// Builder for `LlmpEventManager`
#[derive(Debug, Copy, Clone)]
pub struct LlmpEventManagerBuilder<EMH> {
throttle: Option<Duration>,
hooks: EMH,
}
impl Default for LlmpEventManagerBuilder<()> {
fn default() -> Self {
Self::new()
}
}
impl LlmpEventManagerBuilder<()> {
/// Create a new `LlmpEventManagerBuilder`
#[must_use]
pub fn new() -> Self {
Self {
throttle: None,
hooks: (),
}
}
/// Add hooks to it
pub fn hooks<EMH>(self, hooks: EMH) -> LlmpEventManagerBuilder<EMH> {
LlmpEventManagerBuilder {
throttle: self.throttle,
hooks,
}
}
}
impl<EMH> LlmpEventManagerBuilder<EMH> {
/// Change the sampling rate
#[must_use]
pub fn throttle(mut self, throttle: Duration) -> Self {
self.throttle = Some(throttle);
self
}
/// Create a manager from a raw LLMP client
pub fn build_from_client<I, S, SHM, SP>(
self,
llmp: LlmpClient<SHM, SP>,
configuration: EventConfig,
time_ref: Option<Handle<TimeObserver>>,
) -> Result<LlmpEventManager<EMH, I, S, SHM, SP>, Error>
where
SHM: ShMem,
{
Ok(LlmpEventManager {
throttle: self.throttle,
last_sent: Duration::from_secs(0),
hooks: self.hooks,
llmp,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD),
configuration,
serialization_time: Duration::ZERO,
deserialization_time: Duration::ZERO,
serializations_cnt: 0,
should_serialize_cnt: 0,
time_ref,
event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE),
phantom: PhantomData,
})
}
/// Create an LLMP event manager on a port.
/// It expects a broker to exist on this port.
#[cfg(feature = "std")]
pub fn build_on_port<I, S, SHM, SP>(
self,
shmem_provider: SP,
port: u16,
configuration: EventConfig,
time_ref: Option<Handle<TimeObserver>>,
) -> Result<LlmpEventManager<EMH, I, S, SHM, SP>, Error>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
let llmp = LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
Self::build_from_client(self, llmp, configuration, time_ref)
}
/// If a client respawns, it may reuse the existing connection, previously
/// stored by [`LlmpClient::to_env()`].
#[cfg(feature = "std")]
pub fn build_existing_client_from_env<I, S, SHM, SP>(
self,
shmem_provider: SP,
env_name: &str,
configuration: EventConfig,
time_ref: Option<Handle<TimeObserver>>,
) -> Result<LlmpEventManager<EMH, I, S, SHM, SP>, Error>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
let llmp = LlmpClient::on_existing_from_env(shmem_provider, env_name)?;
Self::build_from_client(self, llmp, configuration, time_ref)
}
/// Create an existing client from description
pub fn build_existing_client_from_description<I, S, SHM, SP>(
self,
shmem_provider: SP,
description: &LlmpClientDescription,
configuration: EventConfig,
time_ref: Option<Handle<TimeObserver>>,
) -> Result<LlmpEventManager<EMH, I, S, SHM, SP>, Error>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
let llmp = LlmpClient::existing_client_from_description(shmem_provider, description)?;
Self::build_from_client(self, llmp, configuration, time_ref)
}
}
#[cfg(feature = "std")]
impl<EMH, I, S, OT, SHM, SP> CanSerializeObserver<OT> for LlmpEventManager<EMH, I, S, SHM, SP>
where
OT: MatchNameRef + Serialize,
SHM: ShMem,
{
fn serialize_observers(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error> {
serialize_observers_adaptive::<Self, OT>(self, observers, 2, 80)
}
}
impl<EMH, I, S, SHM, SP> AdaptiveSerializer for LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
fn serialization_time(&self) -> Duration {
self.serialization_time
}
fn deserialization_time(&self) -> Duration {
self.deserialization_time
}
fn serializations_cnt(&self) -> usize {
self.serializations_cnt
}
fn should_serialize_cnt(&self) -> usize {
self.should_serialize_cnt
}
fn serialization_time_mut(&mut self) -> &mut Duration {
&mut self.serialization_time
}
fn deserialization_time_mut(&mut self) -> &mut Duration {
&mut self.deserialization_time
}
fn serializations_cnt_mut(&mut self) -> &mut usize {
&mut self.serializations_cnt
}
fn should_serialize_cnt_mut(&mut self) -> &mut usize {
&mut self.should_serialize_cnt
}
fn time_ref(&self) -> &Option<Handle<TimeObserver>> {
&self.time_ref
}
}
impl<EMH, I, S, SHM, SP> Debug for LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: Debug,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut debug_struct = f.debug_struct("LlmpEventManager");
let debug = debug_struct.field("llmp", &self.llmp);
//.field("custom_buf_handlers", &self.custom_buf_handlers)
#[cfg(feature = "llmp_compression")]
let debug = debug.field("compressor", &self.compressor);
debug
.field("configuration", &self.configuration)
.finish_non_exhaustive()
}
}
impl<EMH, I, S, SHM, SP> Drop for LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
/// LLMP clients will have to wait until their pages are mapped by somebody.
fn drop(&mut self) {
self.await_restart_safe();
}
}
impl<EMH, I, S, SHM, SP> LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
/// Calling this function will tell the llmp broker that this client is exiting
/// This should be called from the restarter not from the actual fuzzer client
/// This function serves the same role as `LlmpClient.send_exiting()`.
/// However, from the event restarter process it is forbidden to call `send_exiting()`
/// (You can call it and it compiles but you should never do so)
/// `send_exiting()` is exclusive to the fuzzer client.
#[cfg(feature = "std")]
pub fn detach_from_broker(&self, broker_port: u16) -> Result<(), Error> {
let client_id = self.llmp.sender().id();
let Ok(mut stream) = TcpStream::connect((IP_LOCALHOST, broker_port)) else {
log::error!("Connection refused.");
return Ok(());
};
// The broker tells us hello; we don't care, we just tell it our client died
let TcpResponse::BrokerConnectHello {
broker_shmem_description: _,
hostname: _,
} = recv_tcp_msg(&mut stream)?.try_into()?
else {
return Err(Error::illegal_state(
"Received unexpected Broker Hello".to_string(),
));
};
let msg = TcpRequest::ClientQuit { client_id };
// Send this message off and we are leaving.
match send_tcp_msg(&mut stream, &msg) {
Ok(()) => (),
Err(e) => log::error!("Failed to send tcp message {:#?}", e),
}
log::debug!("Asking he broker to be disconnected");
Ok(())
}
}
impl<EMH, I, S, SHM, SP> LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
/// Describe the client event manager's LLMP parts in a restorable fashion
pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
self.llmp.describe()
}
/// Write the config for a client `EventManager` to env vars; a new
/// client can reattach using [`LlmpEventManagerBuilder::build_existing_client_from_env()`].
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) {
self.llmp.to_env(env_name).unwrap();
}
}
impl<EMH, I, S, SHM, SP> LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
// Handle arriving events in the client
fn handle_in_client<E, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
client_id: ClientId,
event: Event<I>,
) -> Result<(), Error>
where
S: HasImported + HasSolutions<I> + HasCurrentTestcase<I> + Stoppable,
EMH: EventManagerHooksTuple<I, S>,
I: Input,
E: HasObservers,
E::Observers: DeserializeOwned,
Z: ExecutionProcessor<Self, I, E::Observers, S> + EvaluatorObservers<E, Self, I, S>,
{
log::trace!("Got event in client: {} from {client_id:?}", event.name());
if !self.hooks.pre_exec_all(state, client_id, &event)? {
return Ok(());
}
let evt_name = event.name_detailed();
match event {
Event::NewTestcase {
input,
client_config,
exit_kind,
observers_buf,
#[cfg(feature = "std")]
forward_id,
..
} => {
#[cfg(feature = "std")]
log::debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id());
let res = if client_config.match_with(&self.configuration)
&& observers_buf.is_some()
{
let start = current_time();
let observers: E::Observers =
postcard::from_bytes(observers_buf.as_ref().unwrap())?;
{
self.deserialization_time = current_time() - start;
}
fuzzer.evaluate_execution(state, self, input, &observers, &exit_kind, false)?
} else {
fuzzer.evaluate_input_with_observers(state, executor, self, input, false)?
};
if let Some(item) = res.1 {
*state.imported_mut() += 1;
log::debug!("Added received Testcase {evt_name} as item #{item}");
} else {
log::debug!("Testcase {evt_name} was discarded");
}
}
#[cfg(feature = "share_objectives")]
Event::Objective { input, .. } => {
log::debug!("Received new Objective");
let mut testcase = Testcase::from(input);
testcase.set_parent_id_optional(*state.corpus().current());
if let Ok(mut tc) = state.current_testcase_mut() {
tc.found_objective();
}
state.solutions_mut().add(testcase)?;
log::info!("Added received Objective to Corpus");
}
Event::Stop => {
state.request_stop();
}
_ => {
return Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
)));
}
}
self.hooks.post_exec_all(state, client_id)?;
Ok(())
}
}
impl<EMH, I, S, SHM, SP> LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
/// Send information that this client is exiting.
/// The other side may free up all allocated memory.
/// We are no longer allowed to send anything afterwards.
pub fn send_exiting(&mut self) -> Result<(), Error> {
self.llmp.sender_mut().send_exiting()
}
}
impl<EMH, I, S, SHM, SP> EventFirer<I, S> for LlmpEventManager<EMH, I, S, SHM, SP>
where
I: Serialize,
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> {
#[cfg(feature = "llmp_compression")]
let flags = LLMP_FLAG_INITIALIZED;
self.event_buffer.resize(self.event_buffer.capacity(), 0);
// Serialize the event, reallocating event_buffer if needed
let written_len = match postcard::to_slice(&event, &mut self.event_buffer) {
Ok(written) => written.len(),
Err(postcard::Error::SerializeBufferFull) => {
let serialized = postcard::to_allocvec(&event)?;
self.event_buffer = serialized;
self.event_buffer.len()
}
Err(e) => return Err(Error::from(e)),
};
#[cfg(feature = "llmp_compression")]
{
match self
.compressor
.maybe_compress(&self.event_buffer[..written_len])
{
Some(comp_buf) => {
self.llmp.send_buf_with_flags(
LLMP_TAG_EVENT_TO_BOTH,
flags | LLMP_FLAG_COMPRESSED,
&comp_buf,
)?;
}
None => {
self.llmp
.send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len])?;
}
}
}
#[cfg(not(feature = "llmp_compression"))]
{
self.llmp
.send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len])?;
}
self.last_sent = current_time();
Ok(())
}
fn configuration(&self) -> EventConfig {
self.configuration
}
fn should_send(&self) -> bool {
if let Some(throttle) = self.throttle {
current_time() - self.last_sent > throttle
} else {
true
}
}
}
impl<EMH, I, S, SHM, SP> EventRestarter<S> for LlmpEventManager<EMH, I, S, SHM, SP>
where
S: HasCurrentStageId,
SHM: ShMem,
{
fn on_restart(&mut self, state: &mut S) -> Result<(), Error> {
std_on_restart(self, state)
}
}
impl<EMH, I, S, SHM, SP> SendExiting for LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
fn send_exiting(&mut self) -> Result<(), Error> {
self.llmp.sender_mut().send_exiting()
}
}
impl<EMH, I, S, SHM, SP> AwaitRestartSafe for LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
/// The LLMP client needs to wait until a broker has mapped all pages before shutting down.
/// Otherwise, the OS may already have removed the shared maps.
fn await_restart_safe(&mut self) {
// wait until we can drop the message safely.
self.llmp.await_safe_to_unmap_blocking();
}
}
impl<E, EMH, I, S, SHM, SP, Z> EventProcessor<E, S, Z> for LlmpEventManager<EMH, I, S, SHM, SP>
where
E: HasObservers,
E::Observers: DeserializeOwned,
EMH: EventManagerHooksTuple<I, S>,
I: DeserializeOwned + Input,
S: HasImported + HasSolutions<I> + HasCurrentTestcase<I> + Stoppable,
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
Z: ExecutionProcessor<Self, I, E::Observers, S> + EvaluatorObservers<E, Self, I, S>,
{
fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> {
// TODO: Get around local event copy by moving handle_in_client
let self_id = self.llmp.sender().id();
let mut count = 0;
while let Some((client_id, tag, flags, msg)) = self.llmp.recv_buf_with_flags()? {
assert_ne!(
tag, _LLMP_TAG_EVENT_TO_BROKER,
"EVENT_TO_BROKER parcel should not have arrived in the client!"
);
if client_id == self_id {
continue;
}
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
#[cfg(feature = "llmp_compression")]
let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = self.compressor.decompress(msg)?;
&compressed
} else {
msg
};
let event: Event<I> = postcard::from_bytes(event_bytes)?;
log::debug!("Received event in normal llmp {}", event.name_detailed());
// If the message comes from another machine, do not
// consider other events than new testcase.
if !event.is_new_testcase() && (flags & LLMP_FLAG_FROM_MM == LLMP_FLAG_FROM_MM) {
continue;
}
self.handle_in_client(fuzzer, executor, state, client_id, event)?;
count += 1;
}
Ok(count)
}
fn on_shutdown(&mut self) -> Result<(), Error> {
self.send_exiting()
}
}
impl<EMH, I, S, SHM, SP> ProgressReporter<S> for LlmpEventManager<EMH, I, S, SHM, SP>
where
I: Serialize,
S: HasExecutions + HasLastReportTime + HasMetadata + MaybeHasClientPerfMonitor,
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
fn maybe_report_progress(
&mut self,
state: &mut S,
monitor_timeout: Duration,
) -> Result<(), Error> {
std_maybe_report_progress(self, state, monitor_timeout)
}
fn report_progress(&mut self, state: &mut S) -> Result<(), Error> {
std_report_progress(self, state)
}
}
impl<EMH, I, S, SHM, SP> HasEventManagerId for LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
/// Gets the id assigned to this event manager.
fn mgr_id(&self) -> EventManagerId {
EventManagerId(self.llmp.sender().id().0 as usize)
}
}


@ -24,10 +24,6 @@ use crate::{
Error,
};
/// The llmp event manager
pub mod mgr;
pub use mgr::*;
/// The llmp restarting manager
#[cfg(feature = "std")]
pub mod restarting;


@ -3,6 +3,8 @@
//! When the target crashes, a watch process (the parent) will
//! restart/refork it.
#[cfg(feature = "std")]
use alloc::string::ToString;
use alloc::vec::Vec;
use core::{
marker::PhantomData,
@ -11,6 +13,8 @@ use core::{
time::Duration,
};
use std::net::SocketAddr;
#[cfg(feature = "std")]
use std::net::TcpStream;
#[cfg(any(windows, not(feature = "fork")))]
use libafl_bolts::os::startable_self;
@ -18,17 +22,35 @@ use libafl_bolts::os::startable_self;
use libafl_bolts::os::unix_signals::setup_signal_handler;
#[cfg(all(feature = "fork", unix))]
use libafl_bolts::os::{fork, ForkResult};
#[cfg(feature = "llmp_compression")]
use libafl_bolts::{
compress::GzipCompressor,
llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED},
};
use libafl_bolts::{
core_affinity::CoreId,
llmp::{Broker, LlmpBroker, LlmpConnection},
current_time,
llmp::{
Broker, LlmpBroker, LlmpClient, LlmpClientDescription, LlmpConnection, LLMP_FLAG_FROM_MM,
},
os::CTRL_C_EXIT,
shmem::{ShMem, ShMemProvider, StdShMem, StdShMemProvider},
staterestore::StateRestorer,
tuples::{tuple_list, Handle, MatchNameRef},
ClientId,
};
#[cfg(feature = "std")]
use libafl_bolts::{
llmp::{recv_tcp_msg, send_tcp_msg, TcpRequest, TcpResponse},
IP_LOCALHOST,
};
use serde::{de::DeserializeOwned, Serialize};
use typed_builder::TypedBuilder;
#[cfg(feature = "share_objectives")]
use crate::corpus::{Corpus, Testcase};
#[cfg(feature = "llmp_compression")]
use crate::events::COMPRESS_THRESHOLD;
#[cfg(all(unix, not(miri)))]
use crate::events::EVENTMGR_SIGHANDLER_STATE;
use crate::{
@ -37,8 +59,8 @@ use crate::{
launcher::ClientDescription, serialize_observers_adaptive, std_maybe_report_progress,
std_report_progress, AdaptiveSerializer, AwaitRestartSafe, CanSerializeObserver, Event,
EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, EventProcessor,
EventRestarter, HasEventManagerId, LlmpEventManager, LlmpShouldSaveState, ProgressReporter,
SendExiting, StdLlmpEventHook,
EventRestarter, HasEventManagerId, LlmpShouldSaveState, ProgressReporter, SendExiting,
StdLlmpEventHook, LLMP_TAG_EVENT_TO_BOTH, _LLMP_TAG_EVENT_TO_BROKER,
},
executors::HasObservers,
fuzzer::{EvaluatorObservers, ExecutionProcessor},
@ -53,18 +75,35 @@ use crate::{
Error,
};
const INITIAL_EVENT_BUFFER_SIZE: usize = 1024 * 4;
/// A manager that can restart on the fly, storing states in-between (in `on_restart`)
#[derive(Debug)]
pub struct LlmpRestartingEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
/// The embedded LLMP event manager
llmp_mgr: LlmpEventManager<EMH, I, S, SHM, SP>,
pub struct LlmpRestartingEventManager<EMH, I, S, SHM, SP> {
/// We only send 1 testcase for every `throttle` second
pub(crate) throttle: Option<Duration>,
/// We sent last message at `last_sent`
last_sent: Duration,
hooks: EMH,
/// The LLMP client for inter process communication
llmp: LlmpClient<SHM, SP>,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor,
/// The configuration defines this specific fuzzer.
/// A node will not re-use the observer values sent over LLMP
/// from nodes with other configurations.
configuration: EventConfig,
serialization_time: Duration,
deserialization_time: Duration,
serializations_cnt: usize,
should_serialize_cnt: usize,
pub(crate) time_ref: Option<Handle<TimeObserver>>,
event_buffer: Vec<u8>,
/// The staterestorer to serialize the state for the next runner
staterestorer: StateRestorer<SHM, SP>,
/// If this is `Some`, this event manager can restart; otherwise, it cannot.
staterestorer: Option<StateRestorer<SHM, SP>>,
/// Decide if the state restorer must save the serialized state
save_state: LlmpShouldSaveState,
phantom: PhantomData<(I, S)>,
}
impl<EMH, I, S, SHM, SP> AdaptiveSerializer for LlmpRestartingEventManager<EMH, I, S, SHM, SP>
@ -72,33 +111,33 @@ where
SHM: ShMem,
{
fn serialization_time(&self) -> Duration {
self.llmp_mgr.serialization_time()
self.serialization_time
}
fn deserialization_time(&self) -> Duration {
self.llmp_mgr.deserialization_time()
self.deserialization_time
}
fn serializations_cnt(&self) -> usize {
self.llmp_mgr.serializations_cnt()
self.serializations_cnt
}
fn should_serialize_cnt(&self) -> usize {
self.llmp_mgr.should_serialize_cnt()
self.should_serialize_cnt
}
fn serialization_time_mut(&mut self) -> &mut Duration {
self.llmp_mgr.serialization_time_mut()
&mut self.serialization_time
}
fn deserialization_time_mut(&mut self) -> &mut Duration {
self.llmp_mgr.deserialization_time_mut()
&mut self.deserialization_time
}
fn serializations_cnt_mut(&mut self) -> &mut usize {
self.llmp_mgr.serializations_cnt_mut()
&mut self.serializations_cnt
}
fn should_serialize_cnt_mut(&mut self) -> &mut usize {
self.llmp_mgr.should_serialize_cnt_mut()
&mut self.should_serialize_cnt
}
fn time_ref(&self) -> &Option<Handle<TimeObserver>> {
&self.llmp_mgr.time_ref
&self.time_ref
}
}
@ -129,19 +168,68 @@ where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
fn fire(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error> {
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> {
// Check if we are going to crash in the event, in which case we store our current state for the next runner
self.llmp_mgr.fire(state, event)?;
self.intermediate_save()?;
#[cfg(feature = "llmp_compression")]
let flags = LLMP_FLAG_INITIALIZED;
self.event_buffer.resize(self.event_buffer.capacity(), 0);
// Serialize the event, reallocating event_buffer if needed
let written_len = match postcard::to_slice(&event, &mut self.event_buffer) {
Ok(written) => written.len(),
Err(postcard::Error::SerializeBufferFull) => {
let serialized = postcard::to_allocvec(&event)?;
self.event_buffer = serialized;
self.event_buffer.len()
}
Err(e) => return Err(Error::from(e)),
};
#[cfg(feature = "llmp_compression")]
{
match self
.compressor
.maybe_compress(&self.event_buffer[..written_len])
{
Some(comp_buf) => {
self.llmp.send_buf_with_flags(
LLMP_TAG_EVENT_TO_BOTH,
flags | LLMP_FLAG_COMPRESSED,
&comp_buf,
)?;
}
None => {
self.llmp
.send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len])?;
}
}
}
#[cfg(not(feature = "llmp_compression"))]
{
self.llmp
.send_buf(LLMP_TAG_EVENT_TO_BOTH, &self.event_buffer[..written_len])?;
}
self.last_sent = current_time();
if self.staterestorer.is_some() {
self.intermediate_save()?;
}
Ok(())
}
fn configuration(&self) -> EventConfig {
<LlmpEventManager<EMH, I, S, SHM, SP> as EventFirer<I, S>>::configuration(&self.llmp_mgr)
self.configuration
}
fn should_send(&self) -> bool {
<LlmpEventManager<EMH, I, S, SHM, SP> as EventFirer<I, S>>::should_send(&self.llmp_mgr)
if let Some(throttle) = self.throttle {
current_time() - self.last_sent > throttle
} else {
true
}
}
}
@ -167,18 +255,21 @@ where
fn on_restart(&mut self, state: &mut S) -> Result<(), Error> {
state.on_restart()?;
// First, reset the page to 0 so the next iteration can read from the beginning of this page
self.staterestorer.reset();
self.staterestorer.save(&(
if self.save_state.on_restart() {
Some(state)
} else {
None
},
&self.llmp_mgr.describe()?,
))?;
if let Some(sr) = &mut self.staterestorer {
// First, reset the page to 0 so the next iteration can read from the beginning of this page
sr.reset();
sr.save(&(
if self.save_state.on_restart() {
Some(state)
} else {
None
},
&self.llmp.describe()?,
))?;
log::info!("Waiting for broker...");
}
log::info!("Waiting for broker...");
self.await_restart_safe();
Ok(())
}
@ -190,10 +281,12 @@ where
SP: ShMemProvider<ShMem = SHM>,
{
fn send_exiting(&mut self) -> Result<(), Error> {
self.staterestorer.send_exiting();
if let Some(ref mut sr) = &mut self.staterestorer {
sr.send_exiting();
}
// Also inform the broker that we are about to exit.
// This way, the broker can clean up the pages, and eventually exit.
self.llmp_mgr.send_exiting()
self.llmp.sender_mut().send_exiting()
}
}
@ -205,7 +298,7 @@ where
/// Otherwise, the OS may already have removed the shared maps,
#[inline]
fn await_restart_safe(&mut self) {
self.llmp_mgr.await_restart_safe();
self.llmp.await_safe_to_unmap_blocking();
}
}
@ -219,12 +312,52 @@ where
S: HasImported + HasCurrentTestcase<I> + HasSolutions<I> + Stoppable + Serialize,
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
Z: ExecutionProcessor<LlmpEventManager<EMH, I, S, SHM, SP>, I, E::Observers, S>
+ EvaluatorObservers<E, LlmpEventManager<EMH, I, S, SHM, SP>, I, S>,
Z: ExecutionProcessor<Self, I, E::Observers, S> + EvaluatorObservers<E, Self, I, S>,
{
fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> {
let res = self.llmp_mgr.process(fuzzer, state, executor)?;
self.intermediate_save()?;
let res = {
// TODO: Get around local event copy by moving handle_in_client
let self_id = self.llmp.sender().id();
let mut count = 0;
while let Some((client_id, tag, flags, msg)) = self.llmp.recv_buf_with_flags()? {
assert_ne!(
tag, _LLMP_TAG_EVENT_TO_BROKER,
"EVENT_TO_BROKER parcel should not have arrived in the client!"
);
if client_id == self_id {
continue;
}
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
#[cfg(feature = "llmp_compression")]
let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = self.compressor.decompress(msg)?;
&compressed
} else {
msg
};
let event: Event<I> = postcard::from_bytes(event_bytes)?;
log::debug!("Received event in normal llmp {}", event.name_detailed());
// If the message comes from another machine, do not
// consider other events than new testcase.
if !event.is_new_testcase() && (flags & LLMP_FLAG_FROM_MM == LLMP_FLAG_FROM_MM) {
continue;
}
self.handle_in_client(fuzzer, executor, state, client_id, event)?;
count += 1;
}
count
};
if self.staterestorer.is_some() {
self.intermediate_save()?;
}
Ok(res)
}
@ -239,7 +372,7 @@ where
SP: ShMemProvider<ShMem = SHM>,
{
fn mgr_id(&self) -> EventManagerId {
self.llmp_mgr.mgr_id()
EventManagerId(self.llmp.sender().id().0 as usize)
}
}
@ -249,55 +382,303 @@ const _ENV_FUZZER_RECEIVER: &str = "_AFL_ENV_FUZZER_RECEIVER";
/// The llmp (2 way) connection from a fuzzer to the broker (broadcasting all other fuzzer messages)
const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT";
/// Builder for `LlmpRestartingEventManager`
#[derive(Debug)]
pub struct LlmpEventManagerBuilder<EMH> {
throttle: Option<Duration>,
save_state: LlmpShouldSaveState,
hooks: EMH,
}
impl Default for LlmpEventManagerBuilder<()> {
fn default() -> Self {
Self::builder()
}
}
impl LlmpEventManagerBuilder<()> {
/// Create a new `LlmpEventManagerBuilder`
#[must_use]
pub fn builder() -> Self {
Self {
throttle: None,
save_state: LlmpShouldSaveState::OnRestart,
hooks: (),
}
}
}
impl LlmpEventManagerBuilder<()> {
/// Add hooks to it
pub fn hooks<EMH>(self, hooks: EMH) -> LlmpEventManagerBuilder<EMH> {
LlmpEventManagerBuilder {
throttle: self.throttle,
save_state: self.save_state,
hooks,
}
}
}
impl<EMH> LlmpEventManagerBuilder<EMH> {
/// Change the sampling rate
#[must_use]
pub fn throttle(mut self, throttle: Duration) -> Self {
self.throttle = Some(throttle);
self
}
/// Change save state policy
#[must_use]
pub fn save_state(mut self, save_state: LlmpShouldSaveState) -> Self {
self.save_state = save_state;
self
}
/// Create a manager from a raw LLMP client
/// If `staterestorer` is `Some`, this restarting manager can restart;
/// otherwise, it cannot.
pub fn build_from_client<I, S, SHM, SP>(
self,
llmp: LlmpClient<SHM, SP>,
configuration: EventConfig,
time_ref: Option<Handle<TimeObserver>>,
staterestorer: Option<StateRestorer<SHM, SP>>,
) -> Result<LlmpRestartingEventManager<EMH, I, S, SHM, SP>, Error> {
Ok(LlmpRestartingEventManager {
throttle: self.throttle,
last_sent: Duration::from_secs(0),
hooks: self.hooks,
llmp,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD),
configuration,
serialization_time: Duration::ZERO,
deserialization_time: Duration::ZERO,
serializations_cnt: 0,
should_serialize_cnt: 0,
time_ref,
event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE),
staterestorer,
save_state: self.save_state,
phantom: PhantomData,
})
}
/// Create an LLMP event manager on a port.
/// It expects a broker to exist on this port.
#[cfg(feature = "std")]
pub fn build_on_port<I, S, SHM, SP>(
self,
shmem_provider: SP,
port: u16,
configuration: EventConfig,
time_ref: Option<Handle<TimeObserver>>,
staterestorer: Option<StateRestorer<SHM, SP>>,
) -> Result<LlmpRestartingEventManager<EMH, I, S, SHM, SP>, Error>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
let llmp = LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
Self::build_from_client(self, llmp, configuration, time_ref, staterestorer)
}
/// If a client respawns, it may reuse the existing connection, previously
/// stored by [`LlmpClient::to_env()`].
#[cfg(feature = "std")]
pub fn build_existing_client_from_env<I, S, SHM, SP>(
self,
shmem_provider: SP,
env_name: &str,
configuration: EventConfig,
time_ref: Option<Handle<TimeObserver>>,
staterestorer: Option<StateRestorer<SHM, SP>>,
) -> Result<LlmpRestartingEventManager<EMH, I, S, SHM, SP>, Error>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
let llmp = LlmpClient::on_existing_from_env(shmem_provider, env_name)?;
Self::build_from_client(self, llmp, configuration, time_ref, staterestorer)
}
/// Create an existing client from description
pub fn build_existing_client_from_description<I, S, SHM, SP>(
self,
shmem_provider: SP,
description: &LlmpClientDescription,
configuration: EventConfig,
time_ref: Option<Handle<TimeObserver>>,
staterestorer: Option<StateRestorer<SHM, SP>>,
) -> Result<LlmpRestartingEventManager<EMH, I, S, SHM, SP>, Error>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
let llmp = LlmpClient::existing_client_from_description(shmem_provider, description)?;
Self::build_from_client(self, llmp, configuration, time_ref, staterestorer)
}
}
impl<EMH, I, S, SHM, SP> LlmpRestartingEventManager<EMH, I, S, SHM, SP>
where
S: Serialize,
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
/// Create a new runner, the executed child doing the actual fuzzing.
pub fn new(
llmp_mgr: LlmpEventManager<EMH, I, S, SHM, SP>,
staterestorer: StateRestorer<SHM, SP>,
) -> Self {
Self {
llmp_mgr,
staterestorer,
save_state: LlmpShouldSaveState::OnRestart,
}
}
/// Create a new runner specifying if it must save the serialized state on restart.
pub fn with_save_state(
llmp_mgr: LlmpEventManager<EMH, I, S, SHM, SP>,
staterestorer: StateRestorer<SHM, SP>,
save_state: LlmpShouldSaveState,
) -> Self {
Self {
llmp_mgr,
staterestorer,
save_state,
}
/// Write the config for a client `EventManager` to env vars; a new
/// client can reattach using [`LlmpEventManagerBuilder::build_existing_client_from_env()`].
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) {
self.llmp.to_env(env_name).unwrap();
}
/// Get the staterestorer
pub fn staterestorer(&self) -> &StateRestorer<SHM, SP> {
pub fn staterestorer(&self) -> &Option<StateRestorer<SHM, SP>> {
&self.staterestorer
}
/// Get the staterestorer (mutable)
pub fn staterestorer_mut(&mut self) -> &mut StateRestorer<SHM, SP> {
pub fn staterestorer_mut(&mut self) -> &mut Option<StateRestorer<SHM, SP>> {
&mut self.staterestorer
}
/// Save LLMP state and empty state in staterestorer
pub fn intermediate_save(&mut self) -> Result<(), Error> {
// First, reset the page to 0 so the next iteration can read from the beginning of this page
if self.save_state.oom_safe() {
self.staterestorer.reset();
self.staterestorer
.save(&(None::<S>, &self.llmp_mgr.describe()?))?;
if let Some(sr) = &mut self.staterestorer {
if self.save_state.oom_safe() {
sr.reset();
sr.save(&(None::<S>, &self.llmp.describe()?))?;
}
}
Ok(())
}
/// Reset the state in state restorer
pub fn staterestorer_reset(&mut self) -> Result<(), Error> {
if let Some(sr) = &mut self.staterestorer {
sr.reset();
}
Ok(())
}
// Handle arriving events in the client
fn handle_in_client<E, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
client_id: ClientId,
event: Event<I>,
) -> Result<(), Error>
where
S: HasImported + HasSolutions<I> + HasCurrentTestcase<I> + Stoppable,
EMH: EventManagerHooksTuple<I, S>,
I: Input,
E: HasObservers,
E::Observers: DeserializeOwned,
Z: ExecutionProcessor<Self, I, E::Observers, S> + EvaluatorObservers<E, Self, I, S>,
{
log::trace!("Got event in client: {} from {client_id:?}", event.name());
if !self.hooks.pre_exec_all(state, client_id, &event)? {
return Ok(());
}
let evt_name = event.name_detailed();
match event {
Event::NewTestcase {
input,
client_config,
exit_kind,
observers_buf,
#[cfg(feature = "std")]
forward_id,
..
} => {
#[cfg(feature = "std")]
log::debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id());
let res = if client_config.match_with(&self.configuration)
&& observers_buf.is_some()
{
let start = current_time();
let observers: E::Observers =
postcard::from_bytes(observers_buf.as_ref().unwrap())?;
{
self.deserialization_time = current_time() - start;
}
fuzzer.evaluate_execution(state, self, input, &observers, &exit_kind, false)?
} else {
fuzzer.evaluate_input_with_observers(state, executor, self, input, false)?
};
if let Some(item) = res.1 {
*state.imported_mut() += 1;
log::debug!("Added received Testcase {evt_name} as item #{item}");
} else {
log::debug!("Testcase {evt_name} was discarded");
}
}
#[cfg(feature = "share_objectives")]
Event::Objective { input, .. } => {
log::debug!("Received new Objective");
let mut testcase = Testcase::from(input);
testcase.set_parent_id_optional(*state.corpus().current());
if let Ok(mut tc) = state.current_testcase_mut() {
tc.found_objective();
}
state.solutions_mut().add(testcase)?;
log::info!("Added received Objective to Corpus");
}
Event::Stop => {
state.request_stop();
}
_ => {
return Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
)));
}
}
self.hooks.post_exec_all(state, client_id)?;
Ok(())
}
/// Calling this function will tell the llmp broker that this client is exiting
/// This should be called from the restarter not from the actual fuzzer client
/// This function serves the same role as `LlmpClient.send_exiting()`.
/// However, from the event restarter process it is forbidden to call `send_exiting()`
/// (You can call it and it compiles but you should never do so)
/// `send_exiting()` is exclusive to the fuzzer client.
#[cfg(feature = "std")]
pub fn detach_from_broker(&self, broker_port: u16) -> Result<(), Error> {
let client_id = self.llmp.sender().id();
let Ok(mut stream) = TcpStream::connect((IP_LOCALHOST, broker_port)) else {
log::error!("Connection refused.");
return Ok(());
};
// The broker tells us hello; we don't care, we just tell it our client died
let TcpResponse::BrokerConnectHello {
broker_shmem_description: _,
hostname: _,
} = recv_tcp_msg(&mut stream)?.try_into()?
else {
return Err(Error::illegal_state(
"Received unexpected Broker Hello".to_string(),
));
};
let msg = TcpRequest::ClientQuit { client_id };
// Send this message off and we are leaving.
match send_tcp_msg(&mut stream, &msg) {
Ok(()) => (),
Err(e) => log::error!("Failed to send tcp message {:#?}", e),
}
log::debug!("Asking he broker to be disconnected");
Ok(())
}
}
@ -487,13 +868,14 @@ where
return Err(Error::shutting_down());
}
LlmpConnection::IsClient { client } => {
let mgr: LlmpEventManager<EMH, I, S, SP::ShMem, SP> =
LlmpEventManager::builder()
let mgr: LlmpRestartingEventManager<EMH, I, S, SP::ShMem, SP> =
LlmpEventManagerBuilder::builder()
.hooks(self.hooks)
.build_from_client(
client,
self.configuration,
self.time_ref.clone(),
None,
)?;
(mgr, None)
}
@ -513,13 +895,14 @@ where
}
ManagerKind::Client { client_description } => {
// We are a client
let mgr = LlmpEventManager::builder()
let mgr = LlmpEventManagerBuilder::builder()
.hooks(self.hooks)
.build_on_port(
self.shmem_provider.clone(),
self.broker_port,
self.configuration,
self.time_ref.clone(),
None,
)?;
(mgr, Some(client_description.core_id()))
@ -643,48 +1026,41 @@ where
// If we're restarting, deserialize the old state.
let (state, mut mgr) =
if let Some((state_opt, mgr_description)) = staterestorer.restore()? {
let llmp_mgr = LlmpEventManager::builder()
.hooks(self.hooks)
.build_existing_client_from_description(
new_shmem_provider,
&mgr_description,
self.configuration,
self.time_ref.clone(),
)?;
(
state_opt,
LlmpRestartingEventManager::with_save_state(
llmp_mgr,
staterestorer,
self.serialize_state,
),
LlmpEventManagerBuilder::builder()
.hooks(self.hooks)
.save_state(self.serialize_state)
.build_existing_client_from_description(
new_shmem_provider,
&mgr_description,
self.configuration,
self.time_ref.clone(),
Some(staterestorer),
)?,
)
} else {
log::info!("First run. Let's set it all up");
// Mgr to send and receive msgs from/to all other fuzzer instances
let mgr = LlmpEventManager::builder()
.hooks(self.hooks)
.build_existing_client_from_env(
new_shmem_provider,
_ENV_FUZZER_BROKER_CLIENT_INITIAL,
self.configuration,
self.time_ref.clone(),
)?;
(
None,
LlmpRestartingEventManager::with_save_state(
mgr,
staterestorer,
self.serialize_state,
),
LlmpEventManagerBuilder::builder()
.hooks(self.hooks)
.save_state(self.serialize_state)
.build_existing_client_from_env(
new_shmem_provider,
_ENV_FUZZER_BROKER_CLIENT_INITIAL,
self.configuration,
self.time_ref.clone(),
Some(staterestorer),
)?,
)
};
// We reset the staterestorer, the next staterestorer and receiver (after crash) will reuse the page from the initial message.
if self.serialize_state.oom_safe() {
mgr.intermediate_save()?;
} else {
mgr.staterestorer.reset();
mgr.staterestorer_reset()?;
}
/* TODO: Not sure if this is needed
@ -713,7 +1089,7 @@ mod tests {
use crate::{
corpus::{Corpus, InMemoryCorpus, Testcase},
events::llmp::{restarting::_ENV_FUZZER_SENDER, LlmpEventManager},
events::llmp::restarting::{LlmpEventManagerBuilder, _ENV_FUZZER_SENDER},
executors::{ExitKind, InProcessExecutor},
feedbacks::ConstFeedback,
fuzzer::Fuzzer,
@ -768,8 +1144,8 @@ mod tests {
llmp_client.mark_safe_to_unmap();
}
let mut llmp_mgr = LlmpEventManager::builder()
.build_from_client(llmp_client, "fuzzer".into(), Some(time_ref.clone()))
let mut llmp_mgr = LlmpEventManagerBuilder::builder()
.build_from_client(llmp_client, "fuzzer".into(), Some(time_ref.clone()), None)
.unwrap();
let scheduler = RandScheduler::new();
@ -799,7 +1175,7 @@ mod tests {
staterestorer.reset();
staterestorer
.save(&(&mut state, &llmp_mgr.describe().unwrap()))
.save(&(&mut state, &llmp_mgr.llmp.describe().unwrap()))
.unwrap();
assert!(staterestorer.has_content());
@ -812,12 +1188,13 @@ mod tests {
assert!(sc_cpy.has_content());
let (mut state_clone, mgr_description) = staterestorer.restore().unwrap().unwrap();
let mut llmp_clone = LlmpEventManager::builder()
let mut llmp_clone = LlmpEventManagerBuilder::builder()
.build_existing_client_from_description(
shmem_provider,
&mgr_description,
"fuzzer".into(),
Some(time_ref),
None,
)
.unwrap();


@ -96,7 +96,7 @@ impl SignalHandler for ShutdownSignalData {
}
/// A per-fuzzer unique `ID`, usually starting with `0` and increasing
/// by `1` in multiprocessed `EventManagers`, such as [`LlmpEventManager`].
/// by `1` in multiprocessed `EventManagers`, such as [`LlmpRestartingEventManager`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct EventManagerId(
@ -377,7 +377,7 @@ impl<I> Event<I> {
pub trait EventFirer<I, S> {
/// Send off an [`Event`] to the broker
///
/// For multi-processed managers, such as [`LlmpEventManager`],
/// For multi-processed managers, such as [`LlmpRestartingEventManager`],
/// this serializes the [`Event`] and commits it to the [`llmp`] page.
/// In this case, if you `fire` faster than the broker can consume
/// (for example for each [`Input`], on multiple cores)


@ -20,7 +20,7 @@ pub const TRANSFERRED_FEEDBACK_NAME: Cow<'static, str> =
/// Metadata which denotes whether we are currently transferring an input.
///
/// Implementors of multi-node communication systems (like [`crate::events::LlmpEventManager`]) should wrap any
/// Implementors of multi-node communication systems (like [`crate::events::LlmpRestartingEventManager`]) should wrap any
/// [`crate::EvaluatorObservers::evaluate_input_with_observers`] or
/// [`crate::ExecutionProcessor::process_execution`] calls with setting this metadata to true/false
/// before and after.