Decouple llmp broker from manager (#125)

* decouple broker from manager

* fix no_std

* fix win build
This commit is contained in:
Andrea Fioraldi 2021-05-25 18:00:27 +02:00 committed by GitHub
parent 46716e8090
commit a0804fd24d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 241 additions and 270 deletions

View File

@ -289,16 +289,13 @@ unsafe fn fuzz(
stdout_file: Option<&str>, stdout_file: Option<&str>,
broker_addr: Option<SocketAddr>, broker_addr: Option<SocketAddr>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let stats_closure = |s| println!("{}", s);
// 'While the stats are state, they are usually used in the broker - which is likely never restarted // 'While the stats are state, they are usually used in the broker - which is likely never restarted
let stats = MultiStats::new(stats_closure); let stats = MultiStats::new(|s| println!("{}", s));
#[cfg(target_os = "android")] #[cfg(target_os = "android")]
AshmemService::start().expect("Failed to start Ashmem service"); AshmemService::start().expect("Failed to start Ashmem service");
let shmem_provider = StdShMemProvider::new()?; let shmem_provider = StdShMemProvider::new()?;
let mut client_init_stats = || Ok(MultiStats::new(stats_closure));
let mut run_client = |state: Option<StdState<_, _, _, _, _>>, mut mgr| { let mut run_client = |state: Option<StdState<_, _, _, _, _>>, mut mgr| {
// The restarting state will spawn the same process again as child, then restarted it each time it crashes. // The restarting state will spawn the same process again as child, then restarted it each time it crashes.
@ -427,7 +424,6 @@ unsafe fn fuzz(
Launcher::builder() Launcher::builder()
.shmem_provider(shmem_provider) .shmem_provider(shmem_provider)
.stats(stats) .stats(stats)
.client_init_stats(&mut client_init_stats)
.run_client(&mut run_client) .run_client(&mut run_client)
.cores(cores) .cores(cores)
.broker_port(broker_port) .broker_port(broker_port)

View File

@ -58,9 +58,7 @@ pub fn main() {
AshmemService::start().expect("Failed to start Ashmem service"); AshmemService::start().expect("Failed to start Ashmem service");
let shmem_provider = StdShMemProvider::new().expect("Failed to init shared memory"); let shmem_provider = StdShMemProvider::new().expect("Failed to init shared memory");
let stats_closure = |s| println!("{}", s); let stats = MultiStats::new(|s| println!("{}", s));
let stats = MultiStats::new(stats_closure);
let mut client_init_stats = || Ok(MultiStats::new(stats_closure));
let mut run_client = |state: Option<StdState<_, _, _, _, _>>, mut restarting_mgr| { let mut run_client = |state: Option<StdState<_, _, _, _, _>>, mut restarting_mgr| {
let corpus_dirs = &[PathBuf::from("./corpus")]; let corpus_dirs = &[PathBuf::from("./corpus")];
@ -170,7 +168,6 @@ pub fn main() {
Launcher::builder() Launcher::builder()
.shmem_provider(shmem_provider) .shmem_provider(shmem_provider)
.stats(stats) .stats(stats)
.client_init_stats(&mut client_init_stats)
.run_client(&mut run_client) .run_client(&mut run_client)
.cores(&cores) .cores(&cores)
.broker_port(broker_port) .broker_port(broker_port)

View File

@ -33,8 +33,8 @@ use typed_builder::TypedBuilder;
/// The Launcher client callback type reference /// The Launcher client callback type reference
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub type LauncherClientFnRef<'a, I, OT, S, SP, ST> = pub type LauncherClientFnRef<'a, I, OT, S, SP> =
&'a mut dyn FnMut(Option<S>, LlmpRestartingEventManager<I, OT, S, SP, ST>) -> Result<(), Error>; &'a mut dyn FnMut(Option<S>, LlmpRestartingEventManager<I, OT, S, SP>) -> Result<(), Error>;
/// Provides a Launcher, which can be used to launch a fuzzing run on a specified list of cores /// Provides a Launcher, which can be used to launch a fuzzing run on a specified list of cores
#[cfg(feature = "std")] #[cfg(feature = "std")]
@ -52,10 +52,8 @@ where
shmem_provider: SP, shmem_provider: SP,
/// The stats instance to use /// The stats instance to use
stats: ST, stats: ST,
/// A closure or function which generates stats instances for newly spawned clients
client_init_stats: &'a mut dyn FnMut() -> Result<ST, Error>,
/// The 'main' function to run for each client forked. This probably shouldn't return /// The 'main' function to run for each client forked. This probably shouldn't return
run_client: LauncherClientFnRef<'a, I, OT, S, SP, ST>, run_client: LauncherClientFnRef<'a, I, OT, S, SP>,
/// The broker port to use /// The broker port to use
#[builder(default = 1337_u16)] #[builder(default = 1337_u16)]
broker_port: u16, broker_port: u16,
@ -92,7 +90,7 @@ where
.stdout_file .stdout_file
.map(|filename| File::create(filename).unwrap()); .map(|filename| File::create(filename).unwrap());
//spawn clients // Spawn clients
for (id, bind_to) in core_ids.iter().enumerate().take(num_cores) { for (id, bind_to) in core_ids.iter().enumerate().take(num_cores) {
if self.cores.iter().any(|&x| x == id) { if self.cores.iter().any(|&x| x == id) {
self.shmem_provider.pre_fork()?; self.shmem_provider.pre_fork()?;
@ -114,11 +112,9 @@ where
dup2(file.as_ref().unwrap().as_raw_fd(), libc::STDOUT_FILENO)?; dup2(file.as_ref().unwrap().as_raw_fd(), libc::STDOUT_FILENO)?;
dup2(file.as_ref().unwrap().as_raw_fd(), libc::STDERR_FILENO)?; dup2(file.as_ref().unwrap().as_raw_fd(), libc::STDERR_FILENO)?;
} }
//fuzzer client. keeps retrying the connection to broker till the broker starts // Fuzzer client. keeps retrying the connection to broker till the broker starts
let stats = (self.client_init_stats)()?; let (state, mgr) = RestartingMgr::<I, OT, S, SP, ST>::builder()
let (state, mgr) = RestartingMgr::builder()
.shmem_provider(self.shmem_provider.clone()) .shmem_provider(self.shmem_provider.clone())
.stats(stats)
.broker_port(self.broker_port) .broker_port(self.broker_port)
.kind(ManagerKind::Client { .kind(ManagerKind::Client {
cpu_core: Some(*bind_to), cpu_core: Some(*bind_to),
@ -135,16 +131,17 @@ where
#[cfg(feature = "std")] #[cfg(feature = "std")]
println!("I am broker!!."); println!("I am broker!!.");
// TODO we don't want always a broker here, thing about using different laucher process to spawn different configurations
RestartingMgr::<I, OT, S, SP, ST>::builder() RestartingMgr::<I, OT, S, SP, ST>::builder()
.shmem_provider(self.shmem_provider.clone()) .shmem_provider(self.shmem_provider.clone())
.stats(self.stats.clone()) .stats(Some(self.stats.clone()))
.broker_port(self.broker_port) .broker_port(self.broker_port)
.kind(ManagerKind::Broker) .kind(ManagerKind::Broker)
.remote_broker_addr(self.remote_broker_addr) .remote_broker_addr(self.remote_broker_addr)
.build() .build()
.launch()?; .launch()?;
//broker exited. kill all clients. // Broker exited. kill all clients.
for handle in &handles { for handle in &handles {
unsafe { unsafe {
libc::kill(*handle, libc::SIGINT); libc::kill(*handle, libc::SIGINT);
@ -165,10 +162,8 @@ where
//todo: silence stdout and stderr for clients //todo: silence stdout and stderr for clients
// the actual client. do the fuzzing // the actual client. do the fuzzing
let stats = (self.client_init_stats)()?;
let (state, mgr) = RestartingMgr::<I, OT, S, SP, ST>::builder() let (state, mgr) = RestartingMgr::<I, OT, S, SP, ST>::builder()
.shmem_provider(self.shmem_provider.clone()) .shmem_provider(self.shmem_provider.clone())
.stats(stats)
.broker_port(self.broker_port) .broker_port(self.broker_port)
.kind(ManagerKind::Client { .kind(ManagerKind::Client {
cpu_core: Some(CoreId { cpu_core: Some(CoreId {
@ -225,7 +220,7 @@ where
RestartingMgr::<I, OT, S, SP, ST>::builder() RestartingMgr::<I, OT, S, SP, ST>::builder()
.shmem_provider(self.shmem_provider.clone()) .shmem_provider(self.shmem_provider.clone())
.stats(self.stats.clone()) .stats(Some(self.stats.clone()))
.broker_port(self.broker_port) .broker_port(self.broker_port)
.kind(ManagerKind::Broker) .kind(ManagerKind::Broker)
.remote_broker_addr(self.remote_broker_addr) .remote_broker_addr(self.remote_broker_addr)

View File

@ -604,14 +604,9 @@ where
/// Creates a new broker on the given port /// Creates a new broker on the given port
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn broker_on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> { pub fn broker_on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
match tcp_bind(port) { Ok(LlmpConnection::IsBroker {
Ok(listener) => { broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port)?,
let mut broker = LlmpBroker::new(shmem_provider)?; })
let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
Ok(LlmpConnection::IsBroker { broker })
}
Err(e) => Err(e),
}
} }
/// Creates a new client on the given port /// Creates a new client on the given port
@ -1641,6 +1636,19 @@ where
}) })
} }
/// Create a new [`LlmpBroker`] sttaching to a TCP port
#[cfg(feature = "std")]
pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result<Self, Error> {
match tcp_bind(port) {
Ok(listener) => {
let mut broker = LlmpBroker::new(shmem_provider)?;
let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
Ok(broker)
}
Err(e) => Err(e),
}
}
/// Allocate the next message on the outgoing map /// Allocate the next message on the outgoing map
unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error> { unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error> {
self.llmp_out.alloc_next(buf_len) self.llmp_out.alloc_next(buf_len)
@ -2302,7 +2310,7 @@ where
} }
/// Describe this client in a way that it can be recreated, for example after crash /// Describe this client in a way that it can be recreated, for example after crash
fn describe(&self) -> Result<LlmpClientDescription, Error> { pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
Ok(LlmpClientDescription { Ok(LlmpClientDescription {
sender: self.sender.describe()?, sender: self.sender.describe()?,
receiver: self.receiver.describe()?, receiver: self.receiver.describe()?,
@ -2310,7 +2318,7 @@ where
} }
/// Create an existing client from description /// Create an existing client from description
fn existing_client_from_description( pub fn existing_client_from_description(
shmem_provider: SP, shmem_provider: SP,
description: &LlmpClientDescription, description: &LlmpClientDescription,
) -> Result<Self, Error> { ) -> Result<Self, Error> {

View File

@ -60,140 +60,69 @@ const LLMP_TAG_EVENT_TO_BOTH: llmp::Tag = 0x2B0741;
const _LLMP_TAG_RESTART: llmp::Tag = 0x8357A87; const _LLMP_TAG_RESTART: llmp::Tag = 0x8357A87;
const _LLMP_TAG_NO_RESTART: llmp::Tag = 0x57A7EE71; const _LLMP_TAG_NO_RESTART: llmp::Tag = 0x57A7EE71;
/// An [`EventManager`] that forwards all events to other attached fuzzers on shared maps or via tcp,
/// using low-level message passing, [`crate::bolts::llmp`].
#[derive(Debug)]
pub struct LlmpEventManager<I, OT, S, SP, ST>
where
I: Input,
OT: ObserversTuple,
SP: ShMemProvider + 'static,
ST: Stats,
//CE: CustomEvent<I>,
{
stats: Option<ST>,
llmp: llmp::LlmpConnection<SP>,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor,
phantom: PhantomData<(I, OT, S)>,
}
/// The minimum buffer size at which to compress LLMP IPC messages. /// The minimum buffer size at which to compress LLMP IPC messages.
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
const COMPRESS_THRESHOLD: usize = 1024; const COMPRESS_THRESHOLD: usize = 1024;
impl<I, OT, S, SP, ST> Drop for LlmpEventManager<I, OT, S, SP, ST> #[derive(Debug)]
pub struct LlmpEventBroker<I, SP, ST>
where where
I: Input, I: Input,
OT: ObserversTuple,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
ST: Stats, ST: Stats,
//CE: CustomEvent<I>,
{ {
/// LLMP clients will have to wait until their pages are mapped by somebody. stats: ST,
fn drop(&mut self) { llmp: llmp::LlmpBroker<SP>,
self.await_restart_safe() #[cfg(feature = "llmp_compression")]
} compressor: GzipCompressor,
phantom: PhantomData<I>,
} }
impl<I, OT, S, SP, ST> LlmpEventManager<I, OT, S, SP, ST> impl<I, SP, ST> LlmpEventBroker<I, SP, ST>
where where
I: Input, I: Input,
OT: ObserversTuple,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
ST: Stats, ST: Stats,
{ {
/// Create an even broker from a raw broker.
pub fn new(llmp: llmp::LlmpBroker<SP>, stats: ST) -> Result<Self, Error> {
Ok(Self {
stats,
llmp,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData,
})
}
/// Create llmp on a port /// Create llmp on a port
/// If the port is not yet bound, it will act as broker /// The port must not be bound yet to have a broker.
/// Else, it will act as client.
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn new_on_port(shmem_provider: SP, stats: ST, port: u16) -> Result<Self, Error> { pub fn new_on_port(shmem_provider: SP, stats: ST, port: u16) -> Result<Self, Error> {
Ok(Self { Ok(Self {
stats: Some(stats), stats,
llmp: llmp::LlmpConnection::on_port(shmem_provider, port)?, llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port)?,
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD), compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData, phantom: PhantomData,
}) })
} }
/// If a client respawns, it may reuse the existing connection, previously stored by [`LlmpClient::to_env()`].
#[cfg(feature = "std")]
pub fn existing_client_from_env(shmem_provider: SP, env_name: &str) -> Result<Self, Error> {
Ok(Self {
stats: None,
llmp: llmp::LlmpConnection::IsClient {
client: LlmpClient::on_existing_from_env(shmem_provider, env_name)?,
},
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
// Inserting a nop-stats element here so rust won't complain.
// In any case, the client won't currently use it.
phantom: PhantomData,
})
}
/// Describe the client event mgr's llmp parts in a restorable fashion
pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
self.llmp.describe()
}
/// Create an existing client from description
pub fn existing_client_from_description(
shmem_provider: SP,
description: &LlmpClientDescription,
) -> Result<Self, Error> {
Ok(Self {
stats: None,
llmp: llmp::LlmpConnection::existing_client_from_description(
shmem_provider,
description,
)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
// Inserting a nop-stats element here so rust won't complain.
// In any case, the client won't currently use it.
phantom: PhantomData,
})
}
/// Write the config for a client [`EventManager`] to env vars, a new client can reattach using [`LlmpEventManager::existing_client_from_env()`].
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) {
match &self.llmp {
llmp::LlmpConnection::IsBroker { broker: _ } => {
todo!("There is probably no use storing the broker to env. Client only for now")
}
llmp::LlmpConnection::IsClient { client } => client.to_env(env_name).unwrap(),
}
}
/// Returns if we are the broker
pub fn is_broker(&self) -> bool {
matches!(self.llmp, llmp::LlmpConnection::IsBroker { broker: _ })
}
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn connect_b2b<A>(&mut self, addr: A) -> Result<(), Error> pub fn connect_b2b<A>(&mut self, addr: A) -> Result<(), Error>
where where
A: ToSocketAddrs, A: ToSocketAddrs,
{ {
match &mut self.llmp { self.llmp.connect_b2b(addr)
llmp::LlmpConnection::IsBroker { broker } => broker.connect_b2b(addr),
llmp::LlmpConnection::IsClient { client: _ } => Err(Error::IllegalState(
"Called broker loop in the client".into(),
)),
}
} }
/// Run forever in the broker /// Run forever in the broker
pub fn broker_loop(&mut self) -> Result<(), Error> { pub fn broker_loop(&mut self) -> Result<(), Error> {
match &mut self.llmp { let stats = &mut self.stats;
llmp::LlmpConnection::IsBroker { broker } => {
let stats = self.stats.as_mut().unwrap();
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
let compressor = &self.compressor; let compressor = &self.compressor;
broker.loop_forever( self.llmp.loop_forever(
&mut |sender_id: u32, tag: Tag, _flags: Flags, msg: &[u8]| { &mut |sender_id: u32, tag: Tag, _flags: Flags, msg: &[u8]| {
if tag == LLMP_TAG_EVENT_TO_BOTH { if tag == LLMP_TAG_EVENT_TO_BOTH {
#[cfg(not(feature = "llmp_compression"))] #[cfg(not(feature = "llmp_compression"))]
@ -201,8 +130,7 @@ where
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
let compressed; let compressed;
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
let event_bytes = let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = compressor.decompress(msg)?; compressed = compressor.decompress(msg)?;
&compressed &compressed
} else { } else {
@ -210,9 +138,7 @@ where
}; };
let event: Event<I> = postcard::from_bytes(event_bytes)?; let event: Event<I> = postcard::from_bytes(event_bytes)?;
match Self::handle_in_broker(stats, sender_id, &event)? { match Self::handle_in_broker(stats, sender_id, &event)? {
BrokerEventResult::Forward => { BrokerEventResult::Forward => Ok(llmp::LlmpMsgHookResult::ForwardToClients),
Ok(llmp::LlmpMsgHookResult::ForwardToClients)
}
BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled), BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled),
} }
} else { } else {
@ -224,11 +150,6 @@ where
Ok(()) Ok(())
} }
_ => Err(Error::IllegalState(
"Called broker loop in the client".into(),
)),
}
}
/// Handle arriving events in the broker /// Handle arriving events in the broker
#[allow(clippy::unnecessary_wraps)] #[allow(clippy::unnecessary_wraps)]
@ -316,6 +237,93 @@ where
} //_ => Ok(BrokerEventResult::Forward), } //_ => Ok(BrokerEventResult::Forward),
} }
} }
}
/// An [`EventManager`] that forwards all events to other attached fuzzers on shared maps or via tcp,
/// using low-level message passing, [`crate::bolts::llmp`].
#[derive(Debug)]
pub struct LlmpEventManager<I, OT, S, SP>
where
I: Input,
OT: ObserversTuple,
SP: ShMemProvider + 'static,
//CE: CustomEvent<I>,
{
llmp: llmp::LlmpClient<SP>,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor,
phantom: PhantomData<(I, OT, S)>,
}
impl<I, OT, S, SP> Drop for LlmpEventManager<I, OT, S, SP>
where
I: Input,
OT: ObserversTuple,
SP: ShMemProvider + 'static,
{
/// LLMP clients will have to wait until their pages are mapped by somebody.
fn drop(&mut self) {
self.await_restart_safe()
}
}
impl<I, OT, S, SP> LlmpEventManager<I, OT, S, SP>
where
I: Input,
OT: ObserversTuple,
SP: ShMemProvider + 'static,
{
/// Create llmp on a port
/// If the port is not yet bound, it will act as broker
/// Else, it will act as client.
#[cfg(feature = "std")]
pub fn new_on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
Ok(Self {
llmp: llmp::LlmpClient::create_attach_to_tcp(shmem_provider, port)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData,
})
}
/// If a client respawns, it may reuse the existing connection, previously stored by [`LlmpClient::to_env()`].
#[cfg(feature = "std")]
pub fn existing_client_from_env(shmem_provider: SP, env_name: &str) -> Result<Self, Error> {
Ok(Self {
llmp: LlmpClient::on_existing_from_env(shmem_provider, env_name)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
// Inserting a nop-stats element here so rust won't complain.
// In any case, the client won't currently use it.
phantom: PhantomData,
})
}
/// Describe the client event mgr's llmp parts in a restorable fashion
pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
self.llmp.describe()
}
/// Create an existing client from description
pub fn existing_client_from_description(
shmem_provider: SP,
description: &LlmpClientDescription,
) -> Result<Self, Error> {
Ok(Self {
llmp: llmp::LlmpClient::existing_client_from_description(shmem_provider, description)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
// Inserting a nop-stats element here so rust won't complain.
// In any case, the client won't currently use it.
phantom: PhantomData,
})
}
/// Write the config for a client [`EventManager`] to env vars, a new client can reattach using [`LlmpEventManager::existing_client_from_env()`].
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) {
self.llmp.to_env(env_name).unwrap()
}
// Handle arriving events in the client // Handle arriving events in the client
#[allow(clippy::unused_self)] #[allow(clippy::unused_self)]
@ -367,12 +375,11 @@ where
} }
} }
impl<I, OT, S, SP, ST> EventFirer<I, S> for LlmpEventManager<I, OT, S, SP, ST> impl<I, OT, S, SP> EventFirer<I, S> for LlmpEventManager<I, OT, S, SP>
where where
I: Input, I: Input,
OT: ObserversTuple, OT: ObserversTuple,
SP: ShMemProvider, SP: ShMemProvider,
ST: Stats,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
{ {
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
@ -384,8 +391,8 @@ where
Some(comp_buf) => { Some(comp_buf) => {
self.llmp.send_buf_with_flags( self.llmp.send_buf_with_flags(
LLMP_TAG_EVENT_TO_BOTH, LLMP_TAG_EVENT_TO_BOTH,
&comp_buf,
flags | LLMP_FLAG_COMPRESSED, flags | LLMP_FLAG_COMPRESSED,
&comp_buf,
)?; )?;
} }
None => { None => {
@ -403,28 +410,24 @@ where
} }
} }
impl<I, OT, S, SP, ST> EventRestarter<S> for LlmpEventManager<I, OT, S, SP, ST> impl<I, OT, S, SP> EventRestarter<S> for LlmpEventManager<I, OT, S, SP>
where where
I: Input, I: Input,
OT: ObserversTuple, OT: ObserversTuple,
SP: ShMemProvider, SP: ShMemProvider,
ST: Stats,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
{ {
/// The llmp client needs to wait until a broker mapped all pages, before shutting down. /// The llmp client needs to wait until a broker mapped all pages, before shutting down.
/// Otherwise, the OS may already have removed the shared maps, /// Otherwise, the OS may already have removed the shared maps,
fn await_restart_safe(&mut self) { fn await_restart_safe(&mut self) {
if let llmp::LlmpConnection::IsClient { client } = &self.llmp {
// wait until we can drop the message safely. // wait until we can drop the message safely.
client.await_save_to_unmap_blocking(); self.llmp.await_save_to_unmap_blocking();
}
} }
} }
impl<E, I, OT, S, SP, ST, Z> EventProcessor<E, S, Z> for LlmpEventManager<I, OT, S, SP, ST> impl<E, I, OT, S, SP, Z> EventProcessor<E, S, Z> for LlmpEventManager<I, OT, S, SP>
where where
SP: ShMemProvider, SP: ShMemProvider,
ST: Stats,
E: Executor<Self, I, S, Z>, E: Executor<Self, I, S, Z>,
I: Input, I: Input,
OT: ObserversTuple, OT: ObserversTuple,
@ -433,9 +436,7 @@ where
fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> { fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> {
// TODO: Get around local event copy by moving handle_in_client // TODO: Get around local event copy by moving handle_in_client
let mut events = vec![]; let mut events = vec![];
match &mut self.llmp { while let Some((sender_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? {
llmp::LlmpConnection::IsClient { client } => {
while let Some((sender_id, tag, _flags, msg)) = client.recv_buf_with_flags()? {
if tag == _LLMP_TAG_EVENT_TO_BROKER { if tag == _LLMP_TAG_EVENT_TO_BROKER {
panic!("EVENT_TO_BROKER parcel should not have arrived in the client!"); panic!("EVENT_TO_BROKER parcel should not have arrived in the client!");
} }
@ -453,12 +454,6 @@ where
let event: Event<I> = postcard::from_bytes(event_bytes)?; let event: Event<I> = postcard::from_bytes(event_bytes)?;
events.push((sender_id, event)); events.push((sender_id, event));
} }
}
_ => {
#[cfg(feature = "std")]
dbg!("Skipping process in broker");
}
};
let count = events.len(); let count = events.len();
events.drain(..).try_for_each(|(sender_id, event)| { events.drain(..).try_for_each(|(sender_id, event)| {
self.handle_in_client(fuzzer, executor, state, sender_id, event) self.handle_in_client(fuzzer, executor, state, sender_id, event)
@ -467,10 +462,9 @@ where
} }
} }
impl<E, I, OT, S, SP, ST, Z> EventManager<E, I, S, Z> for LlmpEventManager<I, OT, S, SP, ST> impl<E, I, OT, S, SP, Z> EventManager<E, I, S, Z> for LlmpEventManager<I, OT, S, SP>
where where
SP: ShMemProvider, SP: ShMemProvider,
ST: Stats,
E: Executor<Self, I, S, Z>, E: Executor<Self, I, S, Z>,
I: Input, I: Input,
OT: ObserversTuple, OT: ObserversTuple,
@ -481,32 +475,30 @@ where
/// Serialize the current state and corpus during an executiont to bytes. /// Serialize the current state and corpus during an executiont to bytes.
/// On top, add the current llmp event manager instance to be restored /// On top, add the current llmp event manager instance to be restored
/// This method is needed when the fuzzer run crashes and has to restart. /// This method is needed when the fuzzer run crashes and has to restart.
pub fn serialize_state_mgr<I, OT, S, SP, ST>( pub fn serialize_state_mgr<I, OT, S, SP>(
state: &S, state: &S,
mgr: &LlmpEventManager<I, OT, S, SP, ST>, mgr: &LlmpEventManager<I, OT, S, SP>,
) -> Result<Vec<u8>, Error> ) -> Result<Vec<u8>, Error>
where where
I: Input, I: Input,
OT: ObserversTuple, OT: ObserversTuple,
S: Serialize, S: Serialize,
SP: ShMemProvider, SP: ShMemProvider,
ST: Stats,
{ {
Ok(postcard::to_allocvec(&(&state, &mgr.describe()?))?) Ok(postcard::to_allocvec(&(&state, &mgr.describe()?))?)
} }
/// Deserialize the state and corpus tuple, previously serialized with `serialize_state_corpus(...)` /// Deserialize the state and corpus tuple, previously serialized with `serialize_state_corpus(...)`
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
pub fn deserialize_state_mgr<I, OT, S, SP, ST>( pub fn deserialize_state_mgr<I, OT, S, SP>(
shmem_provider: SP, shmem_provider: SP,
state_corpus_serialized: &[u8], state_corpus_serialized: &[u8],
) -> Result<(S, LlmpEventManager<I, OT, S, SP, ST>), Error> ) -> Result<(S, LlmpEventManager<I, OT, S, SP>), Error>
where where
I: Input, I: Input,
OT: ObserversTuple, OT: ObserversTuple,
S: DeserializeOwned, S: DeserializeOwned,
SP: ShMemProvider, SP: ShMemProvider,
ST: Stats,
{ {
let tuple: (S, _) = postcard::from_bytes(&state_corpus_serialized)?; let tuple: (S, _) = postcard::from_bytes(&state_corpus_serialized)?;
Ok(( Ok((
@ -517,27 +509,25 @@ where
/// A manager that can restart on the fly, storing states in-between (in `on_resatrt`) /// A manager that can restart on the fly, storing states in-between (in `on_resatrt`)
#[derive(Debug)] #[derive(Debug)]
pub struct LlmpRestartingEventManager<I, OT, S, SP, ST> pub struct LlmpRestartingEventManager<I, OT, S, SP>
where where
I: Input, I: Input,
OT: ObserversTuple, OT: ObserversTuple,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
ST: Stats,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
{ {
/// The embedded llmp event manager /// The embedded llmp event manager
llmp_mgr: LlmpEventManager<I, OT, S, SP, ST>, llmp_mgr: LlmpEventManager<I, OT, S, SP>,
/// The sender to serialize the state for the next runner /// The sender to serialize the state for the next runner
sender: LlmpSender<SP>, sender: LlmpSender<SP>,
} }
impl<I, OT, S, SP, ST> EventFirer<I, S> for LlmpRestartingEventManager<I, OT, S, SP, ST> impl<I, OT, S, SP> EventFirer<I, S> for LlmpRestartingEventManager<I, OT, S, SP>
where where
I: Input, I: Input,
OT: ObserversTuple, OT: ObserversTuple,
S: Serialize, S: Serialize,
SP: ShMemProvider, SP: ShMemProvider,
ST: Stats,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
{ {
fn fire(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error> { fn fire(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error> {
@ -546,13 +536,12 @@ where
} }
} }
impl<I, OT, S, SP, ST> EventRestarter<S> for LlmpRestartingEventManager<I, OT, S, SP, ST> impl<I, OT, S, SP> EventRestarter<S> for LlmpRestartingEventManager<I, OT, S, SP>
where where
I: Input, I: Input,
OT: ObserversTuple, OT: ObserversTuple,
S: Serialize, S: Serialize,
SP: ShMemProvider, SP: ShMemProvider,
ST: Stats,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
{ {
/// The llmp client needs to wait until a broker mapped all pages, before shutting down. /// The llmp client needs to wait until a broker mapped all pages, before shutting down.
@ -572,15 +561,13 @@ where
} }
} }
impl<E, I, OT, S, SP, ST, Z> EventProcessor<E, S, Z> impl<E, I, OT, S, SP, Z> EventProcessor<E, S, Z> for LlmpRestartingEventManager<I, OT, S, SP>
for LlmpRestartingEventManager<I, OT, S, SP, ST>
where where
E: Executor<LlmpEventManager<I, OT, S, SP, ST>, I, S, Z>, E: Executor<LlmpEventManager<I, OT, S, SP>, I, S, Z>,
I: Input, I: Input,
Z: IfInteresting<I, S> + IsInteresting<I, OT, S>, Z: IfInteresting<I, S> + IsInteresting<I, OT, S>,
OT: ObserversTuple, OT: ObserversTuple,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
ST: Stats,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
{ {
fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> { fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> {
@ -588,16 +575,14 @@ where
} }
} }
impl<E, I, OT, S, SP, ST, Z> EventManager<E, I, S, Z> impl<E, I, OT, S, SP, Z> EventManager<E, I, S, Z> for LlmpRestartingEventManager<I, OT, S, SP>
for LlmpRestartingEventManager<I, OT, S, SP, ST>
where where
E: Executor<LlmpEventManager<I, OT, S, SP, ST>, I, S, Z>, E: Executor<LlmpEventManager<I, OT, S, SP>, I, S, Z>,
I: Input, I: Input,
S: Serialize, S: Serialize,
Z: IfInteresting<I, S> + IsInteresting<I, OT, S>, Z: IfInteresting<I, S> + IsInteresting<I, OT, S>,
OT: ObserversTuple, OT: ObserversTuple,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
ST: Stats,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
{ {
} }
@ -608,16 +593,15 @@ const _ENV_FUZZER_RECEIVER: &str = &"_AFL_ENV_FUZZER_RECEIVER";
/// The llmp (2 way) connection from a fuzzer to the broker (broadcasting all other fuzzer messages) /// The llmp (2 way) connection from a fuzzer to the broker (broadcasting all other fuzzer messages)
const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = &"_AFL_ENV_FUZZER_BROKER_CLIENT"; const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = &"_AFL_ENV_FUZZER_BROKER_CLIENT";
impl<I, OT, S, SP, ST> LlmpRestartingEventManager<I, OT, S, SP, ST> impl<I, OT, S, SP> LlmpRestartingEventManager<I, OT, S, SP>
where where
I: Input, I: Input,
OT: ObserversTuple, OT: ObserversTuple,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
ST: Stats,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
{ {
/// Create a new runner, the executed child doing the actual fuzzing. /// Create a new runner, the executed child doing the actual fuzzing.
pub fn new(llmp_mgr: LlmpEventManager<I, OT, S, SP, ST>, sender: LlmpSender<SP>) -> Self { pub fn new(llmp_mgr: LlmpEventManager<I, OT, S, SP>, sender: LlmpSender<SP>) -> Self {
Self { llmp_mgr, sender } Self { llmp_mgr, sender }
} }
@ -654,7 +638,7 @@ pub fn setup_restarting_mgr_std<I, OT, S, ST>(
) -> Result< ) -> Result<
( (
Option<S>, Option<S>,
LlmpRestartingEventManager<I, OT, S, StdShMemProvider, ST>, LlmpRestartingEventManager<I, OT, S, StdShMemProvider>,
), ),
Error, Error,
> >
@ -670,7 +654,7 @@ where
RestartingMgr::builder() RestartingMgr::builder()
.shmem_provider(StdShMemProvider::new()?) .shmem_provider(StdShMemProvider::new()?)
.stats(stats) .stats(Some(stats))
.broker_port(broker_port) .broker_port(broker_port)
.build() .build()
.launch() .launch()
@ -695,7 +679,8 @@ where
/// manager. /// manager.
shmem_provider: SP, shmem_provider: SP,
/// The stats to use /// The stats to use
stats: ST, #[builder(default = None)]
stats: Option<ST>,
/// The broker port to use /// The broker port to use
#[builder(default = 1337_u16)] #[builder(default = 1337_u16)]
broker_port: u16, broker_port: u16,
@ -723,13 +708,7 @@ where
/// Launch the restarting manager /// Launch the restarting manager
pub fn launch( pub fn launch(
&mut self, &mut self,
) -> Result<(Option<S>, LlmpRestartingEventManager<I, OT, S, SP, ST>), Error> { ) -> Result<(Option<S>, LlmpRestartingEventManager<I, OT, S, SP>), Error> {
let mut mgr = LlmpEventManager::<I, OT, S, SP, ST>::new_on_port(
self.shmem_provider.clone(),
self.stats.clone(),
self.broker_port,
)?;
// We start ourself as child process to actually fuzz // We start ourself as child process to actually fuzz
let (sender, mut receiver, new_shmem_provider, core_id) = if std::env::var( let (sender, mut receiver, new_shmem_provider, core_id) = if std::env::var(
_ENV_FUZZER_SENDER, _ENV_FUZZER_SENDER,
@ -737,9 +716,14 @@ where
.is_err() .is_err()
{ {
// We get here if we are on Unix, or we are a broker on Windows. // We get here if we are on Unix, or we are a broker on Windows.
let core_id = if mgr.is_broker() { let core_id = match self.kind {
match self.kind {
ManagerKind::Broker | ManagerKind::Any => { ManagerKind::Broker | ManagerKind::Any => {
let mut broker = LlmpEventBroker::<I, SP, ST>::new_on_port(
self.shmem_provider.clone(),
self.stats.take().unwrap(),
self.broker_port,
)?;
// Yep, broker. Just loop here. // Yep, broker. Just loop here.
println!( println!(
"Doing broker things. Run this tool again to start fuzzing in a client." "Doing broker things. Run this tool again to start fuzzing in a client."
@ -747,30 +731,21 @@ where
if let Some(remote_broker_addr) = self.remote_broker_addr { if let Some(remote_broker_addr) = self.remote_broker_addr {
println!("B2b: Connecting to {:?}", &remote_broker_addr); println!("B2b: Connecting to {:?}", &remote_broker_addr);
mgr.connect_b2b(remote_broker_addr)?; broker.connect_b2b(remote_broker_addr)?;
}; };
mgr.broker_loop()?; broker.broker_loop()?;
return Err(Error::ShuttingDown); return Err(Error::ShuttingDown);
} }
ManagerKind::Client { cpu_core: _ } => {
return Err(Error::IllegalState(
"Tried to start a client, but got a broker".to_string(),
));
}
}
} else {
match self.kind {
ManagerKind::Broker => {
return Err(Error::IllegalState(
"Tried to start a broker, but got a client".to_string(),
));
}
ManagerKind::Client { cpu_core } => cpu_core, ManagerKind::Client { cpu_core } => cpu_core,
ManagerKind::Any => None,
}
}; };
// We are a client
let mgr = LlmpEventManager::<I, OT, S, SP>::new_on_port(
self.shmem_provider.clone(),
self.broker_port,
)?;
if let Some(core_id) = core_id { if let Some(core_id) = core_id {
println!("Setting core affinity to {:?}", core_id); println!("Setting core affinity to {:?}", core_id);
core_affinity::set_for_current(core_id); core_affinity::set_for_current(core_id);
@ -858,17 +833,17 @@ where
None => { None => {
println!("First run. Let's set it all up"); println!("First run. Let's set it all up");
// Mgr to send and receive msgs from/to all other fuzzer instances // Mgr to send and receive msgs from/to all other fuzzer instances
let client_mgr = LlmpEventManager::<I, OT, S, SP, ST>::existing_client_from_env( let mgr = LlmpEventManager::<I, OT, S, SP>::existing_client_from_env(
new_shmem_provider, new_shmem_provider,
_ENV_FUZZER_BROKER_CLIENT_INITIAL, _ENV_FUZZER_BROKER_CLIENT_INITIAL,
)?; )?;
(None, LlmpRestartingEventManager::new(client_mgr, sender)) (None, LlmpRestartingEventManager::new(mgr, sender))
} }
// Restoring from a previous run, deserialize state and corpus. // Restoring from a previous run, deserialize state and corpus.
Some((_sender, _tag, msg)) => { Some((_sender, _tag, msg)) => {
println!("Subsequent run. Let's load all data from shmem (received {} bytes from previous instance)", msg.len()); println!("Subsequent run. Let's load all data from shmem (received {} bytes from previous instance)", msg.len());
let (state, mgr): (S, LlmpEventManager<I, OT, S, SP, ST>) = let (state, mgr): (S, LlmpEventManager<I, OT, S, SP>) =
deserialize_state_mgr(new_shmem_provider, &msg)?; deserialize_state_mgr(new_shmem_provider, &msg)?;
(Some(state), LlmpRestartingEventManager::new(mgr, sender)) (Some(state), LlmpRestartingEventManager::new(mgr, sender))