Add HasAdaptiveSerializer trait. (#2040)

* fix

* revert test

* add

* a

* check
This commit is contained in:
Dongjia "toka" Zhang 2024-04-11 15:36:08 +02:00 committed by GitHub
parent 48463d079b
commit 287d1ac7c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 109 additions and 75 deletions

View File

@ -22,15 +22,15 @@ use libafl_bolts::{
};
use serde::{Deserialize, Serialize};
use super::{CustomBufEventResult, HasCustomBufHandlers, ProgressReporter};
#[cfg(feature = "llmp_compression")]
use crate::events::llmp::COMPRESS_THRESHOLD;
#[cfg(feature = "scalability_introspection")]
use crate::state::HasScalabilityMonitor;
use crate::{
events::{
llmp::EventStatsCollector, BrokerEventResult, Event, EventConfig, EventFirer, EventManager,
EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, LogSeverity,
AdaptiveSerializer, BrokerEventResult, CustomBufEventResult, Event, EventConfig,
EventFirer, EventManager, EventManagerId, EventProcessor, EventRestarter,
HasCustomBufHandlers, HasEventManagerId, LogSeverity, ProgressReporter,
},
executors::{Executor, HasObservers},
fuzzer::{EvaluatorObservers, ExecutionProcessor},
@ -233,9 +233,9 @@ where
}
#[cfg(feature = "adaptive_serialization")]
impl<EM, SP> EventStatsCollector for CentralizedEventManager<EM, SP>
impl<EM, SP> AdaptiveSerializer for CentralizedEventManager<EM, SP>
where
EM: EventStatsCollector + UsesState,
EM: AdaptiveSerializer + UsesState,
SP: ShMemProvider + 'static,
{
fn serialization_time(&self) -> Duration {
@ -266,16 +266,16 @@ where
}
#[cfg(not(feature = "adaptive_serialization"))]
impl<EM, SP> EventStatsCollector for CentralizedEventManager<EM, SP>
impl<EM, SP> AdaptiveSerializer for CentralizedEventManager<EM, SP>
where
EM: EventStatsCollector + UsesState,
EM: AdaptiveSerializer + UsesState,
SP: ShMemProvider + 'static,
{
}
impl<EM, SP> EventFirer for CentralizedEventManager<EM, SP>
where
EM: EventStatsCollector + EventFirer + HasEventManagerId,
EM: AdaptiveSerializer + EventFirer + HasEventManagerId + AdaptiveSerializer,
SP: ShMemProvider + 'static,
{
fn fire(
@ -330,11 +330,26 @@ where
self.inner.log(state, severity_level, message)
}
#[cfg(not(feature = "adaptive_serialization"))]
fn serialize_observers<OT>(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error>
where
OT: ObserversTuple<Self::State> + Serialize,
{
self.inner.serialize_observers(observers)
Ok(Some(postcard::to_allocvec(observers)?))
}
#[cfg(feature = "adaptive_serialization")]
fn serialize_observers<OT>(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error>
where
OT: ObserversTuple<Self::State> + Serialize,
{
const SERIALIZE_TIME_FACTOR: u32 = 4; // twice as much as the normal llmp em's value cuz it does this job twice.
const SERIALIZE_PERCENTAGE_THRESHOLD: usize = 80;
self.inner.serialize_observers_adaptive(
observers,
SERIALIZE_TIME_FACTOR,
SERIALIZE_PERCENTAGE_THRESHOLD,
)
}
fn configuration(&self) -> EventConfig {
@ -368,7 +383,7 @@ where
impl<E, EM, SP, Z> EventProcessor<E, Z> for CentralizedEventManager<EM, SP>
where
EM: EventStatsCollector + EventProcessor<E, Z> + EventFirer + HasEventManagerId,
EM: AdaptiveSerializer + EventProcessor<E, Z> + EventFirer + HasEventManagerId,
E: HasObservers<State = Self::State> + Executor<Self, Z>,
for<'a> E::Observers: Deserialize<'a>,
Z: EvaluatorObservers<E::Observers, State = Self::State>
@ -394,7 +409,7 @@ where
impl<E, EM, SP, Z> EventManager<E, Z> for CentralizedEventManager<EM, SP>
where
EM: EventStatsCollector + EventManager<E, Z>,
EM: AdaptiveSerializer + EventManager<E, Z>,
EM::State: HasExecutions + HasMetadata + HasLastReportTime,
E: HasObservers<State = Self::State> + Executor<Self, Z>,
for<'a> E::Observers: Deserialize<'a>,
@ -422,7 +437,7 @@ where
impl<EM, SP> ProgressReporter for CentralizedEventManager<EM, SP>
where
EM: EventStatsCollector + ProgressReporter + HasEventManagerId,
EM: AdaptiveSerializer + ProgressReporter + HasEventManagerId,
EM::State: HasMetadata + HasExecutions + HasLastReportTime,
SP: ShMemProvider + 'static,
{
@ -524,7 +539,7 @@ where
impl<EM, SP> CentralizedEventManager<EM, SP>
where
EM: UsesState + EventFirer + EventStatsCollector + HasEventManagerId,
EM: UsesState + EventFirer + AdaptiveSerializer + HasEventManagerId,
SP: ShMemProvider + 'static,
{
#[cfg(feature = "llmp_compression")]

View File

@ -45,6 +45,8 @@ use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use super::{hooks::EventManagerHooksTuple, CustomBufEventResult, CustomBufHandlerFn};
#[cfg(any(feature = "std", feature = "adaptive_serialization"))]
use crate::events::AdaptiveSerializer;
#[cfg(all(unix, feature = "std"))]
use crate::events::EVENTMGR_SIGHANDLER_STATE;
use crate::{
@ -333,32 +335,6 @@ where
}
}
/// Collected stats to decide if observers must be serialized or not
#[cfg(feature = "adaptive_serialization")]
pub trait EventStatsCollector {
/// Expose the collected observers serialization time
fn serialization_time(&self) -> Duration;
/// Expose the collected observers deserialization time
fn deserialization_time(&self) -> Duration;
/// How many times observers were serialized
fn serializations_cnt(&self) -> usize;
/// How many times shoukd have been serialized an observer
fn should_serialize_cnt(&self) -> usize;
/// Expose the collected observers serialization time (mut)
fn serialization_time_mut(&mut self) -> &mut Duration;
/// Expose the collected observers deserialization time (mut)
fn deserialization_time_mut(&mut self) -> &mut Duration;
/// How many times observers were serialized (mut)
fn serializations_cnt_mut(&mut self) -> &mut usize;
/// How many times shoukd have been serialized an observer (mut)
fn should_serialize_cnt_mut(&mut self) -> &mut usize;
}
/// Collected stats to decide if observers must be serialized or not
#[cfg(not(feature = "adaptive_serialization"))]
pub trait EventStatsCollector {}
/// An [`EventManager`] that forwards all events to other attached fuzzers on shared maps or via tcp,
/// using low-level message passing, [`libafl_bolts::llmp`].
pub struct LlmpEventManager<EMH, S, SP>
@ -389,7 +365,7 @@ where
}
#[cfg(feature = "adaptive_serialization")]
impl<EMH, S, SP> EventStatsCollector for LlmpEventManager<EMH, S, SP>
impl<EMH, S, SP> AdaptiveSerializer for LlmpEventManager<EMH, S, SP>
where
SP: ShMemProvider + 'static,
S: State,
@ -785,39 +761,12 @@ where
OT: ObserversTuple<Self::State> + Serialize,
{
const SERIALIZE_TIME_FACTOR: u32 = 2;
const SERIALIZE_PERCENTAGE_TRESHOLD: usize = 80;
let exec_time = observers
.match_name::<crate::observers::TimeObserver>("time")
.map(|o| o.last_runtime().unwrap_or(Duration::ZERO))
.unwrap();
let mut must_ser = (self.serialization_time() + self.deserialization_time())
* SERIALIZE_TIME_FACTOR
< exec_time;
if must_ser {
*self.should_serialize_cnt_mut() += 1;
}
if self.serializations_cnt() > 32 {
must_ser = (self.should_serialize_cnt() * 100 / self.serializations_cnt())
> SERIALIZE_PERCENTAGE_TRESHOLD;
}
if self.serialization_time() == Duration::ZERO
|| must_ser
|| self.serializations_cnt().trailing_zeros() >= 8
{
let start = current_time();
let ser = postcard::to_allocvec(observers)?;
*self.serialization_time_mut() = current_time() - start;
*self.serializations_cnt_mut() += 1;
Ok(Some(ser))
} else {
*self.serializations_cnt_mut() += 1;
Ok(None)
}
const SERIALIZE_PERCENTAGE_THRESHOLD: usize = 80;
self.serialize_observers_adaptive(
observers,
SERIALIZE_TIME_FACTOR,
SERIALIZE_PERCENTAGE_THRESHOLD,
)
}
fn configuration(&self) -> EventConfig {
@ -979,7 +928,7 @@ where
}
#[cfg(all(feature = "std", feature = "adaptive_serialization"))]
impl<EMH, S, SP> EventStatsCollector for LlmpRestartingEventManager<EMH, S, SP>
impl<EMH, S, SP> AdaptiveSerializer for LlmpRestartingEventManager<EMH, S, SP>
where
SP: ShMemProvider + 'static,
S: State,
@ -1012,7 +961,7 @@ where
}
#[cfg(all(feature = "std", not(feature = "adaptive_serialization")))]
impl<EMH, S, SP> EventStatsCollector for LlmpRestartingEventManager<EMH, S, SP>
impl<EMH, S, SP> AdaptiveSerializer for LlmpRestartingEventManager<EMH, S, SP>
where
SP: ShMemProvider + 'static,
S: State,

View File

@ -852,6 +852,76 @@ where
}
}
/// Collected stats to decide if observers must be serialized or not
#[cfg(not(feature = "adaptive_serialization"))]
pub trait AdaptiveSerializer {}
/// Collected stats to decide if observers must be serialized or not
#[cfg(feature = "adaptive_serialization")]
pub trait AdaptiveSerializer {
/// Expose the collected observers serialization time
fn serialization_time(&self) -> Duration;
/// Expose the collected observers deserialization time
fn deserialization_time(&self) -> Duration;
/// How many times observers were serialized
fn serializations_cnt(&self) -> usize;
/// How many times shoukd have been serialized an observer
fn should_serialize_cnt(&self) -> usize;
/// Expose the collected observers serialization time (mut)
fn serialization_time_mut(&mut self) -> &mut Duration;
/// Expose the collected observers deserialization time (mut)
fn deserialization_time_mut(&mut self) -> &mut Duration;
/// How many times observers were serialized (mut)
fn serializations_cnt_mut(&mut self) -> &mut usize;
/// How many times shoukd have been serialized an observer (mut)
fn should_serialize_cnt_mut(&mut self) -> &mut usize;
/// Serialize the observer using the `time_factor` and `percentage_threshold`.
/// These parameters are unique to each of the different types of `EventManager`
fn serialize_observers_adaptive<S, OT>(
&mut self,
observers: &OT,
time_factor: u32,
percentage_threshold: usize,
) -> Result<Option<Vec<u8>>, Error>
where
OT: ObserversTuple<S> + Serialize,
S: UsesInput,
{
let exec_time = observers
.match_name::<crate::observers::TimeObserver>("time")
.map(|o| o.last_runtime().unwrap_or(Duration::ZERO))
.unwrap();
let mut must_ser =
(self.serialization_time() + self.deserialization_time()) * time_factor < exec_time;
if must_ser {
*self.should_serialize_cnt_mut() += 1;
}
if self.serializations_cnt() > 32 {
must_ser = (self.should_serialize_cnt() * 100 / self.serializations_cnt())
> percentage_threshold;
}
if self.serialization_time() == Duration::ZERO
|| must_ser
|| self.serializations_cnt().trailing_zeros() >= 8
{
let start = current_time();
let ser = postcard::to_allocvec(observers)?;
*self.serialization_time_mut() = current_time() - start;
*self.serializations_cnt_mut() += 1;
Ok(Some(ser))
} else {
*self.serializations_cnt_mut() += 1;
Ok(None)
}
}
}
#[cfg(test)]
mod tests {