llmp event mgr

This commit is contained in:
Dominik Maier 2020-12-11 18:06:54 +01:00
parent 299634ccce
commit b2e1fa0de4
5 changed files with 216 additions and 255 deletions

View File

@ -113,6 +113,21 @@ where
// TODO move some of these, like evaluate_input, to Engine
/// Adds a feedback
#[inline]
pub fn add_feedback(&mut self, feedback: Box<dyn Feedback<I>>) {
self.feedbacks_mut().push(feedback);
}
// TODO move some of these, like evaluate_input, to FuzzingEngine
pub fn is_interesting(&mut self, input: &I, observers: &crate::observers::observer_serde::NamedSerdeAnyMap) -> Result<u32, AflError> {
let mut fitness;
for feedback in self.feedbacks_mut() {
fitness += feedback.is_interesting(&input, observers)?;
}
Ok(fitness)
}
/// Runs the input and triggers observers and feedback
pub fn evaluate_input<E, OT>(&mut self, input: &I, executor: &mut E) -> Result<u32, AflError>
where

View File

@ -234,6 +234,39 @@ impl LlmpConnection {
}
}
}
/// Sends the given buffer over this connection, no matter if client or broker.
pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), AflError> {
match self {
LlmpConnection::IsBroker {broker, listener_thread: _} => {
broker.send_buf(tag, buf)
},
LlmpConnection::IsClient {client} => {
client.send_buf(tag, buf)
}
}
}
/// Returns the next message, tag, buf, if avaliable, else None
#[inline]
pub fn recv_buf(&mut self) -> Result<Option<(u32, &[u8])>, AflError> {
match self {
LlmpConnection::IsBroker {broker, listener_thread: _} => {
broker.recv_buf()
},
LlmpConnection::IsClient {client} => {
client.recv_buf()
}
}
}
/// A client blocks/spins until the next message gets posted to the page,
/// then returns that message.
#[inline]
pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, AflError> {
self.llmp_in.recv_blocking()
}
}
/// Contents of the share mem pages, used by llmp internally

View File

@ -4,7 +4,7 @@ pub mod llmp;
pub mod shmem_translated;
use alloc::string::String;
use core::marker::PhantomData;
use core::{marker::PhantomData, time};
use serde::{Deserialize, Serialize};
@ -12,10 +12,10 @@ use serde::{Deserialize, Serialize};
//pub mod shmem_translated;
#[cfg(feature = "std")]
use std::io::Write;
use std::{io::Write, time::{SystemTime, UNIX_EPOCH}};
use crate::corpus::Corpus;
use crate::engines::State;
use crate::{engines::State, utils};
use crate::executors::Executor;
use crate::feedbacks::FeedbacksTuple;
use crate::inputs::Input;
@ -32,7 +32,13 @@ pub enum BrokerEventResult {
Forward,
}
pub trait ShowStats {}
pub struct ClientStats {
// stats (maybe we need a separated struct?)
id: usize,
executions: u64,
execs_over_sec: u64,
corpus_size: usize,
}
/// A custom event, for own messages, with own handler.
pub trait CustomEvent<I, OT>: SerdeAny + Serialize
@ -178,86 +184,121 @@ where
/// Return the number of processes events or an error
fn process(&mut self, state: &mut State<I, R, FT>, corpus: &mut C) -> Result<usize, AflError>;
/// the client stat, mutable
fn client_stats_mut(&mut self) -> &mut Vec<ClientStats>;
/// the client stat
fn client_stats(&self) -> &[ClientStats];
/// Amount of elements in the corpus (combined for all children)
fn corpus_size(&self) -> usize;
/// Incremt the cropus size
fn corpus_size_inc(&mut self) -> usize;
/// Time this fuzzing run stated
fn start_time(&mut self) -> time::Duration;
/// Total executions
#[inline]
fn on_recv(&self, _state: &mut State<I, R, FT>, _corpus: &mut C) -> Result<(), AflError> {
// TODO: Better way to move out of testcase, or get ref
//Ok(corpus.add(self.testcase.take().unwrap()))
Ok(())
fn total_execs(&mut self) -> u64 {
self.client_stats().iter().fold(0u64, |acc, x| acc + x.executions)
}
// TODO the broker has a state? do we need to pass state and corpus?
fn handle_in_broker(&mut self, event: &Event<I, OT>) -> Result<BrokerEventResult, AflError> {
/// Executions per second
#[inline]
fn execs_per_sec(&mut self) -> u64 {
let time_since_start = (utils::current_time() - self.start_time()).as_secs();
if time_since_start == 0 { 0 } else {
self.total_execs() / time_since_start
}
}
/// Broker fun
fn handle_in_broker(&mut self, event: &Event<I>) -> Result<BrokerEventResult, AflError> {
match event {
Event::LoadInitial {
sender_id: _,
phantom: _,
} => Ok(BrokerEventResult::Handled),
Event::NewTestcase {
sender_id: _,
input: _,
observers: _,
corpus_count: _,
} => Ok(BrokerEventResult::Forward),
corpus_count,
} => {
self.corpus_size_inc();
println!(
"[NEW] corpus: {} execs: {} execs/s: {}",
self.corpus_size(), self.total_execs(), self.execs_per_sec()
);
Ok(BrokerEventResult::Forward)
}
Event::UpdateStats {
sender_id: _,
executions: _,
execs_over_sec: _,
sender_id,
executions,
execs_over_sec,
phantom: _,
} => {
// TODO
// TODO: The stats buffer should be added on client add.
let client_stat_count = self.client_stats().len();
for i in client_stat_count..*sender_id as usize {
self.client_stats_mut().push(ClientStats {
id: client_stat_count + i,
corpus_size: 0,
execs_over_sec: 0,
executions: 0
})
}
let mut stat = &mut self.client_stats_mut()[*sender_id as usize];
println!(
"[UPDATE] corpus: {} execs: {} execs/s: {}",
self.corpus_size(), self.total_execs(), self.execs_per_sec()
);
Ok(BrokerEventResult::Handled)
}
Event::Crash {
sender_id: _,
input: _,
phantom: _,
} => Ok(BrokerEventResult::Handled),
} => {
panic!("LoggerEventManager cannot handle Event::Crash");
}
Event::Timeout {
sender_id: _,
input: _,
phantom: _,
} => {
// TODO
Ok(BrokerEventResult::Handled)
panic!("LoggerEventManager cannot handle Event::Timeout");
}
Event::Log {
sender_id,
sender_id: _,
severity_level,
message,
phantom: _,
} => {
//TODO: broker.log()
#[cfg(feature = "std")]
println!("{}[{}]: {}", sender_id, severity_level, message);
// Silence warnings for no_std
let (_, _, _) = (sender_id, severity_level, message);
println!("[LOG {}]: {}", severity_level, message);
Ok(BrokerEventResult::Handled)
}
Event::None { phantom: _ } => Ok(BrokerEventResult::Handled),
Event::Custom {
sender_id: _, /*custom_event} => custom_event.handle_in_broker(state, corpus)*/
} => Ok(BrokerEventResult::Forward),
//_ => Ok(BrokerEventResult::Forward),
_ => Ok(BrokerEventResult::Forward),
}
}
/// Client fun
fn handle_in_client(
&mut self,
event: Event<I, OT>,
_state: &mut State<I, R, FT>,
_corpus: &mut C,
event: Event<I>,
state: &mut State<I, R>,
corpus: &mut C,
) -> Result<(), AflError> {
match event {
Event::NewTestcase {
sender_id: _,
input: _,
observers: _,
input,
observers,
corpus_count: _,
} => {
// here u should match sender_id, if equal to the current one do not re-execute
// we need to pass engine to process() too, TODO
#[cfg(feature = "std")]
println!("PLACEHOLDER: received NewTestcase");
println!("Received new Testcase");
let interestingness = state.is_interesting(input, observers)?;
state.add_if_interesting(corpus, input, interestingness);
Ok(())
}
_ => Err(AflError::Unknown(
@ -267,18 +308,6 @@ where
}
}
/*TODO
fn on_recv(&self, state: &mut State<I, R, FT>, _corpus: &mut C) -> Result<(), AflError> {
println!(
"#{}\t exec/s: {}",
state.executions(),
//TODO: Count corpus.entries().len(),
state.executions_over_seconds()
);
Ok(())
}
*/
#[cfg(feature = "std")]
pub struct LoggerEventManager<C, E, OT, FT, I, R, W>
where
@ -297,9 +326,10 @@ where
// stats (maybe we need a separated struct?)
executions: usize,
execs_over_sec: u64,
corpus_count: usize,
phantom: PhantomData<(C, E, OT, FT, I, R)>,
corpus_size: usize,
start_time: time::Duration,
client_stats: Vec<ClientStats>,
phantom: PhantomData<(C, E, I, R)>,
}
#[cfg(feature = "std")]
@ -335,63 +365,27 @@ where
Ok(c)
}
fn handle_in_broker(&mut self, event: &Event<I, OT>) -> Result<BrokerEventResult, AflError> {
match event {
Event::NewTestcase {
sender_id: _,
input: _,
observers: _,
corpus_count,
} => {
self.corpus_count = *corpus_count;
writeln!(
self.writer,
"[NEW] corpus: {} execs: {} execs/s: {}",
self.corpus_count, self.executions, self.execs_over_sec
)?;
Ok(BrokerEventResult::Handled)
}
Event::UpdateStats {
sender_id: _,
executions,
execs_over_sec,
phantom: _,
} => {
self.executions = *executions;
self.execs_over_sec = *execs_over_sec;
writeln!(
self.writer,
"[UPDATE] corpus: {} execs: {} execs/s: {}",
self.corpus_count, self.executions, self.execs_over_sec
)?;
Ok(BrokerEventResult::Handled)
}
Event::Crash {
sender_id: _,
input: _,
phantom: _,
} => {
panic!("LoggerEventManager cannot handle Event::Crash");
}
Event::Timeout {
sender_id: _,
input: _,
phantom: _,
} => {
panic!("LoggerEventManager cannot handle Event::Timeout");
}
Event::Log {
sender_id: _,
severity_level,
message,
phantom: _,
} => {
writeln!(self.writer, "[LOG {}]: {}", severity_level, message)?;
Ok(BrokerEventResult::Handled)
}
_ => Ok(BrokerEventResult::Handled),
}
fn client_stats_mut(&mut self) -> &mut Vec<ClientStats> {
&mut self.client_stats
}
fn client_stats(&self) -> &[ClientStats] {
&self.client_stats
}
fn corpus_size(&self) -> usize {
self.corpus_size
}
fn corpus_size_inc(&mut self) -> usize {
self.corpus_size += 1;
self.corpus_size
}
fn start_time(&mut self) -> time::Duration {
self.start_time
}
}
#[cfg(feature = "std")]
@ -408,31 +402,18 @@ where
{
pub fn new(writer: W) -> Self {
Self {
start_time: utils::current_time(),
client_stats: vec![],
writer: writer,
count: 0,
executions: 0,
execs_over_sec: 0,
corpus_count: 0,
corpus_size: 0,
phantom: PhantomData,
}
};
}
}
/// Eventmanager for multi-processed application
#[cfg(feature = "std")]
pub struct LlmpBrokerEventManager<C, E, OT, FT, I, R>
where
C: Corpus<I, R>,
I: Input,
E: Executor<I>,
OT: ObserversTuple,
FT: FeedbacksTuple<I>,
R: Rand,
//CE: CustomEvent<I, OT>,
{
llmp_broker: llmp::LlmpBroker,
phantom: PhantomData<(C, E, OT, FT, I, R)>,
}
#[cfg(feature = "std")]
/// Forward this to the client
@ -444,25 +425,29 @@ const _LLMP_TAG_EVENT_TO_BROKER: llmp::Tag = 0x2B80438;
/// Handle in both
const _LLMP_TAG_EVENT_TO_BOTH: llmp::Tag = 0x2B0741;
/// Eventmanager for multi-processed application
#[cfg(feature = "std")]
pub struct LlmpClientEventManager<C, E, OT, FT, I, R>
pub struct LlmpEventManager<C, E, I, R, W>
where
C: Corpus<I, R>,
I: Input,
E: Executor<I>,
OT: ObserversTuple,
FT: FeedbacksTuple<I>,
R: Rand,
//CE: CustomEvent<I, OT>,
W: Write,
//CE: CustomEvent<I>,
{
_llmp_client: llmp::LlmpClient,
phantom: PhantomData<(C, E, OT, FT, I, R)>,
writer: W,
count: usize,
// stats (maybe we need a separated struct?)
executions: usize,
execs_over_sec: u64,
corpus_size: usize,
start_time: time::Duration,
client_stats: Vec<ClientStats>,
llmp: llmp::LlmpConnection,
phantom: PhantomData<(C, E, I, R)>,
}
#[cfg(feature = "std")]
impl<C, E, OT, FT, I, R> EventManager<C, E, OT, FT, I, R>
for LlmpBrokerEventManager<C, E, OT, FT, I, R>
impl<C, E, I, R, W> EventManager<C, E, I, R> for LlmpEventManager<C, E, I, R, W>
where
C: Corpus<I, R>,
E: Executor<I>,
@ -470,131 +455,44 @@ where
FT: FeedbacksTuple<I>,
I: Input,
R: Rand,
W: Write,
//CE: CustomEvent<I>,
{
/// Fire an Event
fn fire<'a>(&mut self, event: Event<'a, I, OT>) -> Result<(), AflError> {
#[inline]
fn fire<'a>(&mut self, event: Event<'a, I>) -> Result<(), AflError> {
let serialized = postcard::to_allocvec(&event)?;
self.llmp_broker
self
.send_buf(LLMP_TAG_EVENT_TO_CLIENT, &serialized)?;
Ok(())
}
fn process(
&mut self,
_state: &mut State<I, R, FT>,
_corpus: &mut C,
) -> Result<usize, AflError> {
// TODO: iterators
/*
let mut handled = vec![];
for x in self.events.iter() {
handled.push(x.handle_in_broker(state, corpus)?);
}
handled
.iter()
.zip(self.events.iter())
.map(|(x, event)| match x {
BrokerEventResult::Forward => event.handle_in_client(state, corpus),
// Ignore broker-only events
BrokerEventResult::Handled => Ok(()),
})
.for_each(drop);
let count = self.events.len();
dbg!("Handled {} events", count);
self.events.clear();
let num = self.events.len();
for event in &self.events {}
self.events.clear();
*/
Ok(0)
fn process(&mut self, _state: &mut State<I, R>, _corpus: &mut C) -> Result<usize, AflError> {
let c = self.count;
self.count = 0;
Ok(c)
}
fn on_recv(&self, _state: &mut State<I, R, FT>, _corpus: &mut C) -> Result<(), AflError> {
// TODO: Better way to move out of testcase, or get ref
//Ok(corpus.add(self.testcase.take().unwrap()))
Ok(())
fn client_stats_mut(&mut self) -> &mut Vec<ClientStats> {
&mut self.client_stats
}
fn handle_in_broker(&mut self, event: &Event<I, OT>) -> Result<BrokerEventResult, AflError> {
match event {
Event::LoadInitial {
sender_id: _,
phantom: _,
} => Ok(BrokerEventResult::Handled),
Event::NewTestcase {
sender_id: _,
input: _,
observers: _,
corpus_count: _,
} => Ok(BrokerEventResult::Forward),
Event::UpdateStats {
sender_id: _,
executions: _,
execs_over_sec: _,
phantom: _,
} => {
// TODO
Ok(BrokerEventResult::Handled)
}
Event::Crash {
sender_id: _,
input: _,
phantom: _,
} => Ok(BrokerEventResult::Handled),
Event::Timeout {
sender_id: _,
input: _,
phantom: _,
} => {
// TODO
Ok(BrokerEventResult::Handled)
}
Event::Log {
sender_id,
severity_level,
message,
phantom: _,
} => {
//TODO: broker.log()
#[cfg(feature = "std")]
println!("{}[{}]: {}", sender_id, severity_level, message);
Ok(BrokerEventResult::Handled)
}
Event::None { phantom: _ } => Ok(BrokerEventResult::Handled),
Event::Custom {
sender_id: _, /*custom_event} => custom_event.handle_in_broker(state, corpus)*/
} => Ok(BrokerEventResult::Forward),
//_ => Ok(BrokerEventResult::Forward),
}
fn client_stats(&self) -> &[ClientStats] {
&self.client_stats
}
fn handle_in_client(
&mut self,
event: Event<I, OT>,
_state: &mut State<I, R, FT>,
_corpus: &mut C,
) -> Result<(), AflError> {
match event {
Event::NewTestcase {
sender_id: _,
input: _,
observers: _,
corpus_count: _,
} => {
// here u should match sender_id, if equal to the current one do not re-execute
// we need to pass engine to process() too, TODO
#[cfg(feature = "std")]
println!("PLACEHOLDER: received NewTestcase");
Ok(())
}
_ => Err(AflError::Unknown(
"Received illegal message that message should not have arrived.".into(),
)),
}
fn corpus_size(&self) -> usize {
self.corpus_size
}
fn corpus_size_inc(&mut self) -> usize {
self.corpus_size += 1;
self.corpus_size
}
fn start_time(&mut self) -> time::Duration {
self.start_time
}
}
#[cfg(feature = "std")]

View File

@ -80,7 +80,7 @@ where
observers: PtrMut::Ref(engine.executor_mut().observers_mut()),
corpus_count: corpus.count() + 1,
})?;
let _ = corpus.add(testcase);
// let _ = corpus.add(testcase);
}
}
Ok(())

View File

@ -1,7 +1,7 @@
//! Utility functions for AFL
use alloc::rc::Rc;
use core::cell::RefCell;
use core::{cell::RefCell, time};
use core::debug_assert;
use core::fmt::Debug;
use xxhash_rust::xxh3::xxh3_64_with_seed;
@ -75,7 +75,21 @@ where
const HASH_CONST: u64 = 0xa5b35705;
/// Current time
#[cfg(feature = "std")]
#[inline]
pub fn current_time() -> time::Duration {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
}
/// Current time (fixed fallback for no_std)
#[cfg(not(feature = "std"))]
#[inline]
fn current_time() -> time::Duration {
self.start_time()
}
#[cfg(feature = "std")]
#[inline]
/// Gets current nanoseconds since UNIX_EPOCH
pub fn current_nanos() -> u64 {
SystemTime::now()
@ -361,6 +375,7 @@ impl XKCDRand {
}
/// Get the next higher power of two
#[inline]
pub const fn next_pow2(val: u64) -> u64 {
let mut out = val.wrapping_sub(1);
out |= out >> 1;