Closure builder for inner managers of centralized. (#2279)

* generic inner manager for centralized, with builder closures.

* moved options inside the function

* removed useless bound

* unused import

* remove useless clone bound.

* make clearer what is secondary node

* same inner manager for main and secondary

* fix example
This commit is contained in:
Romain Malmain 2024-06-05 19:12:21 +02:00 committed by GitHub
parent 7dd345d18c
commit 399fbccea2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 125 additions and 58 deletions

View File

@ -135,7 +135,7 @@ pub extern "C" fn libafl_main() {
let monitor = MultiMonitor::new(|s| println!("{s}")); let monitor = MultiMonitor::new(|s| println!("{s}"));
let mut run_client = |state: Option<_>, let mut secondary_run_client = |state: Option<_>,
mut mgr: CentralizedEventManager<_, _>, mut mgr: CentralizedEventManager<_, _>,
_core_id: CoreId| { _core_id: CoreId| {
// Create an observation channel using the coverage map // Create an observation channel using the coverage map
@ -252,13 +252,13 @@ pub extern "C" fn libafl_main() {
Ok(()) Ok(())
}; };
let mut main_run_client = run_client.clone(); // clone it just for borrow checker let mut main_run_client = secondary_run_client.clone(); // clone it just for borrow checker
match CentralizedLauncher::builder() match CentralizedLauncher::builder()
.shmem_provider(shmem_provider) .shmem_provider(shmem_provider)
.configuration(EventConfig::from_name("default")) .configuration(EventConfig::from_name("default"))
.monitor(monitor) .monitor(monitor)
.run_client(&mut run_client) .secondary_run_client(&mut secondary_run_client)
.main_run_client(&mut main_run_client) .main_run_client(&mut main_run_client)
.cores(&cores) .cores(&cores)
.broker_port(broker_port) .broker_port(broker_port)

View File

@ -214,7 +214,7 @@ where
} }
} }
/// A wrapper manager to implement a main-secondary architecture witgh another broker /// A wrapper manager to implement a main-secondary architecture with another broker
#[derive(Debug)] #[derive(Debug)]
pub struct CentralizedEventManager<EM, SP> pub struct CentralizedEventManager<EM, SP>
where where
@ -518,7 +518,8 @@ where
if !self.is_main { if !self.is_main {
// secondary node // secondary node
let mut is_tc = false; let mut is_tc = false;
let is_nt_or_heartbeat = match &mut event { // Forward to main only if new tc or heartbeat
let should_be_forwarded = match &mut event {
Event::NewTestcase { Event::NewTestcase {
input: _, input: _,
client_config: _, client_config: _,
@ -541,7 +542,7 @@ where
_ => false, _ => false,
}; };
if is_nt_or_heartbeat { if should_be_forwarded {
self.forward_to_main(&event)?; self.forward_to_main(&event)?;
if is_tc { if is_tc {
// early return here because we only send it to centralized not main broker. // early return here because we only send it to centralized not main broker.
@ -549,7 +550,8 @@ where
} }
} }
} }
// now inner llmp manager will process it
// now inner llmp manager will process it if it's not a new testcase from a secondary node.
self.inner.fire(state, event) self.inner.fire(state, event)
} }

View File

@ -48,10 +48,13 @@ use libafl_bolts::{
use typed_builder::TypedBuilder; use typed_builder::TypedBuilder;
use super::hooks::EventManagerHooksTuple; use super::hooks::EventManagerHooksTuple;
#[cfg(all(unix, feature = "std", feature = "fork"))]
use crate::events::centralized::{CentralizedEventManager, CentralizedLlmpEventBroker};
#[cfg(feature = "adaptive_serialization")] #[cfg(feature = "adaptive_serialization")]
use crate::observers::TimeObserver; use crate::observers::TimeObserver;
#[cfg(all(unix, feature = "std", feature = "fork"))]
use crate::{
events::centralized::{CentralizedEventManager, CentralizedLlmpEventBroker},
state::UsesState,
};
#[cfg(feature = "std")] #[cfg(feature = "std")]
use crate::{ use crate::{
events::{ events::{
@ -464,7 +467,7 @@ where
#[cfg(all(unix, feature = "std", feature = "fork"))] #[cfg(all(unix, feature = "std", feature = "fork"))]
#[derive(TypedBuilder)] #[derive(TypedBuilder)]
#[allow(clippy::type_complexity, missing_debug_implementations)] #[allow(clippy::type_complexity, missing_debug_implementations)]
pub struct CentralizedLauncher<'a, CF, MF, MT, S, SP> { pub struct CentralizedLauncher<'a, CF, IM, MF, MT, S, SP> {
/// The `ShmemProvider` to use /// The `ShmemProvider` to use
shmem_provider: SP, shmem_provider: SP,
/// The monitor instance to use /// The monitor instance to use
@ -474,10 +477,10 @@ pub struct CentralizedLauncher<'a, CF, MF, MT, S, SP> {
/// Consider this testcase as interesting always if true /// Consider this testcase as interesting always if true
#[builder(default = false)] #[builder(default = false)]
always_interesting: bool, always_interesting: bool,
/// The 'main' function to run for each client forked. This probably shouldn't return /// The 'main' function to run for each secondary client forked. This probably shouldn't return
#[builder(default, setter(strip_option))] #[builder(default, setter(strip_option))]
run_client: Option<CF>, secondary_run_client: Option<CF>,
/// The 'main' function to run for the main evaluator noed /// The 'main' function to run for the main evaluator node.
#[builder(default, setter(strip_option))] #[builder(default, setter(strip_option))]
main_run_client: Option<MF>, main_run_client: Option<MF>,
/// The broker port to use (or to attach to, in case [`Self::spawn_broker`] is `false`) /// The broker port to use (or to attach to, in case [`Self::spawn_broker`] is `false`)
@ -524,11 +527,11 @@ pub struct CentralizedLauncher<'a, CF, MF, MT, S, SP> {
#[builder(default = LlmpShouldSaveState::OnRestart)] #[builder(default = LlmpShouldSaveState::OnRestart)]
serialize_state: LlmpShouldSaveState, serialize_state: LlmpShouldSaveState,
#[builder(setter(skip), default = PhantomData)] #[builder(setter(skip), default = PhantomData)]
phantom_data: PhantomData<(&'a S, &'a SP)>, phantom_data: PhantomData<(IM, &'a S, &'a SP)>,
} }
#[cfg(all(unix, feature = "std", feature = "fork"))] #[cfg(all(unix, feature = "std", feature = "fork"))]
impl<CF, MF, MT, S, SP> Debug for CentralizedLauncher<'_, CF, MF, MT, S, SP> { impl<CF, IM, MF, MT, S, SP> Debug for CentralizedLauncher<'_, CF, IM, MF, MT, S, SP> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Launcher") f.debug_struct("Launcher")
.field("configuration", &self.configuration) .field("configuration", &self.configuration)
@ -542,34 +545,89 @@ impl<CF, MF, MT, S, SP> Debug for CentralizedLauncher<'_, CF, MF, MT, S, SP> {
} }
} }
/// The standard inner manager of centralized
pub type StdCentralizedInnerMgr<S, SP> = LlmpRestartingEventManager<(), S, SP>;
#[cfg(all(unix, feature = "std", feature = "fork"))] #[cfg(all(unix, feature = "std", feature = "fork"))]
impl<'a, CF, MF, MT, S, SP> CentralizedLauncher<'a, CF, MF, MT, S, SP> impl<'a, CF, MF, MT, S, SP>
CentralizedLauncher<'a, CF, StdCentralizedInnerMgr<S, SP>, MF, MT, S, SP>
where where
CF: FnOnce( CF: FnOnce(
Option<S>, Option<S>,
CentralizedEventManager<LlmpRestartingEventManager<(), S, SP>, SP>, CentralizedEventManager<StdCentralizedInnerMgr<S, SP>, SP>,
CoreId, CoreId,
) -> Result<(), Error>, ) -> Result<(), Error>,
MF: FnOnce( MF: FnOnce(
Option<S>, Option<S>,
CentralizedEventManager<LlmpRestartingEventManager<(), S, SP>, SP>, // No hooks for centralized EM CentralizedEventManager<StdCentralizedInnerMgr<S, SP>, SP>,
CoreId, CoreId,
) -> Result<(), Error>, ) -> Result<(), Error>,
MT: Monitor + Clone, MT: Monitor + Clone,
S: State + HasExecutions, S: State + HasExecutions,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
{ {
/// Launch a standard Centralized-based fuzzer
pub fn launch(&mut self) -> Result<(), Error> {
let restarting_mgr_builder = |centralized_launcher: &Self, core_to_bind: CoreId| {
// Fuzzer client. keeps retrying the connection to broker till the broker starts
let builder = RestartingMgr::<(), MT, S, SP>::builder()
.always_interesting(centralized_launcher.always_interesting)
.shmem_provider(centralized_launcher.shmem_provider.clone())
.broker_port(centralized_launcher.broker_port)
.kind(ManagerKind::Client {
cpu_core: Some(core_to_bind),
})
.configuration(centralized_launcher.configuration)
.serialize_state(centralized_launcher.serialize_state)
.hooks(tuple_list!());
#[cfg(feature = "adaptive_serialization")]
let builder = builder.time_ref(centralized_launcher.time_obs.handle());
builder.build().launch()
};
self.launch_generic(restarting_mgr_builder, restarting_mgr_builder)
}
}
#[cfg(all(unix, feature = "std", feature = "fork"))]
impl<'a, CF, IM, MF, MT, S, SP> CentralizedLauncher<'a, CF, IM, MF, MT, S, SP>
where
CF: FnOnce(Option<S>, CentralizedEventManager<IM, SP>, CoreId) -> Result<(), Error>,
IM: UsesState,
MF: FnOnce(
Option<S>,
CentralizedEventManager<IM, SP>, // No hooks for centralized EM
CoreId,
) -> Result<(), Error>,
MT: Monitor + Clone,
S: State + HasExecutions,
SP: ShMemProvider + 'static,
{
/// Launch a Centralized-based fuzzer.
/// - `main_inner_mgr_builder` will be called to build the inner manager of the main node.
/// - `secondary_inner_mgr_builder` will be called to build the inner manager of the secondary nodes.
#[allow(clippy::similar_names)] #[allow(clippy::similar_names)]
#[allow(clippy::too_many_lines)] #[allow(clippy::too_many_lines)]
/// launch the broker and the client and fuzz pub fn launch_generic<IMF>(
pub fn launch(&mut self) -> Result<(), Error> { &mut self,
main_inner_mgr_builder: IMF,
secondary_inner_mgr_builder: IMF,
) -> Result<(), Error>
where
IMF: FnOnce(&Self, CoreId) -> Result<(Option<S>, IM), Error>,
{
let mut main_inner_mgr_builder = Some(main_inner_mgr_builder);
let mut secondary_inner_mgr_builder = Some(secondary_inner_mgr_builder);
if self.cores.ids.is_empty() { if self.cores.ids.is_empty() {
return Err(Error::illegal_argument( return Err(Error::illegal_argument(
"No cores to spawn on given, cannot launch anything.", "No cores to spawn on given, cannot launch anything.",
)); ));
} }
if self.run_client.is_none() { if self.secondary_run_client.is_none() {
return Err(Error::illegal_argument( return Err(Error::illegal_argument(
"No client callback provided".to_string(), "No client callback provided".to_string(),
)); ));
@ -646,26 +704,13 @@ where
} }
} }
// Fuzzer client. keeps retrying the connection to broker till the broker starts if index == 1 {
let builder = RestartingMgr::<(), MT, S, SP>::builder() // Main client
.always_interesting(self.always_interesting) let (state, mgr) =
.shmem_provider(self.shmem_provider.clone()) main_inner_mgr_builder.take().unwrap()(self, *bind_to)?;
.broker_port(self.broker_port)
.kind(ManagerKind::Client {
cpu_core: Some(*bind_to),
})
.configuration(self.configuration)
.serialize_state(self.serialize_state)
.hooks(tuple_list!());
#[cfg(feature = "adaptive_serialization")]
let builder = builder.time_ref(self.time_obs.handle());
let (state, mgr) = builder.build().launch()?;
let mut centralized_builder = CentralizedEventManager::builder(); let mut centralized_builder = CentralizedEventManager::builder();
if index == 1 {
centralized_builder = centralized_builder.is_main(true); centralized_builder = centralized_builder.is_main(true);
}
#[cfg(not(feature = "adaptive_serialization"))] #[cfg(not(feature = "adaptive_serialization"))]
let c_mgr = centralized_builder.build_on_port( let c_mgr = centralized_builder.build_on_port(
@ -681,11 +726,31 @@ where
self.time_obs, self.time_obs,
)?; )?;
if index == 1 { self.main_run_client.take().unwrap()(state, c_mgr, *bind_to)
return (self.main_run_client.take().unwrap())(state, c_mgr, *bind_to); } else {
} // Secondary clients
return (self.run_client.take().unwrap())(state, c_mgr, *bind_to); let (state, mgr) =
secondary_inner_mgr_builder.take().unwrap()(self, *bind_to)?;
let centralized_builder = CentralizedEventManager::builder();
#[cfg(not(feature = "adaptive_serialization"))]
let c_mgr = centralized_builder.build_on_port(
mgr,
self.shmem_provider.clone(),
self.centralized_broker_port,
)?;
#[cfg(feature = "adaptive_serialization")]
let c_mgr = centralized_builder.build_on_port(
mgr,
self.shmem_provider.clone(),
self.centralized_broker_port,
self.time_obs,
)?;
self.secondary_run_client.take().unwrap()(state, c_mgr, *bind_to)
} }
}?,
}; };
} }
} }