From b2e1fa0de401df7486d0869b6c311a37e7f91edf Mon Sep 17 00:00:00 2001 From: Dominik Maier Date: Fri, 11 Dec 2020 18:06:54 +0100 Subject: [PATCH] llmp event mgr --- afl/src/engines/mod.rs | 15 ++ afl/src/events/llmp.rs | 33 +++ afl/src/events/mod.rs | 404 +++++++++++++---------------------- afl/src/stages/mutational.rs | 2 +- afl/src/utils.rs | 17 +- 5 files changed, 216 insertions(+), 255 deletions(-) diff --git a/afl/src/engines/mod.rs b/afl/src/engines/mod.rs index 26c8874345..b02cf298ac 100644 --- a/afl/src/engines/mod.rs +++ b/afl/src/engines/mod.rs @@ -113,6 +113,21 @@ where // TODO move some of these, like evaluate_input, to Engine + /// Adds a feedback + #[inline] + pub fn add_feedback(&mut self, feedback: Box>) { + self.feedbacks_mut().push(feedback); + } + + // TODO move some of these, like evaluate_input, to FuzzingEngine + pub fn is_interesting(&mut self, input: &I, observers: &crate::observers::observer_serde::NamedSerdeAnyMap) -> Result { + let mut fitness; + for feedback in self.feedbacks_mut() { + fitness += feedback.is_interesting(&input, observers)?; + } + Ok(fitness) + } + /// Runs the input and triggers observers and feedback pub fn evaluate_input(&mut self, input: &I, executor: &mut E) -> Result where diff --git a/afl/src/events/llmp.rs b/afl/src/events/llmp.rs index d3ff38cc88..ff4b5e8388 100644 --- a/afl/src/events/llmp.rs +++ b/afl/src/events/llmp.rs @@ -234,6 +234,39 @@ impl LlmpConnection { } } } + + /// Sends the given buffer over this connection, no matter if client or broker. + pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), AflError> { + match self { + LlmpConnection::IsBroker {broker, listener_thread: _} => { + broker.send_buf(tag, buf) + }, + LlmpConnection::IsClient {client} => { + client.send_buf(tag, buf) + } + } + } + + /// Returns the next message, tag, buf, if avaliable, else None + #[inline] + pub fn recv_buf(&mut self) -> Result, AflError> { + match self { + LlmpConnection::IsBroker {broker, listener_thread: _} => { + broker.recv_buf() + }, + LlmpConnection::IsClient {client} => { + client.recv_buf() + } + } + } + + /// A client blocks/spins until the next message gets posted to the page, + /// then returns that message. + #[inline] + pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, AflError> { + self.llmp_in.recv_blocking() + } + } /// Contents of the share mem pages, used by llmp internally diff --git a/afl/src/events/mod.rs b/afl/src/events/mod.rs index eef412b909..b1b40f3eb4 100644 --- a/afl/src/events/mod.rs +++ b/afl/src/events/mod.rs @@ -4,7 +4,7 @@ pub mod llmp; pub mod shmem_translated; use alloc::string::String; -use core::marker::PhantomData; +use core::{marker::PhantomData, time}; use serde::{Deserialize, Serialize}; @@ -12,10 +12,10 @@ use serde::{Deserialize, Serialize}; //pub mod shmem_translated; #[cfg(feature = "std")] -use std::io::Write; +use std::{io::Write, time::{SystemTime, UNIX_EPOCH}}; use crate::corpus::Corpus; -use crate::engines::State; +use crate::{engines::State, utils}; use crate::executors::Executor; use crate::feedbacks::FeedbacksTuple; use crate::inputs::Input; @@ -32,7 +32,13 @@ pub enum BrokerEventResult { Forward, } -pub trait ShowStats {} +pub struct ClientStats { + // stats (maybe we need a separated struct?) + id: usize, + executions: u64, + execs_over_sec: u64, + corpus_size: usize, +} /// A custom event, for own messages, with own handler. pub trait CustomEvent: SerdeAny + Serialize @@ -178,86 +184,121 @@ where /// Return the number of processes events or an error fn process(&mut self, state: &mut State, corpus: &mut C) -> Result; + /// the client stat, mutable + fn client_stats_mut(&mut self) -> &mut Vec; + /// the client stat + fn client_stats(&self) -> &[ClientStats]; + + /// Amount of elements in the corpus (combined for all children) + fn corpus_size(&self) -> usize; + + /// Incremt the cropus size + fn corpus_size_inc(&mut self) -> usize; + + /// Time this fuzzing run stated + fn start_time(&mut self) -> time::Duration; + + /// Total executions #[inline] - fn on_recv(&self, _state: &mut State, _corpus: &mut C) -> Result<(), AflError> { - // TODO: Better way to move out of testcase, or get ref - //Ok(corpus.add(self.testcase.take().unwrap())) - Ok(()) + fn total_execs(&mut self) -> u64 { + self.client_stats().iter().fold(0u64, |acc, x| acc + x.executions) } - // TODO the broker has a state? do we need to pass state and corpus? - fn handle_in_broker(&mut self, event: &Event) -> Result { + /// Executions per second + #[inline] + fn execs_per_sec(&mut self) -> u64 { + let time_since_start = (utils::current_time() - self.start_time()).as_secs(); + if time_since_start == 0 { 0 } else { + self.total_execs() / time_since_start + } + } + + /// Broker fun + fn handle_in_broker(&mut self, event: &Event) -> Result { match event { - Event::LoadInitial { - sender_id: _, - phantom: _, - } => Ok(BrokerEventResult::Handled), Event::NewTestcase { sender_id: _, input: _, observers: _, - corpus_count: _, - } => Ok(BrokerEventResult::Forward), + corpus_count, + } => { + self.corpus_size_inc(); + println!( + "[NEW] corpus: {} execs: {} execs/s: {}", + self.corpus_size(), self.total_execs(), self.execs_per_sec() + ); + Ok(BrokerEventResult::Forward) + } Event::UpdateStats { - sender_id: _, - executions: _, - execs_over_sec: _, + sender_id, + executions, + execs_over_sec, phantom: _, } => { - // TODO + // TODO: The stats buffer should be added on client add. + let client_stat_count = self.client_stats().len(); + for i in client_stat_count..*sender_id as usize { + self.client_stats_mut().push(ClientStats { + id: client_stat_count + i, + corpus_size: 0, + execs_over_sec: 0, + executions: 0 + }) + } + let mut stat = &mut self.client_stats_mut()[*sender_id as usize]; + println!( + "[UPDATE] corpus: {} execs: {} execs/s: {}", + self.corpus_size(), self.total_execs(), self.execs_per_sec() + ); Ok(BrokerEventResult::Handled) } Event::Crash { sender_id: _, input: _, phantom: _, - } => Ok(BrokerEventResult::Handled), + } => { + panic!("LoggerEventManager cannot handle Event::Crash"); + } Event::Timeout { sender_id: _, input: _, phantom: _, } => { - // TODO - Ok(BrokerEventResult::Handled) + panic!("LoggerEventManager cannot handle Event::Timeout"); } Event::Log { - sender_id, + sender_id: _, severity_level, message, phantom: _, } => { - //TODO: broker.log() - #[cfg(feature = "std")] - println!("{}[{}]: {}", sender_id, severity_level, message); - // Silence warnings for no_std - let (_, _, _) = (sender_id, severity_level, message); + println!("[LOG {}]: {}", severity_level, message); Ok(BrokerEventResult::Handled) } - Event::None { phantom: _ } => Ok(BrokerEventResult::Handled), - Event::Custom { - sender_id: _, /*custom_event} => custom_event.handle_in_broker(state, corpus)*/ - } => Ok(BrokerEventResult::Forward), - //_ => Ok(BrokerEventResult::Forward), + _ => Ok(BrokerEventResult::Forward), } } + /// Client fun fn handle_in_client( &mut self, - event: Event, - _state: &mut State, - _corpus: &mut C, + event: Event, + state: &mut State, + corpus: &mut C, ) -> Result<(), AflError> { match event { Event::NewTestcase { sender_id: _, - input: _, - observers: _, + input, + observers, corpus_count: _, } => { // here u should match sender_id, if equal to the current one do not re-execute // we need to pass engine to process() too, TODO #[cfg(feature = "std")] - println!("PLACEHOLDER: received NewTestcase"); + println!("Received new Testcase"); + let interestingness = state.is_interesting(input, observers)?; + state.add_if_interesting(corpus, input, interestingness); Ok(()) } _ => Err(AflError::Unknown( @@ -267,18 +308,6 @@ where } } -/*TODO - fn on_recv(&self, state: &mut State, _corpus: &mut C) -> Result<(), AflError> { - println!( - "#{}\t exec/s: {}", - state.executions(), - //TODO: Count corpus.entries().len(), - state.executions_over_seconds() - ); - Ok(()) - } -*/ - #[cfg(feature = "std")] pub struct LoggerEventManager where @@ -297,9 +326,10 @@ where // stats (maybe we need a separated struct?) executions: usize, execs_over_sec: u64, - corpus_count: usize, - - phantom: PhantomData<(C, E, OT, FT, I, R)>, + corpus_size: usize, + start_time: time::Duration, + client_stats: Vec, + phantom: PhantomData<(C, E, I, R)>, } #[cfg(feature = "std")] @@ -335,63 +365,27 @@ where Ok(c) } - fn handle_in_broker(&mut self, event: &Event) -> Result { - match event { - Event::NewTestcase { - sender_id: _, - input: _, - observers: _, - corpus_count, - } => { - self.corpus_count = *corpus_count; - writeln!( - self.writer, - "[NEW] corpus: {} execs: {} execs/s: {}", - self.corpus_count, self.executions, self.execs_over_sec - )?; - Ok(BrokerEventResult::Handled) - } - Event::UpdateStats { - sender_id: _, - executions, - execs_over_sec, - phantom: _, - } => { - self.executions = *executions; - self.execs_over_sec = *execs_over_sec; - writeln!( - self.writer, - "[UPDATE] corpus: {} execs: {} execs/s: {}", - self.corpus_count, self.executions, self.execs_over_sec - )?; - Ok(BrokerEventResult::Handled) - } - Event::Crash { - sender_id: _, - input: _, - phantom: _, - } => { - panic!("LoggerEventManager cannot handle Event::Crash"); - } - Event::Timeout { - sender_id: _, - input: _, - phantom: _, - } => { - panic!("LoggerEventManager cannot handle Event::Timeout"); - } - Event::Log { - sender_id: _, - severity_level, - message, - phantom: _, - } => { - writeln!(self.writer, "[LOG {}]: {}", severity_level, message)?; - Ok(BrokerEventResult::Handled) - } - _ => Ok(BrokerEventResult::Handled), - } + fn client_stats_mut(&mut self) -> &mut Vec { + &mut self.client_stats } + + fn client_stats(&self) -> &[ClientStats] { + &self.client_stats + } + + fn corpus_size(&self) -> usize { + self.corpus_size + } + + fn corpus_size_inc(&mut self) -> usize { + self.corpus_size += 1; + self.corpus_size + } + + fn start_time(&mut self) -> time::Duration { + self.start_time + } + } #[cfg(feature = "std")] @@ -408,31 +402,18 @@ where { pub fn new(writer: W) -> Self { Self { + start_time: utils::current_time(), + client_stats: vec![], writer: writer, count: 0, executions: 0, execs_over_sec: 0, - corpus_count: 0, + corpus_size: 0, phantom: PhantomData, - } + }; } } -/// Eventmanager for multi-processed application -#[cfg(feature = "std")] -pub struct LlmpBrokerEventManager -where - C: Corpus, - I: Input, - E: Executor, - OT: ObserversTuple, - FT: FeedbacksTuple, - R: Rand, - //CE: CustomEvent, -{ - llmp_broker: llmp::LlmpBroker, - phantom: PhantomData<(C, E, OT, FT, I, R)>, -} #[cfg(feature = "std")] /// Forward this to the client @@ -444,25 +425,29 @@ const _LLMP_TAG_EVENT_TO_BROKER: llmp::Tag = 0x2B80438; /// Handle in both const _LLMP_TAG_EVENT_TO_BOTH: llmp::Tag = 0x2B0741; -/// Eventmanager for multi-processed application + + #[cfg(feature = "std")] -pub struct LlmpClientEventManager +pub struct LlmpEventManager where - C: Corpus, - I: Input, - E: Executor, - OT: ObserversTuple, - FT: FeedbacksTuple, - R: Rand, - //CE: CustomEvent, + W: Write, + //CE: CustomEvent, { - _llmp_client: llmp::LlmpClient, - phantom: PhantomData<(C, E, OT, FT, I, R)>, + writer: W, + count: usize, + + // stats (maybe we need a separated struct?) + executions: usize, + execs_over_sec: u64, + corpus_size: usize, + start_time: time::Duration, + client_stats: Vec, + llmp: llmp::LlmpConnection, + phantom: PhantomData<(C, E, I, R)>, } #[cfg(feature = "std")] -impl EventManager - for LlmpBrokerEventManager +impl EventManager for LlmpEventManager where C: Corpus, E: Executor, @@ -470,131 +455,44 @@ where FT: FeedbacksTuple, I: Input, R: Rand, + W: Write, + //CE: CustomEvent, { - /// Fire an Event - fn fire<'a>(&mut self, event: Event<'a, I, OT>) -> Result<(), AflError> { + #[inline] + fn fire<'a>(&mut self, event: Event<'a, I>) -> Result<(), AflError> { let serialized = postcard::to_allocvec(&event)?; - self.llmp_broker + self .send_buf(LLMP_TAG_EVENT_TO_CLIENT, &serialized)?; Ok(()) } - fn process( - &mut self, - _state: &mut State, - _corpus: &mut C, - ) -> Result { - // TODO: iterators - /* - let mut handled = vec![]; - for x in self.events.iter() { - handled.push(x.handle_in_broker(state, corpus)?); - } - handled - .iter() - .zip(self.events.iter()) - .map(|(x, event)| match x { - BrokerEventResult::Forward => event.handle_in_client(state, corpus), - // Ignore broker-only events - BrokerEventResult::Handled => Ok(()), - }) - .for_each(drop); - let count = self.events.len(); - dbg!("Handled {} events", count); - self.events.clear(); - - let num = self.events.len(); - for event in &self.events {} - - self.events.clear(); - */ - - Ok(0) + fn process(&mut self, _state: &mut State, _corpus: &mut C) -> Result { + let c = self.count; + self.count = 0; + Ok(c) } - fn on_recv(&self, _state: &mut State, _corpus: &mut C) -> Result<(), AflError> { - // TODO: Better way to move out of testcase, or get ref - //Ok(corpus.add(self.testcase.take().unwrap())) - Ok(()) + fn client_stats_mut(&mut self) -> &mut Vec { + &mut self.client_stats } - fn handle_in_broker(&mut self, event: &Event) -> Result { - match event { - Event::LoadInitial { - sender_id: _, - phantom: _, - } => Ok(BrokerEventResult::Handled), - Event::NewTestcase { - sender_id: _, - input: _, - observers: _, - corpus_count: _, - } => Ok(BrokerEventResult::Forward), - Event::UpdateStats { - sender_id: _, - executions: _, - execs_over_sec: _, - phantom: _, - } => { - // TODO - Ok(BrokerEventResult::Handled) - } - Event::Crash { - sender_id: _, - input: _, - phantom: _, - } => Ok(BrokerEventResult::Handled), - Event::Timeout { - sender_id: _, - input: _, - phantom: _, - } => { - // TODO - Ok(BrokerEventResult::Handled) - } - Event::Log { - sender_id, - severity_level, - message, - phantom: _, - } => { - //TODO: broker.log() - #[cfg(feature = "std")] - println!("{}[{}]: {}", sender_id, severity_level, message); - Ok(BrokerEventResult::Handled) - } - Event::None { phantom: _ } => Ok(BrokerEventResult::Handled), - Event::Custom { - sender_id: _, /*custom_event} => custom_event.handle_in_broker(state, corpus)*/ - } => Ok(BrokerEventResult::Forward), - //_ => Ok(BrokerEventResult::Forward), - } + fn client_stats(&self) -> &[ClientStats] { + &self.client_stats } - fn handle_in_client( - &mut self, - event: Event, - _state: &mut State, - _corpus: &mut C, - ) -> Result<(), AflError> { - match event { - Event::NewTestcase { - sender_id: _, - input: _, - observers: _, - corpus_count: _, - } => { - // here u should match sender_id, if equal to the current one do not re-execute - // we need to pass engine to process() too, TODO - #[cfg(feature = "std")] - println!("PLACEHOLDER: received NewTestcase"); - Ok(()) - } - _ => Err(AflError::Unknown( - "Received illegal message that message should not have arrived.".into(), - )), - } + fn corpus_size(&self) -> usize { + self.corpus_size } + + fn corpus_size_inc(&mut self) -> usize { + self.corpus_size += 1; + self.corpus_size + } + + fn start_time(&mut self) -> time::Duration { + self.start_time + } + } #[cfg(feature = "std")] diff --git a/afl/src/stages/mutational.rs b/afl/src/stages/mutational.rs index 5fc0e1ee3c..54227a3449 100644 --- a/afl/src/stages/mutational.rs +++ b/afl/src/stages/mutational.rs @@ -80,7 +80,7 @@ where observers: PtrMut::Ref(engine.executor_mut().observers_mut()), corpus_count: corpus.count() + 1, })?; - let _ = corpus.add(testcase); + // let _ = corpus.add(testcase); } } Ok(()) diff --git a/afl/src/utils.rs b/afl/src/utils.rs index 71618de124..72aad80edc 100644 --- a/afl/src/utils.rs +++ b/afl/src/utils.rs @@ -1,7 +1,7 @@ //! Utility functions for AFL use alloc::rc::Rc; -use core::cell::RefCell; +use core::{cell::RefCell, time}; use core::debug_assert; use core::fmt::Debug; use xxhash_rust::xxh3::xxh3_64_with_seed; @@ -75,7 +75,21 @@ where const HASH_CONST: u64 = 0xa5b35705; +/// Current time #[cfg(feature = "std")] +#[inline] +pub fn current_time() -> time::Duration { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap() +} +/// Current time (fixed fallback for no_std) +#[cfg(not(feature = "std"))] +#[inline] +fn current_time() -> time::Duration { + self.start_time() +} + +#[cfg(feature = "std")] +#[inline] /// Gets current nanoseconds since UNIX_EPOCH pub fn current_nanos() -> u64 { SystemTime::now() @@ -361,6 +375,7 @@ impl XKCDRand { } /// Get the next higher power of two +#[inline] pub const fn next_pow2(val: u64) -> u64 { let mut out = val.wrapping_sub(1); out |= out >> 1;