diff --git a/afl/src/events/mod.rs b/afl/src/events/mod.rs index b6208e0f3f..e97b75404a 100644 --- a/afl/src/events/mod.rs +++ b/afl/src/events/mod.rs @@ -1,13 +1,15 @@ //! Eventmanager manages all events that go to other instances of the fuzzer. +pub mod stats; +pub use stats::*; + +use crate::llmp::LlmpReceiver; +use crate::llmp::LlmpSender; use alloc::{ string::{String, ToString}, vec::Vec, }; -use core::{ - time::Duration, - {marker::PhantomData, time}, -}; +use core::{fmt, marker::PhantomData, time::Duration}; use serde::{Deserialize, Serialize}; #[cfg(feature = "std")] @@ -19,15 +21,46 @@ use crate::{ inputs::Input, llmp::{self, LlmpClient, LlmpClientDescription, Tag}, observers::ObserversTuple, - serde_anymap::Ptr, shmem::ShMem, state::State, - utils::{current_time, Rand}, + utils::Rand, AflError, }; -#[derive(Debug, Copy, Clone)] +/// The log event severity +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +pub enum LogSeverity { + /// Debug severity + Debug, + /// Information + Info, + /// Warning + Warn, + /// Error + Error, +} + +impl fmt::Display for LogSeverity { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + LogSeverity::Debug => { + write!(f, "Debug") + } + LogSeverity::Info => { + write!(f, "Info") + } + LogSeverity::Warn => { + write!(f, "Warn") + } + LogSeverity::Error => { + write!(f, "Error") + } + } + } +} + /// Indicate if an event worked or not +#[derive(Serialize, Deserialize, Debug, Copy, Clone)] pub enum BrokerEventResult { /// The broker haneled this. No need to pass it on. Handled, @@ -35,158 +68,6 @@ pub enum BrokerEventResult { Forward, } -const CLIENT_STATS_TIME_WINDOW_SECS: u64 = 5; // 5 seconds - -#[derive(Debug, Clone, Default)] -pub struct ClientStats { - // stats (maybe we need a separated struct?) - corpus_size: u64, - executions: u64, - last_window_executions: u64, - last_window_time: time::Duration, - last_execs_per_sec: u64, -} - -impl ClientStats { - pub fn update_executions(&mut self, executions: u64, cur_time: time::Duration) { - self.executions = executions; - if (cur_time - self.last_window_time).as_secs() > CLIENT_STATS_TIME_WINDOW_SECS { - self.last_execs_per_sec = self.execs_per_sec(cur_time); - self.last_window_time = cur_time; - self.last_window_executions = executions; - } - } - - pub fn execs_per_sec(&self, cur_time: time::Duration) -> u64 { - if self.executions == 0 { - return 0; - } - let secs = (cur_time - self.last_window_time).as_secs(); - if secs == 0 { - self.last_execs_per_sec - } else { - let diff = self.executions - self.last_window_executions; - diff / secs - } - } -} - -pub trait Stats { - /// the client stats (mut) - fn client_stats_mut(&mut self) -> &mut Vec; - - /// the client stats - fn client_stats(&self) -> &[ClientStats]; - - /// creation time - fn start_time(&mut self) -> time::Duration; - - /// show the stats to the user - fn show(&mut self, event_msg: String); - - /// Amount of elements in the corpus (combined for all children) - fn corpus_size(&self) -> u64 { - self.client_stats() - .iter() - .fold(0u64, |acc, x| acc + x.corpus_size) - } - - /// Total executions - #[inline] - fn total_execs(&mut self) -> u64 { - self.client_stats() - .iter() - .fold(0u64, |acc, x| acc + x.executions) - } - - /// Executions per second - #[inline] - fn execs_per_sec(&mut self) -> u64 { - let cur_time = current_time(); - self.client_stats() - .iter() - .fold(0u64, |acc, x| acc + x.execs_per_sec(cur_time)) - } - - /// The client stats for a specific id, creating new if it doesn't exist - fn client_stats_mut_for(&mut self, client_id: u32) -> &mut ClientStats { - let client_stat_count = self.client_stats().len(); - for _ in client_stat_count..(client_id + 1) as usize { - self.client_stats_mut().push(ClientStats { - last_window_time: current_time(), - ..Default::default() - }) - } - &mut self.client_stats_mut()[client_id as usize] - } -} - -#[derive(Clone, Debug)] -pub struct SimpleStats -where - F: FnMut(String), -{ - print_fn: F, - start_time: Duration, - corpus_size: usize, - client_stats: Vec, -} - -impl Stats for SimpleStats -where - F: FnMut(String), -{ - /// the client stats, mutable - fn client_stats_mut(&mut self) -> &mut Vec { - &mut self.client_stats - } - - /// the client stats - fn client_stats(&self) -> &[ClientStats] { - &self.client_stats - } - - /// Time this fuzzing run stated - fn start_time(&mut self) -> time::Duration { - self.start_time - } - - fn show(&mut self, event_msg: String) { - let fmt = format!( - "[{}] clients: {}, corpus: {}, executions: {}, exec/sec: {}", - event_msg, - self.client_stats().len(), - self.corpus_size(), - self.total_execs(), - self.execs_per_sec() - ); - (self.print_fn)(fmt); - } -} - -impl SimpleStats -where - F: FnMut(String), -{ - pub fn new(print_fn: F) -> Self { - Self { - print_fn: print_fn, - start_time: current_time(), - corpus_size: 0, - client_stats: vec![], - } - } - - pub fn with_time(print_fn: F, start_time: time::Duration) -> Self { - Self { - print_fn: print_fn, - start_time: start_time, - corpus_size: 0, - client_stats: vec![], - } - } -} - /* /// A custom event, for own messages, with own handler. pub trait CustomEvent: SerdeAny @@ -203,24 +84,91 @@ where */ /// Events sent around in the library -pub trait Event +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(bound = "I: serde::de::DeserializeOwned")] +pub enum Event where I: Input, { - /// Returns the name of this event - fn name(&self) -> &str; - /// This method will be called in the broker - fn handle_in_broker(&self, stats: &mut ST) -> Result - where - ST: Stats; - /// This method will be called in the clients after handle_in_broker (unless BrokerEventResult::Handled) was returned in handle_in_broker - fn handle_in_client(self, state: &mut State) -> Result<(), AflError> - where - C: Corpus, - FT: FeedbacksTuple, - R: Rand; + // TODO use an ID to keep track of the original index in the sender Corpus + // The sender can then use it to send Testcase metadatas with CustomEvent + /// A fuzzer found a new testcase. Rejoice! + NewTestcase { + /// The input for the new testcase + input: I, + /// The state of the observers when this testcase was found + observers_buf: Vec, + /// The new corpus size of this client + corpus_size: usize, + /// The client config for this observers/testcase combination + client_config: String, + }, + /// New stats. + UpdateStats { + /// The executions of this client + executions: usize, + /// The execs per sec for this client + execs_over_sec: u64, + phantom: PhantomData, + }, + /// A crash was found + Crash { + /// Crashing input + input: I, + }, + /// A timeout was found + Timeout { + /// Timeouting input + input: I, + }, + /// Write a new log + Log { + /// the severity level + severity_level: LogSeverity, + /// The message + message: String, + phantom: PhantomData, + }, + /*/// A custom type + Custom { + // TODO: Allow custom events + // custom_event: Box>, + },*/ } +impl Event +where + I: Input, +{ + fn name(&self) -> &str { + match self { + Event::NewTestcase { + input: _, + client_config: _, + corpus_size: _, + observers_buf: _, + } => "New Testcase", + Event::UpdateStats { + executions: _, + execs_over_sec: _, + phantom: _, + } => "Stats", + Event::Crash { input: _ } => "Crash", + Event::Timeout { input: _ } => "Timeout", + Event::Log { + severity_level: _, + message: _, + phantom: _, + } => "Log", + /*Event::Custom { + sender_id: _, /*custom_event} => custom_event.name()*/ + } => "todo",*/ + } + } +} + +/// EventManager is the main communications hub. +/// For the "normal" multi-processed mode, you may want to look into `RestartingEventManager` pub trait EventManager where I: Input, @@ -236,6 +184,7 @@ where FT: FeedbacksTuple, R: Rand; + /// Serialize all observers for this type and manager fn serialize_observers(&mut self, observers: &OT) -> Result, AflError> where OT: ObserversTuple, @@ -243,6 +192,7 @@ where Ok(postcard::to_allocvec(observers)?) } + /// Deserialize all observers for this type and manager fn deserialize_observers(&mut self, observers_buf: &[u8]) -> Result where OT: ObserversTuple, @@ -250,36 +200,8 @@ where Ok(postcard::from_bytes(observers_buf)?) } - fn new_testcase( - &mut self, - _input: &I, - _observers: &OT, - _corpus_size: usize, - _config: String, - ) -> Result<(), AflError> - where - OT: ObserversTuple, - { - Ok(()) - } - - fn update_stats(&mut self, _executions: usize, _execs_over_sec: u64) -> Result<(), AflError> { - Ok(()) - } - - fn crash(&mut self, _input: &I) -> Result<(), AflError> { - Ok(()) - } - - fn timeout(&mut self, _input: &I) -> Result<(), AflError> { - Ok(()) - } - - fn log(&mut self, _severity_level: u8, _message: String) -> Result<(), AflError> { - Ok(()) - } - - // TODO Custom event fire (dyn CustomEvent or similar) + /// Send off an event to the broker + fn fire(&mut self, event: Event) -> Result<(), AflError>; } /// An eventmgr for tests, and as placeholder if you really don't need an event manager. @@ -299,100 +221,99 @@ where { Ok(0) } + + fn fire(&mut self, _event: Event) -> Result<(), AflError> { + Ok(()) + } } -/// Events that may happen +/// A simple, single-threaded event manager that just logs #[derive(Clone, Debug)] -pub enum LoggerEvent +pub struct LoggerEventManager where I: Input, + ST: Stats, //CE: CustomEvent, { - NewTestcase { - corpus_size: usize, - phantom: PhantomData, - }, - UpdateStats { - executions: usize, - execs_over_sec: u64, - phantom: PhantomData, - }, - Crash { - input: I, - }, - Timeout { - input: I, - }, - Log { - severity_level: u8, - message: String, - phantom: PhantomData, - }, - /*Custom { - // TODO: Allow custom events - // custom_event: Box>, - },*/ + /// The stats + stats: ST, + /// The events that happened since the last handle_in_broker + events: Vec>, } -impl Event for LoggerEvent +impl EventManager for LoggerEventManager where I: Input, + ST: Stats, //CE: CustomEvent, { - #[inline] - fn name(&self) -> &str { - match self { - LoggerEvent::NewTestcase { - corpus_size: _, - phantom: _, - } => "New Testcase", - LoggerEvent::UpdateStats { - executions: _, - execs_over_sec: _, - phantom: _, - } => "Stats", - LoggerEvent::Crash { input: _ } => "Crash", - LoggerEvent::Timeout { input: _ } => "Timeout", - LoggerEvent::Log { - severity_level: _, - message: _, - phantom: _, - } => "Log", - /*Event::Custom => custom_event.name() - } => "todo",*/ + fn process(&mut self, state: &mut State) -> Result + where + C: Corpus, + FT: FeedbacksTuple, + R: Rand, + { + let count = self.events.len(); + while self.events.len() > 0 { + let event = self.events.pop().unwrap(); + self.handle_in_client(state, 0, event)?; + } + Ok(count) + } + + fn fire(&mut self, event: Event) -> Result<(), AflError> { + match Self::handle_in_broker(&mut self.stats, 0, &event)? { + BrokerEventResult::Forward => self.events.push(event), + BrokerEventResult::Handled => (), + }; + Ok(()) + } +} + +impl LoggerEventManager +where + I: Input, + ST: Stats, //TODO CE: CustomEvent, +{ + pub fn new(stats: ST) -> Self { + Self { + stats: stats, + events: vec![], } } - /// Broker fun - #[inline] - fn handle_in_broker(&self, stats: &mut ST) -> Result - where - ST: Stats, - { - match self { - LoggerEvent::NewTestcase { + // Handle arriving events in the broker + fn handle_in_broker( + stats: &mut ST, + _sender_id: u32, + event: &Event, + ) -> Result { + match event { + Event::NewTestcase { + input: _, + client_config: _, corpus_size, - phantom: _, + observers_buf: _, } => { stats.client_stats_mut()[0].corpus_size = *corpus_size as u64; - stats.show(self.name().to_string()); + stats.show(event.name().to_string()); Ok(BrokerEventResult::Handled) } - LoggerEvent::UpdateStats { + Event::UpdateStats { executions, execs_over_sec: _, phantom: _, } => { // TODO: The stats buffer should be added on client add. stats.client_stats_mut()[0].executions = *executions as u64; - stats.show(self.name().to_string()); + stats.show(event.name().to_string()); Ok(BrokerEventResult::Handled) } - LoggerEvent::Crash { input: _ } => { + Event::Crash { input: _ } => { panic!("LoggerEventManager cannot handle Event::Crash"); } - LoggerEvent::Timeout { input: _ } => { + Event::Timeout { input: _ } => { panic!("LoggerEventManager cannot handle Event::Timeout"); } - LoggerEvent::Log { + Event::Log { severity_level, message, phantom: _, @@ -405,294 +326,22 @@ where } } - #[inline] - fn handle_in_client(self, _state: &mut State) -> Result<(), AflError> - where - C: Corpus, - FT: FeedbacksTuple, - R: Rand, - { - match self { - _ => Err(AflError::Unknown(format!( - "Received illegal message that message should not have arrived: {:?}.", - self - ))), - } - } -} - -#[derive(Clone, Debug)] -pub struct LoggerEventManager -where - I: Input, - //CE: CustomEvent, -{ - stats: ST, - events: Vec>, -} - -impl EventManager for LoggerEventManager -where - I: Input, - ST: Stats, - //CE: CustomEvent, -{ - fn process(&mut self, state: &mut State) -> Result - where - C: Corpus, - FT: FeedbacksTuple, - R: Rand, - { - let count = self.events.len(); - self.events - .drain(..) - .try_for_each(|event| event.handle_in_client(state))?; - Ok(count) - } - - fn new_testcase( + // Handle arriving events in the client + fn handle_in_client( &mut self, - _input: &I, - _observers: &OT, - corpus_size: usize, - _config: String, + _state: &mut State, + _sender_id: u32, + event: Event, ) -> Result<(), AflError> - where - OT: ObserversTuple, - { - let event = LoggerEvent::NewTestcase { - corpus_size: corpus_size, - phantom: PhantomData, - }; - match event.handle_in_broker(&mut self.stats)? { - BrokerEventResult::Forward => self.events.push(event), - _ => (), - }; - Ok(()) - } - - fn update_stats(&mut self, executions: usize, execs_over_sec: u64) -> Result<(), AflError> { - let event = LoggerEvent::UpdateStats { - executions: executions, - execs_over_sec: execs_over_sec, - phantom: PhantomData, - }; - match event.handle_in_broker(&mut self.stats)? { - BrokerEventResult::Forward => self.events.push(event), - _ => (), - }; - Ok(()) - } - - fn crash(&mut self, input: &I) -> Result<(), AflError> { - let event = LoggerEvent::Crash { - input: input.clone(), - }; - match event.handle_in_broker(&mut self.stats)? { - BrokerEventResult::Forward => self.events.push(event), - _ => (), - }; - Ok(()) - } - - fn timeout(&mut self, input: &I) -> Result<(), AflError> { - let event = LoggerEvent::Timeout { - input: input.clone(), - }; - match event.handle_in_broker(&mut self.stats)? { - BrokerEventResult::Forward => self.events.push(event), - _ => (), - }; - Ok(()) - } - - fn log(&mut self, severity_level: u8, message: String) -> Result<(), AflError> { - let event = LoggerEvent::Log { - severity_level: severity_level, - message: message, - phantom: PhantomData, - }; - match event.handle_in_broker(&mut self.stats)? { - BrokerEventResult::Forward => self.events.push(event), - _ => (), - }; - Ok(()) - } -} - -impl LoggerEventManager -where - I: Input, - ST: Stats, - //TODO CE: CustomEvent, -{ - pub fn new(stats: ST) -> Self { - Self { - stats: stats, - events: vec![], - } - } -} - -#[derive(Serialize, Deserialize, Clone, Debug)] -#[serde(bound = "I: serde::de::DeserializeOwned")] -pub enum LLMPEventKind<'a, I> -where - I: Input, -{ - // TODO use an ID to keep track of the original index in the sender Corpus - // The sender can then use it to send Testcase metadatas with CustomEvent - NewTestcase { - input: Ptr<'a, I>, - observers_buf: Vec, - corpus_size: usize, - client_config: String, - }, - UpdateStats { - executions: usize, - execs_over_sec: u64, - phantom: PhantomData<&'a I>, - }, - Crash { - input: I, - }, - Timeout { - input: I, - }, - Log { - severity_level: u8, - message: String, - phantom: PhantomData, - }, - /*Custom { - // TODO: Allow custom events - // custom_event: Box>, - },*/ -} - -#[derive(Serialize, Deserialize, Clone, Debug)] -#[serde(bound = "I: serde::de::DeserializeOwned")] -pub struct LLMPEvent<'a, I> -where - I: Input, -{ - sender_id: u32, - kind: LLMPEventKind<'a, I>, -} - -impl<'a, I> Event for LLMPEvent<'a, I> -where - I: Input, -{ - fn name(&self) -> &str { - match self.kind { - LLMPEventKind::NewTestcase { - input: _, - client_config: _, - corpus_size: _, - observers_buf: _, - } => "New Testcase", - LLMPEventKind::UpdateStats { - executions: _, - execs_over_sec: _, - phantom: _, - } => "Stats", - LLMPEventKind::Crash { input: _ } => "Crash", - LLMPEventKind::Timeout { input: _ } => "Timeout", - LLMPEventKind::Log { - severity_level: _, - message: _, - phantom: _, - } => "Log", - /*Event::Custom { - sender_id: _, /*custom_event} => custom_event.name()*/ - } => "todo",*/ - } - } - - /// Broker fun - #[inline] - fn handle_in_broker(&self, stats: &mut ST) -> Result - where - ST: Stats, - { - match &self.kind { - LLMPEventKind::NewTestcase { - input: _, - client_config: _, - corpus_size, - observers_buf: _, - } => { - let client = stats.client_stats_mut_for(self.sender_id); - client.corpus_size = *corpus_size as u64; - stats.show(self.name().to_string() + " #" + &self.sender_id.to_string()); - Ok(BrokerEventResult::Handled) - } - LLMPEventKind::UpdateStats { - executions, - execs_over_sec: _, - phantom: _, - } => { - // TODO: The stats buffer should be added on client add. - let client = stats.client_stats_mut_for(self.sender_id); - client.executions = *executions as u64; - stats.show(self.name().to_string() + " #" + &self.sender_id.to_string()); - Ok(BrokerEventResult::Handled) - } - LLMPEventKind::Crash { input: _ } => { - #[cfg(feature = "std")] - println!("LLMPEvent::Crash"); - Ok(BrokerEventResult::Handled) - } - LLMPEventKind::Timeout { input: _ } => { - #[cfg(feature = "std")] - println!("LLMPEvent::Timeout"); - Ok(BrokerEventResult::Handled) - } - LLMPEventKind::Log { - severity_level, - message, - phantom: _, - } => { - let (_, _) = (severity_level, message); - #[cfg(feature = "std")] - println!("[LOG {}]: {}", severity_level, message); - Ok(BrokerEventResult::Handled) - } //_ => Ok(BrokerEventResult::Forward), - } - } - - #[inline] - fn handle_in_client(self, state: &mut State) -> Result<(), AflError> where C: Corpus, FT: FeedbacksTuple, R: Rand, { - match self.kind { - LLMPEventKind::NewTestcase { - input, - client_config: _, - corpus_size: _, - observers_buf, - } => { - // TODO: here u should match client_config, if equal to the current one do not re-execute - // we need to pass engine to process() too, TODO - #[cfg(feature = "std")] - println!("Received new Testcase"); - let observers = postcard::from_bytes(&observers_buf)?; - let interestingness = state.is_interesting(input.as_ref(), &observers)?; - match input { - Ptr::Owned(b) => { - state.add_if_interesting(*b, interestingness)?; - } - _ => {} - }; - Ok(()) - } + match event { _ => Err(AflError::Unknown(format!( "Received illegal message that message should not have arrived: {:?}.", - self.name() + event ))), } } @@ -703,6 +352,7 @@ const _LLMP_TAG_EVENT_TO_CLIENT: llmp::Tag = 0x2C11E471; /// Only handle this in the broker const _LLMP_TAG_EVENT_TO_BROKER: llmp::Tag = 0x2B80438; /// Handle in both +/// const LLMP_TAG_EVENT_TO_BOTH: llmp::Tag = 0x2B0741; #[derive(Clone, Debug)] @@ -713,8 +363,8 @@ where ST: Stats, //CE: CustomEvent, { + stats: Option, llmp: llmp::LlmpConnection, - stats: ST, phantom: PhantomData, } @@ -729,10 +379,10 @@ where /// If the port is not yet bound, it will act as broker /// Else, it will act as client. #[cfg(feature = "std")] - pub fn new_on_port_std(port: u16, stats: ST) -> Result { + pub fn new_on_port_std(stats: ST, port: u16) -> Result { Ok(Self { + stats: Some(stats), llmp: llmp::LlmpConnection::on_port(port)?, - stats: stats, phantom: PhantomData, }) } @@ -740,12 +390,12 @@ where /// If a client respawns, it may reuse the existing connection, previously stored by LlmpClient::to_env /// Std uses AflShmem. #[cfg(feature = "std")] - pub fn existing_client_from_env_std(env_name: &str, stats: ST) -> Result { - Self::existing_client_from_env(env_name, stats) + pub fn existing_client_from_env_std(env_name: &str) -> Result { + Self::existing_client_from_env(env_name) } } -impl LlmpEventManager +impl LlmpEventManager where I: Input, SH: ShMem, @@ -755,24 +405,24 @@ where /// If the port is not yet bound, it will act as broker /// Else, it will act as client. #[cfg(feature = "std")] - pub fn new_on_port(port: u16, stats: ST) -> Result { + pub fn new_on_port(stats: ST, port: u16) -> Result { Ok(Self { + stats: Some(stats), llmp: llmp::LlmpConnection::on_port(port)?, - stats: stats, phantom: PhantomData, }) } /// If a client respawns, it may reuse the existing connection, previously stored by LlmpClient::to_env #[cfg(feature = "std")] - pub fn existing_client_from_env(env_name: &str, stats: ST) -> Result { + pub fn existing_client_from_env(env_name: &str) -> Result { Ok(Self { + stats: None, llmp: llmp::LlmpConnection::IsClient { client: LlmpClient::on_existing_from_env(env_name)?, }, // Inserting a nop-stats element here so rust won't complain. // In any case, the client won't currently use it. - stats: stats, phantom: PhantomData, }) } @@ -785,22 +435,21 @@ where /// Create an existing client from description pub fn existing_client_from_description( description: &LlmpClientDescription, - stats: ST, ) -> Result { Ok(Self { + stats: None, llmp: llmp::LlmpConnection::existing_client_from_description(description)?, // Inserting a nop-stats element here so rust won't complain. // In any case, the client won't currently use it. - stats: stats, phantom: PhantomData, }) } /// A client on an existing map - pub fn for_client(client: LlmpClient, stats: ST) -> Self { + pub fn for_client(client: LlmpClient) -> Self { Self { + stats: None, llmp: llmp::LlmpConnection::IsClient { client }, - stats, phantom: PhantomData, } } @@ -828,16 +477,12 @@ where pub fn broker_loop(&mut self) -> Result<(), AflError> { match &mut self.llmp { llmp::LlmpConnection::IsBroker { broker } => { - let stats = &mut self.stats; + let stats = self.stats.as_mut().unwrap(); broker.loop_forever( &mut |sender_id: u32, tag: Tag, msg: &[u8]| { if tag == LLMP_TAG_EVENT_TO_BOTH { - let kind: LLMPEventKind = postcard::from_bytes(msg)?; - let event = LLMPEvent { - sender_id: sender_id, - kind: kind, - }; - match event.handle_in_broker(stats)? { + let event: Event = postcard::from_bytes(msg)?; + match Self::handle_in_broker(stats, sender_id, &event)? { BrokerEventResult::Forward => { Ok(llmp::LlmpMsgHookResult::ForwardToClients) } @@ -858,19 +503,105 @@ where /// Send an event kind via llmp #[inline] - fn send_event_kind<'a>(&mut self, event: LLMPEventKind<'a, I>) -> Result<(), AflError> { + fn send_event_kind(&mut self, event: Event) -> Result<(), AflError> { let serialized = postcard::to_allocvec(&event)?; self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?; Ok(()) } + + /// Handle arriving events in the broker + fn handle_in_broker( + stats: &mut ST, + sender_id: u32, + event: &Event, + ) -> Result { + match &event { + Event::NewTestcase { + input: _, + client_config: _, + corpus_size, + observers_buf: _, + } => { + let client = stats.client_stats_mut_for(sender_id); + client.corpus_size = *corpus_size as u64; + stats.show(event.name().to_string() + " #" + &sender_id.to_string()); + Ok(BrokerEventResult::Handled) + } + Event::UpdateStats { + executions, + execs_over_sec: _, + phantom: _, + } => { + // TODO: The stats buffer should be added on client add. + let client = stats.client_stats_mut_for(sender_id); + client.executions = *executions as u64; + stats.show(event.name().to_string() + " #" + &sender_id.to_string()); + Ok(BrokerEventResult::Handled) + } + Event::Crash { input: _ } => { + #[cfg(feature = "std")] + println!("Event::Crash"); + Ok(BrokerEventResult::Handled) + } + Event::Timeout { input: _ } => { + #[cfg(feature = "std")] + println!("Event::Timeout"); + Ok(BrokerEventResult::Handled) + } + Event::Log { + severity_level, + message, + phantom: _, + } => { + let (_, _) = (severity_level, message); + #[cfg(feature = "std")] + println!("[LOG {}]: {}", severity_level, message); + Ok(BrokerEventResult::Handled) + } //_ => Ok(BrokerEventResult::Forward), + } + } + + // Handle arriving events in the client + fn handle_in_client( + &mut self, + state: &mut State, + _sender_id: u32, + event: Event, + ) -> Result<(), AflError> + where + C: Corpus, + FT: FeedbacksTuple, + R: Rand, + { + match event { + Event::NewTestcase { + input, + client_config: _, + corpus_size: _, + observers_buf, + } => { + // TODO: here u should match client_config, if equal to the current one do not re-execute + // we need to pass engine to process() too, TODO + #[cfg(feature = "std")] + println!("Received new Testcase"); + let observers = postcard::from_bytes(&observers_buf)?; + let interestingness = state.is_interesting(&input, &observers)?; + state.add_if_interesting(input, interestingness)?; + Ok(()) + } + _ => Err(AflError::Unknown(format!( + "Received illegal message that message should not have arrived: {:?}.", + event.name() + ))), + } + } } -impl EventManager for LlmpEventManager +impl EventManager for LlmpEventManager where I: Input, SH: ShMem, - ST: Stats, - //CE: CustomEvent, + ST: Stats, //CE: CustomEvent, { fn process(&mut self, state: &mut State) -> Result where @@ -879,95 +610,63 @@ where R: Rand, { // TODO: Get around local event copy by moving handle_in_client - Ok(match &mut self.llmp { - llmp::LlmpConnection::IsClient { client } => { - let mut count = 0; - loop { - match client.recv_buf()? { - Some((sender_id, tag, msg)) => { - if tag == _LLMP_TAG_EVENT_TO_BROKER { - continue; - } - let kind: LLMPEventKind = postcard::from_bytes(msg)?; - let event = LLMPEvent { - sender_id: sender_id, - kind: kind, - }; - event.handle_in_client(state)?; - count += 1; + let mut events = vec![]; + match &mut self.llmp { + llmp::LlmpConnection::IsClient { client } => loop { + match client.recv_buf()? { + Some((sender_id, tag, msg)) => { + if tag == _LLMP_TAG_EVENT_TO_BROKER { + continue; } - None => break count, + let event: Event = postcard::from_bytes(msg)?; + events.push((sender_id, event)); } + None => break, } - } + }, _ => { #[cfg(feature = "std")] dbg!("Skipping process in broker"); - 0 } - }) + }; + let count = events.len(); + events + .drain(..) + .try_for_each(|(sender_id, event)| self.handle_in_client(state, sender_id, event))?; + Ok(count) } - fn new_testcase( - &mut self, - input: &I, - observers: &OT, - corpus_size: usize, - config: String, - ) -> Result<(), AflError> - where - OT: ObserversTuple, - { - let kind = LLMPEventKind::NewTestcase { - input: Ptr::Ref(input), - observers_buf: postcard::to_allocvec(observers)?, - corpus_size: corpus_size, - client_config: config, - }; - self.send_event_kind(kind) - } - - fn update_stats(&mut self, executions: usize, execs_over_sec: u64) -> Result<(), AflError> { - let kind = LLMPEventKind::UpdateStats { - executions: executions, - execs_over_sec: execs_over_sec, - phantom: PhantomData, - }; - self.send_event_kind(kind) - } - - fn crash(&mut self, input: &I) -> Result<(), AflError> { - let kind = LLMPEventKind::Crash { - input: input.clone(), - }; - self.send_event_kind(kind) - } - - fn timeout(&mut self, input: &I) -> Result<(), AflError> { - let kind = LLMPEventKind::Timeout { - input: input.clone(), - }; - self.send_event_kind(kind) - } - - fn log(&mut self, severity_level: u8, message: String) -> Result<(), AflError> { - let kind = LLMPEventKind::Log { - severity_level: severity_level, - message: message, - phantom: PhantomData, - }; - self.send_event_kind(kind) + fn fire(&mut self, event: Event) -> Result<(), AflError> { + self.send_event_kind(event) } } +/* TODO + + match exit_kind { + Exit::Timeout => mgr.fire(Event::Timeout(&input)).expect(&format!( + "Error sending Timeout event for input {:?}", + input + )), + Exit::Crash => mgr + .crash(input) + .expect(&format!("Error sending crash event for input {:?}", input)), + _ => (), + } + println!("foo"); + let state_corpus_serialized = serialize_state_corpus_mgr(state, corpus, mgr).unwrap(); + println!("bar: {:?}", &state_corpus_serialized); + sender.send_buf(0x1, &state_corpus_serialized).unwrap(); + +*/ + #[cfg(test)] mod tests { - use crate::events::{LLMPEvent, LLMPEventKind}; + use crate::events::Event; use crate::inputs::bytes::BytesInput; use crate::observers::ObserversTuple; use crate::observers::StdMapObserver; - use crate::serde_anymap::Ptr; use crate::tuples::{tuple_list, MatchNameAndType, Named}; static mut MAP: [u32; 4] = [0; 4]; @@ -979,21 +678,18 @@ mod tests { let observers_buf = map.serialize().unwrap(); let i = BytesInput::new(vec![0]); - let e = LLMPEvent { - sender_id: 0, - kind: LLMPEventKind::NewTestcase { - input: Ptr::Ref(&i), - observers_buf, - corpus_size: 123, - client_config: "conf".into(), - }, + let e = Event::NewTestcase { + input: i, + observers_buf, + corpus_size: 123, + client_config: "conf".into(), }; let serialized = postcard::to_allocvec(&e).unwrap(); - let d = postcard::from_bytes::>(&serialized).unwrap(); - match d.kind { - LLMPEventKind::NewTestcase { + let d = postcard::from_bytes::>(&serialized).unwrap(); + match d { + Event::NewTestcase { input: _, observers_buf, corpus_size: _, diff --git a/afl/src/events/stats.rs b/afl/src/events/stats.rs new file mode 100644 index 0000000000..c270863f22 --- /dev/null +++ b/afl/src/events/stats.rs @@ -0,0 +1,161 @@ +use core::{time, time::Duration}; + +use crate::utils::current_time; + +const CLIENT_STATS_TIME_WINDOW_SECS: u64 = 5; // 5 seconds + +/// A simple struct to keep track of client stats +#[derive(Debug, Clone, Default)] +pub struct ClientStats { + // stats (maybe we need a separated struct?) + /// The corpus size for this client + pub corpus_size: u64, + /// The total executions for this client + pub executions: u64, + /// The last reported executions for this client + pub last_window_executions: u64, + /// The last time we got this information + pub last_window_time: time::Duration, + /// The last executions per sec + pub last_execs_per_sec: u64, +} + +impl ClientStats { + pub fn update_executions(&mut self, executions: u64, cur_time: time::Duration) { + self.executions = executions; + if (cur_time - self.last_window_time).as_secs() > CLIENT_STATS_TIME_WINDOW_SECS { + self.last_execs_per_sec = self.execs_per_sec(cur_time); + self.last_window_time = cur_time; + self.last_window_executions = executions; + } + } + + pub fn execs_per_sec(&self, cur_time: time::Duration) -> u64 { + if self.executions == 0 { + return 0; + } + let secs = (cur_time - self.last_window_time).as_secs(); + if secs == 0 { + self.last_execs_per_sec + } else { + let diff = self.executions - self.last_window_executions; + diff / secs + } + } +} + +pub trait Stats { + /// the client stats (mut) + fn client_stats_mut(&mut self) -> &mut Vec; + + /// the client stats + fn client_stats(&self) -> &[ClientStats]; + + /// creation time + fn start_time(&mut self) -> time::Duration; + + /// show the stats to the user + fn show(&mut self, event_msg: String); + + /// Amount of elements in the corpus (combined for all children) + fn corpus_size(&self) -> u64 { + self.client_stats() + .iter() + .fold(0u64, |acc, x| acc + x.corpus_size) + } + + /// Total executions + #[inline] + fn total_execs(&mut self) -> u64 { + self.client_stats() + .iter() + .fold(0u64, |acc, x| acc + x.executions) + } + + /// Executions per second + #[inline] + fn execs_per_sec(&mut self) -> u64 { + let cur_time = current_time(); + self.client_stats() + .iter() + .fold(0u64, |acc, x| acc + x.execs_per_sec(cur_time)) + } + + /// The client stats for a specific id, creating new if it doesn't exist + fn client_stats_mut_for(&mut self, client_id: u32) -> &mut ClientStats { + let client_stat_count = self.client_stats().len(); + for _ in client_stat_count..(client_id + 1) as usize { + self.client_stats_mut().push(ClientStats { + last_window_time: current_time(), + ..Default::default() + }) + } + &mut self.client_stats_mut()[client_id as usize] + } +} + +#[derive(Clone, Debug)] +pub struct SimpleStats +where + F: FnMut(String), +{ + print_fn: F, + start_time: Duration, + corpus_size: usize, + client_stats: Vec, +} + +impl Stats for SimpleStats +where + F: FnMut(String), +{ + /// the client stats, mutable + fn client_stats_mut(&mut self) -> &mut Vec { + &mut self.client_stats + } + + /// the client stats + fn client_stats(&self) -> &[ClientStats] { + &self.client_stats + } + + /// Time this fuzzing run stated + fn start_time(&mut self) -> time::Duration { + self.start_time + } + + fn show(&mut self, event_msg: String) { + let fmt = format!( + "[{}] clients: {}, corpus: {}, executions: {}, exec/sec: {}", + event_msg, + self.client_stats().len(), + self.corpus_size(), + self.total_execs(), + self.execs_per_sec() + ); + (self.print_fn)(fmt); + } +} + +impl SimpleStats +where + F: FnMut(String), +{ + pub fn new(print_fn: F) -> Self { + Self { + print_fn: print_fn, + start_time: current_time(), + corpus_size: 0, + client_stats: vec![], + } + } + + pub fn with_time(print_fn: F, start_time: time::Duration) -> Self { + Self { + print_fn: print_fn, + start_time: start_time, + corpus_size: 0, + client_stats: vec![], + } + } +} diff --git a/afl/src/lib.rs b/afl/src/lib.rs index 719c15dff8..9986c0649c 100644 --- a/afl/src/lib.rs +++ b/afl/src/lib.rs @@ -29,7 +29,7 @@ pub mod utils; use alloc::string::String; use core::{fmt, marker::PhantomData}; use corpus::Corpus; -use events::EventManager; +use events::{Event, EventManager}; use executors::{Executor, HasObservers}; use feedbacks::FeedbacksTuple; use inputs::Input; @@ -86,7 +86,11 @@ where let cur = current_milliseconds(); if cur - last > 60 * 100 { last = cur; - manager.update_stats(state.executions(), state.executions_over_seconds())?; + manager.fire(Event::UpdateStats { + executions: state.executions(), + execs_over_sec: state.executions_over_seconds(), + phantom: PhantomData, + })? } } } @@ -265,9 +269,10 @@ mod tests { let mut state = State::new(corpus, tuple_list!()); - let mut event_manager = LoggerEventManager::new(SimpleStats::new(|s| { + let stats = SimpleStats::new(|s| { println!("{}", s); - })); + }); + let mut event_manager = LoggerEventManager::new(stats); let mut executor = InMemoryExecutor::new( "main", diff --git a/afl/src/mutators/scheduled.rs b/afl/src/mutators/scheduled.rs index 9d0cfbe3e2..8cf5b91ba7 100644 --- a/afl/src/mutators/scheduled.rs +++ b/afl/src/mutators/scheduled.rs @@ -208,7 +208,7 @@ where let idx = self.scheduled.schedule(14, rand, input); let mutation = match idx { 0 => mutation_bitflip, - 1 => mutation_byteflip, + /*1 => mutation_byteflip, 2 => mutation_byteinc, 3 => mutation_bytedec, 4 => mutation_byteneg, @@ -219,7 +219,7 @@ where 8 => mutation_dwordadd, 9 => mutation_byteinteresting, 10 => mutation_wordinteresting, - 11 => mutation_dwordinteresting, + 11 => mutation_dwordinteresting,*/ _ => mutation_splice, }; mutation(self, rand, state, input)?; @@ -277,7 +277,7 @@ where pub fn new_default() -> Self { let mut scheduled = StdScheduledMutator::::new(); scheduled.add_mutation(mutation_bitflip); - scheduled.add_mutation(mutation_byteflip); + /*scheduled.add_mutation(mutation_byteflip); scheduled.add_mutation(mutation_byteinc); scheduled.add_mutation(mutation_bytedec); scheduled.add_mutation(mutation_byteneg); @@ -301,7 +301,7 @@ where scheduled.add_mutation(mutation_bytesset); scheduled.add_mutation(mutation_bytesrandset); scheduled.add_mutation(mutation_bytescopy); - scheduled.add_mutation(mutation_bytesswap); + scheduled.add_mutation(mutation_bytesswap);*/ /* TODO scheduled.add_mutation(mutation_tokeninsert); diff --git a/afl/src/stages/mutational.rs b/afl/src/stages/mutational.rs index f6ce787a95..7f69231a11 100644 --- a/afl/src/stages/mutational.rs +++ b/afl/src/stages/mutational.rs @@ -1,7 +1,7 @@ use core::marker::PhantomData; use crate::{ - events::EventManager, + events::{Event, EventManager}, executors::{Executor, HasObservers}, feedbacks::FeedbacksTuple, inputs::Input, @@ -76,13 +76,15 @@ where // So by default we shoudl trigger it in corpus.add, so that the user can override it and remove // if needed by particular cases if fitness > 0 { + let observers_buf = manager.serialize_observers(observers)?; + // TODO decouple events manager and engine - manager.new_testcase( - &input_mut, - observers, - state.corpus().count() + 1, - "test".into(), - )?; + manager.fire(Event::NewTestcase { + input: input_mut.clone(), + observers_buf, + corpus_size: state.corpus().count() + 1, + client_config: "TODO".into(), + })?; state.add_if_interesting(input_mut, fitness)?; // let _ = corpus.add(testcase); } else { diff --git a/afl/src/state/mod.rs b/afl/src/state/mod.rs index 5cc8f52cd5..b607dd46ef 100644 --- a/afl/src/state/mod.rs +++ b/afl/src/state/mod.rs @@ -10,7 +10,7 @@ use std::{ use crate::{ corpus::{Corpus, Testcase}, - events::EventManager, + events::{Event, EventManager, LogSeverity}, executors::{Executor, HasObservers}, feedbacks::FeedbacksTuple, generators::Generator, @@ -143,10 +143,11 @@ where for in_dir in in_dirs { self.load_from_directory(executor, generator, manager, in_dir)?; } - manager.log( - 0, - format!("Loaded {} initial testcases.", self.corpus().count()), // get corpus count - )?; + manager.fire(Event::Log { + severity_level: LogSeverity::Debug, + message: format!("Loaded {} initial testcases.", self.corpus().count()), // get corpus count + phantom: PhantomData, + })?; manager.process(self)?; Ok(()) } @@ -350,10 +351,11 @@ where added += 1; } } - manager.log( - 0, - format!("Loaded {} over {} initial testcases", added, num), - )?; + manager.fire(Event::Log { + severity_level: LogSeverity::Debug, + message: format!("Loaded {} over {} initial testcases", added, num), + phantom: PhantomData, + })?; manager.process(self)?; Ok(()) } diff --git a/afl/src/utils.rs b/afl/src/utils.rs index c1839e946f..2b6593e054 100644 --- a/afl/src/utils.rs +++ b/afl/src/utils.rs @@ -56,7 +56,6 @@ where /// Deserialize the state and corpus tuple, previously serialized with `serialize_state_corpus(...)` pub fn deserialize_state_corpus_mgr( state_corpus_serialized: &[u8], - stats: ST, ) -> Result<(State, LlmpEventManager), AflError> where C: Corpus, @@ -70,7 +69,7 @@ where let client_description = postcard::from_bytes(&tuple.1)?; Ok(( postcard::from_bytes(&tuple.0)?, - LlmpEventManager::existing_client_from_description(&client_description, stats)?, + LlmpEventManager::existing_client_from_description(&client_description)?, )) }