Algorithm to choose to serialize the observers or not (#1227)
the algorithm is balancing between observers serialization and re-execution
This commit is contained in:
parent
cbf0952ec7
commit
65368408dd
@ -34,6 +34,7 @@ gzip = ["miniz_oxide"] # Enables gzip compression in certain parts of the lib
|
||||
regex = ["std", "dep:regex"] # enables the NaiveTokenizer and StacktraceObserver
|
||||
casr = ["libcasr", "std", "regex"] # enables deduplication based on libcasr for StacktraceObserver
|
||||
tcp_manager = ["tokio", "std"] # A simple EventManager proxying everything via TCP
|
||||
adaptive_serialization = []
|
||||
|
||||
# features hiding dependencies licensed under GPL
|
||||
gpl = []
|
||||
|
@ -8,7 +8,7 @@ pub mod build_id;
|
||||
feature = "std"
|
||||
))]
|
||||
pub mod cli;
|
||||
#[cfg(feature = "llmp_compression")]
|
||||
#[cfg(feature = "gzip")]
|
||||
pub mod compress;
|
||||
#[cfg(feature = "std")]
|
||||
pub mod core_affinity;
|
||||
|
@ -1,10 +1,14 @@
|
||||
//! A wrapper manager to implement a main-secondary architecture with point-to-point channels
|
||||
|
||||
use alloc::{boxed::Box, string::String, vec::Vec};
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
use core::time::Duration;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::{CustomBufEventResult, HasCustomBufHandlers, ProgressReporter};
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
use crate::bolts::current_time;
|
||||
use crate::{
|
||||
bolts::{
|
||||
llmp::{LlmpReceiver, LlmpSender, Tag},
|
||||
@ -12,8 +16,8 @@ use crate::{
|
||||
ClientId,
|
||||
},
|
||||
events::{
|
||||
Event, EventConfig, EventFirer, EventManager, EventManagerId, EventProcessor,
|
||||
EventRestarter, HasEventManagerId, LogSeverity,
|
||||
llmp::EventStatsCollector, Event, EventConfig, EventFirer, EventManager, EventManagerId,
|
||||
EventProcessor, EventRestarter, HasEventManagerId, LogSeverity,
|
||||
},
|
||||
executors::{Executor, HasObservers},
|
||||
fuzzer::{EvaluatorObservers, ExecutionProcessor},
|
||||
@ -45,9 +49,50 @@ where
|
||||
type State = EM::State;
|
||||
}
|
||||
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
impl<EM, SP> EventStatsCollector for CentralizedEventManager<EM, SP>
|
||||
where
|
||||
EM: EventStatsCollector + UsesState,
|
||||
SP: ShMemProvider,
|
||||
{
|
||||
fn serialization_time(&self) -> Duration {
|
||||
self.inner.serialization_time()
|
||||
}
|
||||
fn deserialization_time(&self) -> Duration {
|
||||
self.inner.deserialization_time()
|
||||
}
|
||||
fn serializations_cnt(&self) -> usize {
|
||||
self.inner.serializations_cnt()
|
||||
}
|
||||
fn should_serialize_cnt(&self) -> usize {
|
||||
self.inner.should_serialize_cnt()
|
||||
}
|
||||
|
||||
fn serialization_time_mut(&mut self) -> &mut Duration {
|
||||
self.inner.serialization_time_mut()
|
||||
}
|
||||
fn deserialization_time_mut(&mut self) -> &mut Duration {
|
||||
self.inner.deserialization_time_mut()
|
||||
}
|
||||
fn serializations_cnt_mut(&mut self) -> &mut usize {
|
||||
self.inner.serializations_cnt_mut()
|
||||
}
|
||||
fn should_serialize_cnt_mut(&mut self) -> &mut usize {
|
||||
self.inner.should_serialize_cnt_mut()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "adaptive_serialization"))]
|
||||
impl<EM, SP> EventStatsCollector for CentralizedEventManager<EM, SP>
|
||||
where
|
||||
EM: EventStatsCollector + UsesState,
|
||||
SP: ShMemProvider,
|
||||
{
|
||||
}
|
||||
|
||||
impl<EM, SP> EventFirer for CentralizedEventManager<EM, SP>
|
||||
where
|
||||
EM: EventFirer + HasEventManagerId,
|
||||
EM: EventStatsCollector + EventFirer + HasEventManagerId,
|
||||
SP: ShMemProvider,
|
||||
{
|
||||
fn fire(
|
||||
@ -63,9 +108,9 @@ where
|
||||
client_config: _,
|
||||
exit_kind: _,
|
||||
corpus_size: _,
|
||||
observers_buf: _,
|
||||
time: _,
|
||||
executions: _,
|
||||
observers_buf: _,
|
||||
forward_id,
|
||||
} => {
|
||||
*forward_id = Some(ClientId(self.inner.mgr_id().0 as u32));
|
||||
@ -91,13 +136,55 @@ where
|
||||
self.inner.log(state, severity_level, message)
|
||||
}
|
||||
|
||||
fn serialize_observers<OT>(&mut self, observers: &OT) -> Result<Vec<u8>, Error>
|
||||
#[cfg(not(feature = "adaptive_serialization"))]
|
||||
fn serialize_observers<OT>(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error>
|
||||
where
|
||||
OT: ObserversTuple<Self::State> + Serialize,
|
||||
{
|
||||
self.inner.serialize_observers(observers)
|
||||
}
|
||||
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
fn serialize_observers<OT>(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error>
|
||||
where
|
||||
OT: ObserversTuple<Self::State> + Serialize,
|
||||
{
|
||||
const SERIALIZE_TIME_FACTOR: u32 = 4;
|
||||
const SERIALIZE_PERCENTAGE_TRESHOLD: usize = 80;
|
||||
|
||||
let exec_time = observers
|
||||
.match_name::<crate::observers::TimeObserver>("time")
|
||||
.map(|o| o.last_runtime().unwrap_or(Duration::ZERO))
|
||||
.unwrap();
|
||||
|
||||
let mut must_ser = (self.serialization_time() + self.deserialization_time())
|
||||
* SERIALIZE_TIME_FACTOR
|
||||
< exec_time;
|
||||
if must_ser {
|
||||
*self.should_serialize_cnt_mut() += 1;
|
||||
}
|
||||
|
||||
if self.serializations_cnt() > 32 {
|
||||
must_ser = (self.should_serialize_cnt() * 100 / self.serializations_cnt())
|
||||
> SERIALIZE_PERCENTAGE_TRESHOLD;
|
||||
}
|
||||
|
||||
if self.inner.serialization_time() == Duration::ZERO
|
||||
|| must_ser
|
||||
|| self.serializations_cnt().trailing_zeros() >= 8
|
||||
{
|
||||
let start = current_time();
|
||||
let ser = postcard::to_allocvec(observers)?;
|
||||
*self.inner.serialization_time_mut() = current_time() - start;
|
||||
|
||||
*self.serializations_cnt_mut() += 1;
|
||||
Ok(Some(ser))
|
||||
} else {
|
||||
*self.serializations_cnt_mut() += 1;
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn configuration(&self) -> EventConfig {
|
||||
self.inner.configuration()
|
||||
}
|
||||
@ -125,12 +212,13 @@ where
|
||||
|
||||
impl<E, EM, SP, Z> EventProcessor<E, Z> for CentralizedEventManager<EM, SP>
|
||||
where
|
||||
EM: EventProcessor<E, Z> + EventFirer + HasEventManagerId,
|
||||
EM: EventStatsCollector + EventProcessor<E, Z> + EventFirer + HasEventManagerId,
|
||||
SP: ShMemProvider,
|
||||
E: HasObservers<State = Self::State> + Executor<Self, Z>,
|
||||
for<'a> E::Observers: Deserialize<'a>,
|
||||
Z: EvaluatorObservers<E::Observers, State = Self::State>
|
||||
+ ExecutionProcessor<E::Observers, State = Self::State>,
|
||||
Self::State: HasExecutions + HasMetadata,
|
||||
{
|
||||
fn process(
|
||||
&mut self,
|
||||
@ -168,15 +256,36 @@ where
|
||||
"Received new Testcase to evaluate from secondary node {idx:?}"
|
||||
);
|
||||
|
||||
// TODO check the config and use the serialized observers
|
||||
let res = if client_config.match_with(&self.configuration())
|
||||
&& observers_buf.is_some()
|
||||
{
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
let start = current_time();
|
||||
let observers: E::Observers =
|
||||
postcard::from_bytes(observers_buf.as_ref().unwrap())?;
|
||||
|
||||
let res = fuzzer.evaluate_input_with_observers::<E, Self>(
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
{
|
||||
*self.inner.deserialization_time_mut() = current_time() - start;
|
||||
}
|
||||
|
||||
fuzzer.process_execution(
|
||||
state,
|
||||
self,
|
||||
input.clone(),
|
||||
&observers,
|
||||
&exit_kind,
|
||||
false,
|
||||
)?
|
||||
} else {
|
||||
fuzzer.evaluate_input_with_observers::<E, Self>(
|
||||
state,
|
||||
executor,
|
||||
self,
|
||||
input.clone(),
|
||||
false,
|
||||
)?;
|
||||
)?
|
||||
};
|
||||
if let Some(item) = res.1 {
|
||||
log::info!("Added received Testcase as item #{item}");
|
||||
|
||||
@ -214,7 +323,7 @@ where
|
||||
|
||||
impl<E, EM, SP, Z> EventManager<E, Z> for CentralizedEventManager<EM, SP>
|
||||
where
|
||||
EM: EventManager<E, Z>,
|
||||
EM: EventStatsCollector + EventManager<E, Z>,
|
||||
EM::State: HasClientPerfMonitor + HasExecutions + HasMetadata,
|
||||
SP: ShMemProvider,
|
||||
E: HasObservers<State = Self::State> + Executor<Self, Z>,
|
||||
@ -242,7 +351,7 @@ where
|
||||
|
||||
impl<EM, SP> ProgressReporter for CentralizedEventManager<EM, SP>
|
||||
where
|
||||
EM: ProgressReporter + HasEventManagerId,
|
||||
EM: EventStatsCollector + ProgressReporter + HasEventManagerId,
|
||||
EM::State: HasClientPerfMonitor + HasMetadata + HasExecutions,
|
||||
SP: ShMemProvider,
|
||||
{
|
||||
|
@ -11,15 +11,17 @@ use core::{marker::PhantomData, num::NonZeroUsize, time::Duration};
|
||||
#[cfg(feature = "std")]
|
||||
use std::net::{SocketAddr, ToSocketAddrs};
|
||||
|
||||
use serde::Deserialize;
|
||||
#[cfg(feature = "std")]
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
#[cfg(feature = "std")]
|
||||
use typed_builder::TypedBuilder;
|
||||
|
||||
use super::{CustomBufEventResult, CustomBufHandlerFn};
|
||||
#[cfg(feature = "std")]
|
||||
use crate::bolts::core_affinity::CoreId;
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
use crate::bolts::current_time;
|
||||
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
|
||||
use crate::bolts::os::startable_self;
|
||||
#[cfg(all(unix, feature = "std", not(miri)))]
|
||||
@ -49,6 +51,7 @@ use crate::{
|
||||
fuzzer::{EvaluatorObservers, ExecutionProcessor},
|
||||
inputs::{Input, InputConverter, UsesInput},
|
||||
monitors::Monitor,
|
||||
observers::ObserversTuple,
|
||||
state::{HasClientPerfMonitor, HasExecutions, HasMetadata, UsesState},
|
||||
Error,
|
||||
};
|
||||
@ -308,6 +311,32 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Collected stats to decide if observers must be serialized or not
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
pub trait EventStatsCollector {
|
||||
/// Expose the collected observers serialization time
|
||||
fn serialization_time(&self) -> Duration;
|
||||
/// Expose the collected observers deserialization time
|
||||
fn deserialization_time(&self) -> Duration;
|
||||
/// How many times observers were serialized
|
||||
fn serializations_cnt(&self) -> usize;
|
||||
/// How many times shoukd have been serialized an observer
|
||||
fn should_serialize_cnt(&self) -> usize;
|
||||
|
||||
/// Expose the collected observers serialization time (mut)
|
||||
fn serialization_time_mut(&mut self) -> &mut Duration;
|
||||
/// Expose the collected observers deserialization time (mut)
|
||||
fn deserialization_time_mut(&mut self) -> &mut Duration;
|
||||
/// How many times observers were serialized (mut)
|
||||
fn serializations_cnt_mut(&mut self) -> &mut usize;
|
||||
/// How many times shoukd have been serialized an observer (mut)
|
||||
fn should_serialize_cnt_mut(&mut self) -> &mut usize;
|
||||
}
|
||||
|
||||
/// Collected stats to decide if observers must be serialized or not
|
||||
#[cfg(not(feature = "adaptive_serialization"))]
|
||||
pub trait EventStatsCollector {}
|
||||
|
||||
/// An [`EventManager`] that forwards all events to other attached fuzzers on shared maps or via tcp,
|
||||
/// using low-level message passing, [`crate::bolts::llmp`].
|
||||
pub struct LlmpEventManager<S, SP>
|
||||
@ -325,9 +354,50 @@ where
|
||||
/// A node will not re-use the observer values sent over LLMP
|
||||
/// from nodes with other configurations.
|
||||
configuration: EventConfig,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
serialization_time: Duration,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
deserialization_time: Duration,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
serializations_cnt: usize,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
should_serialize_cnt: usize,
|
||||
phantom: PhantomData<S>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
impl<S, SP> EventStatsCollector for LlmpEventManager<S, SP>
|
||||
where
|
||||
SP: ShMemProvider + 'static,
|
||||
S: UsesInput,
|
||||
{
|
||||
fn serialization_time(&self) -> Duration {
|
||||
self.serialization_time
|
||||
}
|
||||
fn deserialization_time(&self) -> Duration {
|
||||
self.deserialization_time
|
||||
}
|
||||
fn serializations_cnt(&self) -> usize {
|
||||
self.serializations_cnt
|
||||
}
|
||||
fn should_serialize_cnt(&self) -> usize {
|
||||
self.should_serialize_cnt
|
||||
}
|
||||
|
||||
fn serialization_time_mut(&mut self) -> &mut Duration {
|
||||
&mut self.serialization_time
|
||||
}
|
||||
fn deserialization_time_mut(&mut self) -> &mut Duration {
|
||||
&mut self.deserialization_time
|
||||
}
|
||||
fn serializations_cnt_mut(&mut self) -> &mut usize {
|
||||
&mut self.serializations_cnt
|
||||
}
|
||||
fn should_serialize_cnt_mut(&mut self) -> &mut usize {
|
||||
&mut self.should_serialize_cnt
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, SP> core::fmt::Debug for LlmpEventManager<S, SP>
|
||||
where
|
||||
SP: ShMemProvider + 'static,
|
||||
@ -359,7 +429,7 @@ where
|
||||
|
||||
impl<S, SP> LlmpEventManager<S, SP>
|
||||
where
|
||||
S: UsesInput + HasExecutions + HasClientPerfMonitor,
|
||||
S: UsesInput,
|
||||
SP: ShMemProvider + 'static,
|
||||
{
|
||||
/// Create a manager from a raw LLMP client
|
||||
@ -369,6 +439,14 @@ where
|
||||
#[cfg(feature = "llmp_compression")]
|
||||
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
|
||||
configuration,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
serialization_time: Duration::ZERO,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
deserialization_time: Duration::ZERO,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
serializations_cnt: 0,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
should_serialize_cnt: 0,
|
||||
phantom: PhantomData,
|
||||
custom_buf_handlers: vec![],
|
||||
})
|
||||
@ -389,6 +467,14 @@ where
|
||||
#[cfg(feature = "llmp_compression")]
|
||||
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
|
||||
configuration,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
serialization_time: Duration::ZERO,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
deserialization_time: Duration::ZERO,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
serializations_cnt: 0,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
should_serialize_cnt: 0,
|
||||
phantom: PhantomData,
|
||||
custom_buf_handlers: vec![],
|
||||
})
|
||||
@ -407,6 +493,14 @@ where
|
||||
#[cfg(feature = "llmp_compression")]
|
||||
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
|
||||
configuration,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
serialization_time: Duration::ZERO,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
deserialization_time: Duration::ZERO,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
serializations_cnt: 0,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
should_serialize_cnt: 0,
|
||||
phantom: PhantomData,
|
||||
custom_buf_handlers: vec![],
|
||||
})
|
||||
@ -428,6 +522,14 @@ where
|
||||
#[cfg(feature = "llmp_compression")]
|
||||
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
|
||||
configuration,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
serialization_time: Duration::ZERO,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
deserialization_time: Duration::ZERO,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
serializations_cnt: 0,
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
should_serialize_cnt: 0,
|
||||
phantom: PhantomData,
|
||||
custom_buf_handlers: vec![],
|
||||
})
|
||||
@ -439,7 +541,13 @@ where
|
||||
pub fn to_env(&self, env_name: &str) {
|
||||
self.llmp.to_env(env_name).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, SP> LlmpEventManager<S, SP>
|
||||
where
|
||||
S: UsesInput + HasExecutions + HasClientPerfMonitor + HasMetadata,
|
||||
SP: ShMemProvider + 'static,
|
||||
{
|
||||
// Handle arriving events in the client
|
||||
#[allow(clippy::unused_self)]
|
||||
fn handle_in_client<E, Z>(
|
||||
@ -468,18 +576,25 @@ where
|
||||
} => {
|
||||
log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})");
|
||||
|
||||
let _res = if client_config.match_with(&self.configuration)
|
||||
let res = if client_config.match_with(&self.configuration)
|
||||
&& observers_buf.is_some()
|
||||
{
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
let start = current_time();
|
||||
let observers: E::Observers =
|
||||
postcard::from_bytes(observers_buf.as_ref().unwrap())?;
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
{
|
||||
self.deserialization_time = current_time() - start;
|
||||
}
|
||||
|
||||
fuzzer.process_execution(state, self, input, &observers, &exit_kind, false)?
|
||||
} else {
|
||||
fuzzer.evaluate_input_with_observers::<E, Self>(
|
||||
state, executor, self, input, false,
|
||||
)?
|
||||
};
|
||||
if let Some(item) = _res.1 {
|
||||
if let Some(item) = res.1 {
|
||||
log::info!("Added received Testcase as item #{item}");
|
||||
}
|
||||
Ok(())
|
||||
@ -557,6 +672,55 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "adaptive_serialization"))]
|
||||
fn serialize_observers<OT>(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error>
|
||||
where
|
||||
OT: ObserversTuple<Self::State> + Serialize,
|
||||
{
|
||||
Ok(Some(postcard::to_allocvec(observers)?))
|
||||
}
|
||||
|
||||
#[cfg(feature = "adaptive_serialization")]
|
||||
fn serialize_observers<OT>(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error>
|
||||
where
|
||||
OT: ObserversTuple<Self::State> + Serialize,
|
||||
{
|
||||
const SERIALIZE_TIME_FACTOR: u32 = 2;
|
||||
const SERIALIZE_PERCENTAGE_TRESHOLD: usize = 80;
|
||||
|
||||
let exec_time = observers
|
||||
.match_name::<crate::observers::TimeObserver>("time")
|
||||
.map(|o| o.last_runtime().unwrap_or(Duration::ZERO))
|
||||
.unwrap();
|
||||
|
||||
let mut must_ser = (self.serialization_time() + self.deserialization_time())
|
||||
* SERIALIZE_TIME_FACTOR
|
||||
< exec_time;
|
||||
if must_ser {
|
||||
*self.should_serialize_cnt_mut() += 1;
|
||||
}
|
||||
|
||||
if self.serializations_cnt() > 32 {
|
||||
must_ser = (self.should_serialize_cnt() * 100 / self.serializations_cnt())
|
||||
> SERIALIZE_PERCENTAGE_TRESHOLD;
|
||||
}
|
||||
|
||||
if self.serialization_time() == Duration::ZERO
|
||||
|| must_ser
|
||||
|| self.serializations_cnt().trailing_zeros() >= 8
|
||||
{
|
||||
let start = current_time();
|
||||
let ser = postcard::to_allocvec(observers)?;
|
||||
*self.serialization_time_mut() = current_time() - start;
|
||||
|
||||
*self.serializations_cnt_mut() += 1;
|
||||
Ok(Some(ser))
|
||||
} else {
|
||||
*self.serializations_cnt_mut() += 1;
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn configuration(&self) -> EventConfig {
|
||||
self.configuration
|
||||
}
|
||||
@ -577,7 +741,7 @@ where
|
||||
|
||||
impl<E, S, SP, Z> EventProcessor<E, Z> for LlmpEventManager<S, SP>
|
||||
where
|
||||
S: UsesInput + HasClientPerfMonitor + HasExecutions,
|
||||
S: UsesInput + HasClientPerfMonitor + HasExecutions + HasMetadata,
|
||||
SP: ShMemProvider,
|
||||
E: HasObservers<State = S> + Executor<Self, Z>,
|
||||
for<'a> E::Observers: Deserialize<'a>,
|
||||
@ -678,6 +842,47 @@ where
|
||||
save_state: bool,
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "std", feature = "adaptive_serialization"))]
|
||||
impl<S, SP> EventStatsCollector for LlmpRestartingEventManager<S, SP>
|
||||
where
|
||||
SP: ShMemProvider + 'static,
|
||||
S: UsesInput,
|
||||
{
|
||||
fn serialization_time(&self) -> Duration {
|
||||
self.llmp_mgr.serialization_time()
|
||||
}
|
||||
fn deserialization_time(&self) -> Duration {
|
||||
self.llmp_mgr.deserialization_time()
|
||||
}
|
||||
fn serializations_cnt(&self) -> usize {
|
||||
self.llmp_mgr.serializations_cnt()
|
||||
}
|
||||
fn should_serialize_cnt(&self) -> usize {
|
||||
self.llmp_mgr.should_serialize_cnt()
|
||||
}
|
||||
|
||||
fn serialization_time_mut(&mut self) -> &mut Duration {
|
||||
self.llmp_mgr.serialization_time_mut()
|
||||
}
|
||||
fn deserialization_time_mut(&mut self) -> &mut Duration {
|
||||
self.llmp_mgr.deserialization_time_mut()
|
||||
}
|
||||
fn serializations_cnt_mut(&mut self) -> &mut usize {
|
||||
self.llmp_mgr.serializations_cnt_mut()
|
||||
}
|
||||
fn should_serialize_cnt_mut(&mut self) -> &mut usize {
|
||||
self.llmp_mgr.should_serialize_cnt_mut()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "std", not(feature = "adaptive_serialization")))]
|
||||
impl<S, SP> EventStatsCollector for LlmpRestartingEventManager<S, SP>
|
||||
where
|
||||
SP: ShMemProvider + 'static,
|
||||
S: UsesInput,
|
||||
{
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl<S, SP> UsesState for LlmpRestartingEventManager<S, SP>
|
||||
where
|
||||
@ -711,6 +916,13 @@ where
|
||||
self.llmp_mgr.fire(state, event)
|
||||
}
|
||||
|
||||
fn serialize_observers<OT>(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error>
|
||||
where
|
||||
OT: ObserversTuple<Self::State> + Serialize,
|
||||
{
|
||||
self.llmp_mgr.serialize_observers(observers)
|
||||
}
|
||||
|
||||
fn configuration(&self) -> EventConfig {
|
||||
self.llmp_mgr.configuration()
|
||||
}
|
||||
@ -753,7 +965,7 @@ impl<E, S, SP, Z> EventProcessor<E, Z> for LlmpRestartingEventManager<S, SP>
|
||||
where
|
||||
E: HasObservers<State = S> + Executor<LlmpEventManager<S, SP>, Z>,
|
||||
for<'a> E::Observers: Deserialize<'a>,
|
||||
S: UsesInput + HasExecutions + HasClientPerfMonitor,
|
||||
S: UsesInput + HasExecutions + HasClientPerfMonitor + HasMetadata,
|
||||
SP: ShMemProvider + 'static,
|
||||
Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers>, //CE: CustomEvent<I>,
|
||||
{
|
||||
|
@ -435,11 +435,11 @@ pub trait EventFirer: UsesState {
|
||||
}
|
||||
|
||||
/// Serialize all observers for this type and manager
|
||||
fn serialize_observers<OT>(&mut self, observers: &OT) -> Result<Vec<u8>, Error>
|
||||
fn serialize_observers<OT>(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error>
|
||||
where
|
||||
OT: ObserversTuple<Self::State> + Serialize,
|
||||
{
|
||||
Ok(postcard::to_allocvec(observers)?)
|
||||
Ok(Some(postcard::to_allocvec(observers)?))
|
||||
}
|
||||
|
||||
/// Get the configuration
|
||||
@ -462,10 +462,23 @@ where
|
||||
last_report_time: Duration,
|
||||
monitor_timeout: Duration,
|
||||
) -> Result<Duration, Error> {
|
||||
let executions = *state.executions();
|
||||
let cur = current_time();
|
||||
// default to 0 here to avoid crashes on clock skew
|
||||
if cur.checked_sub(last_report_time).unwrap_or_default() > monitor_timeout {
|
||||
self.report_progress(state)?;
|
||||
|
||||
Ok(cur)
|
||||
} else {
|
||||
Ok(last_report_time)
|
||||
}
|
||||
}
|
||||
|
||||
/// Send off an info/monitor/heartbeat message to the broker.
|
||||
/// Will return an [`crate::Error`], if the stats could not be sent.
|
||||
fn report_progress(&mut self, state: &mut Self::State) -> Result<(), Error> {
|
||||
let executions = *state.executions();
|
||||
let cur = current_time();
|
||||
|
||||
// Default no introspection implmentation
|
||||
#[cfg(not(feature = "introspection"))]
|
||||
self.fire(
|
||||
@ -497,10 +510,7 @@ where
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(cur)
|
||||
} else {
|
||||
Ok(last_report_time)
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -439,6 +439,7 @@ pub(crate) struct InProcessExecutorHandlerData {
|
||||
fuzzer_ptr: *mut c_void,
|
||||
executor_ptr: *const c_void,
|
||||
pub current_input_ptr: *const c_void,
|
||||
pub in_handler: bool,
|
||||
|
||||
/// The timeout handler
|
||||
#[cfg(any(unix, feature = "std"))]
|
||||
@ -504,6 +505,13 @@ impl InProcessExecutorHandlerData {
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(unix, feature = "std"))]
|
||||
fn set_in_handler(&mut self, v: bool) -> bool {
|
||||
let old = self.in_handler;
|
||||
self.in_handler = v;
|
||||
old
|
||||
}
|
||||
}
|
||||
|
||||
/// Exception handling needs some nasty unsafe.
|
||||
@ -519,6 +527,8 @@ pub(crate) static mut GLOBAL_STATE: InProcessExecutorHandlerData = InProcessExec
|
||||
/// The current input for signal handling
|
||||
current_input_ptr: ptr::null(),
|
||||
|
||||
in_handler: false,
|
||||
|
||||
/// The crash handler fn
|
||||
#[cfg(any(unix, feature = "std"))]
|
||||
crash_handler: ptr::null(),
|
||||
@ -568,6 +578,12 @@ pub fn inprocess_get_input<'a, I>() -> Option<&'a I> {
|
||||
unsafe { (GLOBAL_STATE.current_input_ptr as *const I).as_ref() }
|
||||
}
|
||||
|
||||
/// Know if we ar eexecuting in a crash/timeout handler
|
||||
#[must_use]
|
||||
pub fn inprocess_in_handler() -> bool {
|
||||
unsafe { GLOBAL_STATE.in_handler }
|
||||
}
|
||||
|
||||
use crate::{
|
||||
corpus::{Corpus, Testcase},
|
||||
events::Event,
|
||||
@ -716,6 +732,7 @@ mod unix_signal_handler {
|
||||
fn handle(&mut self, signal: Signal, info: siginfo_t, context: &mut ucontext_t) {
|
||||
unsafe {
|
||||
let data = &mut GLOBAL_STATE;
|
||||
let in_handler = data.set_in_handler(true);
|
||||
match signal {
|
||||
Signal::SigUser2 | Signal::SigAlarm => {
|
||||
if !data.timeout_handler.is_null() {
|
||||
@ -730,6 +747,7 @@ mod unix_signal_handler {
|
||||
}
|
||||
}
|
||||
}
|
||||
data.set_in_handler(in_handler);
|
||||
}
|
||||
}
|
||||
|
||||
@ -765,6 +783,7 @@ mod unix_signal_handler {
|
||||
panic::set_hook(Box::new(move |panic_info| {
|
||||
old_hook(panic_info);
|
||||
let data = unsafe { &mut GLOBAL_STATE };
|
||||
let in_handler = data.set_in_handler(true);
|
||||
if data.is_valid() {
|
||||
// We are fuzzing!
|
||||
let executor = data.executor_mut::<E>();
|
||||
@ -786,6 +805,7 @@ mod unix_signal_handler {
|
||||
libc::_exit(128 + 6);
|
||||
} // SIGABRT exit code
|
||||
}
|
||||
data.set_in_handler(in_handler);
|
||||
}));
|
||||
}
|
||||
|
||||
@ -979,6 +999,7 @@ pub mod windows_asan_handler {
|
||||
+ HasScheduler,
|
||||
{
|
||||
let data = &mut GLOBAL_STATE;
|
||||
data.set_in_handler(true);
|
||||
// Have we set a timer_before?
|
||||
if !(data.tp_timer as *mut windows::Win32::System::Threading::TP_TIMER).is_null() {
|
||||
/*
|
||||
@ -1095,10 +1116,12 @@ mod windows_exception_handler {
|
||||
fn handle(&mut self, _code: ExceptionCode, exception_pointers: *mut EXCEPTION_POINTERS) {
|
||||
unsafe {
|
||||
let data = &mut GLOBAL_STATE;
|
||||
let in_handler = data.set_in_handler(true);
|
||||
if !data.crash_handler.is_null() {
|
||||
let func: HandlerFuncPtr = transmute(data.crash_handler);
|
||||
(func)(exception_pointers, data);
|
||||
}
|
||||
data.set_in_handler(in_handler);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1125,6 +1148,7 @@ mod windows_exception_handler {
|
||||
let old_hook = panic::take_hook();
|
||||
panic::set_hook(Box::new(move |panic_info| {
|
||||
let data = unsafe { &mut GLOBAL_STATE };
|
||||
let in_handler = data.set_in_handler(true);
|
||||
// Have we set a timer_before?
|
||||
unsafe {
|
||||
if !(data.tp_timer as *mut windows::Win32::System::Threading::TP_TIMER).is_null() {
|
||||
@ -1166,6 +1190,7 @@ mod windows_exception_handler {
|
||||
}
|
||||
}
|
||||
old_hook(panic_info);
|
||||
data.set_in_handler(in_handler);
|
||||
}));
|
||||
}
|
||||
|
||||
|
@ -490,7 +490,7 @@ where
|
||||
if self.batch_mode {
|
||||
unsafe {
|
||||
let elapsed = current_time() - self.tmout_start_time;
|
||||
// elapsed may be > than tmout in case of reveived but ingored signal
|
||||
// elapsed may be > than tmout in case of received but ingored signal
|
||||
if elapsed > self.exec_tmout
|
||||
|| self.exec_tmout - elapsed < self.avg_exec_time * self.avg_mul_k
|
||||
{
|
||||
|
@ -225,6 +225,8 @@ where
|
||||
last = manager.maybe_report_progress(state, last, monitor_timeout)?;
|
||||
}
|
||||
|
||||
manager.report_progress(state)?;
|
||||
|
||||
// If we would assume the fuzzer loop will always exit after this, we could do this here:
|
||||
// manager.on_restart(state)?;
|
||||
// But as the state may grow to a few megabytes,
|
||||
@ -397,7 +399,7 @@ where
|
||||
let observers_buf = if manager.configuration() == EventConfig::AlwaysUnique {
|
||||
None
|
||||
} else {
|
||||
Some(manager.serialize_observers::<OT>(observers)?)
|
||||
manager.serialize_observers::<OT>(observers)?
|
||||
};
|
||||
manager.fire(
|
||||
state,
|
||||
@ -547,7 +549,7 @@ where
|
||||
let observers_buf = if manager.configuration() == EventConfig::AlwaysUnique {
|
||||
None
|
||||
} else {
|
||||
Some(manager.serialize_observers::<OT>(observers)?)
|
||||
manager.serialize_observers::<OT>(observers)?
|
||||
};
|
||||
manager.fire(
|
||||
state,
|
||||
|
@ -151,7 +151,7 @@ pub enum Error {
|
||||
/// Serialization error
|
||||
Serialize(String, ErrorBacktrace),
|
||||
/// Compression error
|
||||
#[cfg(feature = "llmp_compression")]
|
||||
#[cfg(feature = "gzip")]
|
||||
Compression(ErrorBacktrace),
|
||||
/// File related error
|
||||
#[cfg(feature = "std")]
|
||||
@ -187,7 +187,7 @@ impl Error {
|
||||
{
|
||||
Error::Serialize(arg.into(), ErrorBacktrace::new())
|
||||
}
|
||||
#[cfg(feature = "llmp_compression")]
|
||||
#[cfg(feature = "gzip")]
|
||||
/// Compression error
|
||||
#[must_use]
|
||||
pub fn compression() -> Self {
|
||||
@ -285,7 +285,7 @@ impl fmt::Display for Error {
|
||||
write!(f, "Error in Serialization: `{0}`", &s)?;
|
||||
display_error_backtrace(f, b)
|
||||
}
|
||||
#[cfg(feature = "llmp_compression")]
|
||||
#[cfg(feature = "gzip")]
|
||||
Self::Compression(b) => {
|
||||
write!(f, "Error in decompression")?;
|
||||
display_error_backtrace(f, b)
|
||||
|
Loading…
x
Reference in New Issue
Block a user