diff --git a/afl/src/events/llmp.rs b/afl/src/events/llmp.rs index b3a4297e5d..0f139c37aa 100644 --- a/afl/src/events/llmp.rs +++ b/afl/src/events/llmp.rs @@ -88,9 +88,6 @@ const EOP_MSG_SIZE: usize = /// The header length of a llmp page in a shared map (until messages start) const LLMP_PAGE_HEADER_LEN: usize = size_of::(); -/// Message hook type -pub type LlmpMsgHookFn = unsafe fn(client_id: u32, msg: *mut LlmpMsg) -> LlmpMsgHookResult; - /// TAGs used thorughout llmp pub type Tag = u32; @@ -270,7 +267,6 @@ pub struct LlmpBroker { /// Users of Llmp can add message handlers in the broker. /// This allows us to intercept messages right in the broker /// This keeps the out map clean. - pub msg_hooks: Vec, pub llmp_clients: Vec, } @@ -797,7 +793,6 @@ impl LlmpBroker { // clients may join at any time keep_pages_forever: true, }, - msg_hooks: vec![], llmp_clients: vec![], }; @@ -820,12 +815,6 @@ impl LlmpBroker { }); } - /// Adds a hook that gets called in the broker for each new message the broker touches. - /// if the callback returns false, the message is not forwarded to the clients. */ - pub fn add_message_hook(&mut self, hook: LlmpMsgHookFn) { - self.msg_hooks.push(hook); - } - /// For internal use: Forward the current message to the out map. unsafe fn forward_msg(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> { let mut out: *mut LlmpMsg = self.alloc_next((*msg).buf_len_padded as usize)?; @@ -847,7 +836,14 @@ impl LlmpBroker { /// broker broadcast to its own page for all others to read */ #[inline] - unsafe fn handle_new_msgs(&mut self, client_id: u32) -> Result<(), AflError> { + unsafe fn handle_new_msgs( + &mut self, + client_id: u32, + on_new_msg: &mut F, + ) -> Result<(), AflError> + where + F: FnMut(u32, Tag, &[u8]) -> Result, + { let mut next_id = self.llmp_clients.len() as u32; // TODO: We could memcpy a range of pending messages, instead of one by one. @@ -862,6 +858,7 @@ impl LlmpBroker { Some(msg) => msg, } }; + if (*msg).tag == LLMP_TAG_NEW_SHM_CLIENT { /* This client informs us about yet another new client add it to the list! Also, no need to forward this msg. */ @@ -890,11 +887,12 @@ impl LlmpBroker { } else { // The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary. let mut should_forward_msg = true; - for hook in &self.msg_hooks { - match (hook)(client_id, msg) { - LlmpMsgHookResult::Handled => should_forward_msg = false, - _ => (), - } + + let map = &self.llmp_clients[client_id as usize].current_recv_map; + let msg_buf = (*msg).as_slice(map)?; + match (on_new_msg)(client_id, (*msg).tag, msg_buf)? { + LlmpMsgHookResult::Handled => should_forward_msg = false, + _ => (), } if should_forward_msg { self.forward_msg(msg)?; @@ -906,11 +904,14 @@ impl LlmpBroker { /// The broker walks all pages and looks for changes, then broadcasts them on /// its own shared page, once. #[inline] - pub fn once(&mut self) -> Result<(), AflError> { + pub fn once(&mut self, on_new_msg: &mut F) -> Result<(), AflError> + where + F: FnMut(u32, Tag, &[u8]) -> Result, + { compiler_fence(Ordering::SeqCst); for i in 0..self.llmp_clients.len() { unsafe { - self.handle_new_msgs(i as u32)?; + self.handle_new_msgs(i as u32, on_new_msg)?; } } Ok(()) @@ -919,10 +920,13 @@ impl LlmpBroker { /// Loops infinitely, forwarding and handling all incoming messages from clients. /// Never returns. Panics on error. /// 5 millis of sleep can't hurt to keep busywait not at 100% - pub fn loop_forever(&mut self, sleep_time: Option) -> ! { + pub fn loop_forever(&mut self, on_new_msg: &mut F, sleep_time: Option) -> ! + where + F: FnMut(u32, Tag, &[u8]) -> Result, + { loop { compiler_fence(Ordering::SeqCst); - self.once() + self.once(on_new_msg) .expect("An error occurred when brokering. Exiting."); match sleep_time { Some(time) => thread::sleep(time), diff --git a/afl/src/events/mod.rs b/afl/src/events/mod.rs index 8453339073..ee78bbe3c6 100644 --- a/afl/src/events/mod.rs +++ b/afl/src/events/mod.rs @@ -24,8 +24,9 @@ use crate::utils::Rand; use crate::AflError; use crate::{engines::State, utils}; -use self::llmp::LlmpMsg; +use self::llmp::Tag; +#[derive(Debug, Copy, Clone)] /// Indicate if an event worked or not pub enum BrokerEventResult { /// The broker haneled this. No need to pass it on. @@ -34,6 +35,7 @@ pub enum BrokerEventResult { Forward, } +#[derive(Debug, Clone, Default)] pub struct ClientStats { // stats (maybe we need a separated struct?) executions: u64, @@ -178,75 +180,39 @@ where } } -/// Client fun -fn handle_in_client( - event: Event, - state: &mut State, - corpus: &mut C, -) -> Result<(), AflError> -where - C: Corpus, - OT: ObserversTuple, - FT: FeedbacksTuple, - I: Input, - R: Rand, -{ - match event { - Event::NewTestcase { - sender_id: _, - input, - observers_buf, - client_config: _, - } => { - // TODO: here u should match client_config, if equal to the current one do not re-execute - // we need to pass engine to process() too, TODO - #[cfg(feature = "std")] - println!("Received new Testcase"); - let observers = postcard::from_bytes(&observers_buf)?; - let interestingness = state.is_interesting(&input, &observers)?; - state.add_if_interesting(corpus, input, interestingness)?; - Ok(()) - } - _ => Err(AflError::Unknown(format!( - "Received illegal message that message should not have arrived: {:?}.", - event - ))), - } +#[derive(Debug, Clone, Default)] +struct Stats { + start_time: Duration, + corpus_size: usize, + client_stats: Vec, } -pub trait EventManager -where - C: Corpus, - E: Executor, - OT: ObserversTuple, - FT: FeedbacksTuple, - I: Input, - R: Rand, -{ - /// Fire an Event - fn fire<'a>(&mut self, event: Event) -> Result<(), AflError>; +impl Stats { + /// the client stats, mutable + fn client_stats_mut(&mut self) -> &mut Vec { + &mut self.client_stats + } - /// Lookup for incoming events and process them. - /// Return the number of processes events or an error - fn process( - &mut self, - state: &mut State, - corpus: &mut C, - ) -> Result; - - /// the client stat, mutable - fn client_stats_mut(&mut self) -> &mut Vec; - /// the client stat - fn client_stats(&self) -> &[ClientStats]; + /// the client stats + fn client_stats(&self) -> &[ClientStats] { + &self.client_stats + } /// Amount of elements in the corpus (combined for all children) - fn corpus_size(&self) -> usize; + fn corpus_size(&self) -> usize { + self.corpus_size + } /// Incremt the cropus size - fn corpus_size_inc(&mut self) -> usize; + fn corpus_size_inc(&mut self) -> usize { + self.corpus_size += 1; + self.corpus_size + } /// Time this fuzzing run stated - fn start_time(&mut self) -> time::Duration; + fn start_time(&mut self) -> time::Duration { + self.start_time + } /// Total executions #[inline] @@ -268,7 +234,11 @@ where } /// Broker fun - fn handle_in_broker(&mut self, event: &Event) -> Result { + #[inline] + fn handle_in_broker(&mut self, event: &Event) -> Result + where + I: Input, + { match event { Event::LoadInitial { sender_id: _, @@ -339,6 +309,64 @@ where _ => Ok(BrokerEventResult::Forward), } } +} + +/// Client fun +#[inline] +fn handle_in_client( + event: Event, + state: &mut State, + corpus: &mut C, +) -> Result<(), AflError> +where + C: Corpus, + OT: ObserversTuple, + FT: FeedbacksTuple, + I: Input, + R: Rand, +{ + match event { + Event::NewTestcase { + sender_id: _, + input, + observers_buf, + client_config: _, + } => { + // TODO: here u should match client_config, if equal to the current one do not re-execute + // we need to pass engine to process() too, TODO + #[cfg(feature = "std")] + println!("Received new Testcase"); + let observers = postcard::from_bytes(&observers_buf)?; + let interestingness = state.is_interesting(&input, &observers)?; + state.add_if_interesting(corpus, input, interestingness)?; + Ok(()) + } + _ => Err(AflError::Unknown(format!( + "Received illegal message that message should not have arrived: {:?}.", + event + ))), + } +} + +pub trait EventManager +where + C: Corpus, + E: Executor, + OT: ObserversTuple, + FT: FeedbacksTuple, + I: Input, + R: Rand, +{ + /// Fire an Event + fn fire<'a>(&mut self, event: Event) -> Result<(), AflError>; + + /// Lookup for incoming events and process them. + /// Return the number of processes events or an error + fn process( + &mut self, + state: &mut State, + corpus: &mut C, + ) -> Result; fn serialize_observers(&mut self, observers: &OT) -> Result, AflError> { Ok(postcard::to_allocvec(observers)?) @@ -363,11 +391,9 @@ where { writer: W, + stats: Stats, events: Vec>, // stats (maybe we need a separated struct?) - corpus_size: usize, - start_time: time::Duration, - client_stats: Vec, phantom: PhantomData<(C, E, I, R, OT, FT)>, } @@ -386,7 +412,7 @@ where { #[inline] fn fire<'a>(&mut self, event: Event) -> Result<(), AflError> { - match self.handle_in_broker(&event)? { + match self.stats.handle_in_broker(&event)? { BrokerEventResult::Forward => self.events.push(event), // Ignore broker-only events BrokerEventResult::Handled => (), @@ -405,27 +431,6 @@ where .try_for_each(|event| handle_in_client(event, state, corpus))?; Ok(count) } - - fn client_stats_mut(&mut self) -> &mut Vec { - &mut self.client_stats - } - - fn client_stats(&self) -> &[ClientStats] { - &self.client_stats - } - - fn corpus_size(&self) -> usize { - self.corpus_size - } - - fn corpus_size_inc(&mut self) -> usize { - self.corpus_size += 1; - self.corpus_size - } - - fn start_time(&mut self) -> time::Duration { - self.start_time - } } #[cfg(feature = "std")] @@ -442,10 +447,11 @@ where { pub fn new(writer: W) -> Self { Self { - start_time: utils::current_time(), - client_stats: vec![], + stats: Stats { + start_time: utils::current_time(), + ..Default::default() + }, writer: writer, - corpus_size: 0, phantom: PhantomData, events: vec![], } @@ -454,13 +460,13 @@ where #[cfg(feature = "std")] /// Forward this to the client -const LLMP_TAG_EVENT_TO_CLIENT: llmp::Tag = 0x2C11E471; +const _LLMP_TAG_EVENT_TO_CLIENT: llmp::Tag = 0x2C11E471; #[cfg(feature = "std")] /// Only handle this in the broker const _LLMP_TAG_EVENT_TO_BROKER: llmp::Tag = 0x2B80438; #[cfg(feature = "std")] /// Handle in both -const _LLMP_TAG_EVENT_TO_BOTH: llmp::Tag = 0x2B0741; +const LLMP_TAG_EVENT_TO_BOTH: llmp::Tag = 0x2B0741; #[cfg(feature = "std")] pub struct LlmpEventManager @@ -471,10 +477,8 @@ where writer: W, // stats (maybe we need a separated struct?) - corpus_size: usize, - start_time: time::Duration, - client_stats: Vec, llmp: llmp::LlmpConnection, + stats: Stats, phantom: PhantomData<(C, E, OT, FT, I, R)>, } @@ -494,10 +498,11 @@ where pub fn new_on_port(port: u16, writer: W) -> Result { let mgr = Self { llmp: llmp::LlmpConnection::on_port(port)?, - start_time: utils::current_time(), - corpus_size: 0, + stats: Stats { + start_time: utils::current_time(), + ..Default::default() + }, phantom: PhantomData, - client_stats: vec![], writer, }; Ok(mgr) @@ -521,23 +526,24 @@ where broker, listener_thread: _, } => { - // TODO: Clean up that api by.. a lot! - /* - broker.add_message_hook(|client_id: u32, msg: *mut LlmpMsg| { - unsafe { - if (*msg).tag == _LLMP_TAG_EVENT_TO_BOTH { - let event = postcard::from_bytes((*msg).as_slice_unsafe())?; - match self.handle_in_broker(event)? { - BrokerEventResult::Forward => llmp::LlmpMsgHookResult::ForwardToClients, - BrokerEventResult::Handled => llmp::LlmpMsgHookResult::Handled, + let stats = &mut self.stats; + broker.loop_forever( + &mut |_client_id: u32, tag: Tag, msg: &[u8]| { + if tag == LLMP_TAG_EVENT_TO_BOTH { + let event = postcard::from_bytes(msg)?; + match stats.handle_in_broker::(&event)? { + BrokerEventResult::Forward => { + Ok(llmp::LlmpMsgHookResult::ForwardToClients) + } + BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled), } } else { - llmp::LlmpMsgHookResult::ForwardToClients + Ok(llmp::LlmpMsgHookResult::ForwardToClients) } - } - });*/ - broker.loop_forever(Some(Duration::from_millis(5))) - }, + }, + Some(Duration::from_millis(5)), + ); + } _ => Err(AflError::IllegalState( "Called broker loop in the client".into(), )), @@ -561,7 +567,7 @@ where #[inline] fn fire<'a>(&mut self, event: Event) -> Result<(), AflError> { let serialized = postcard::to_allocvec(&event)?; - self.llmp.send_buf(LLMP_TAG_EVENT_TO_CLIENT, &serialized)?; + self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?; Ok(()) } @@ -594,27 +600,6 @@ where } }) } - - fn client_stats_mut(&mut self) -> &mut Vec { - &mut self.client_stats - } - - fn client_stats(&self) -> &[ClientStats] { - &self.client_stats - } - - fn corpus_size(&self) -> usize { - self.corpus_size - } - - fn corpus_size_inc(&mut self) -> usize { - self.corpus_size += 1; - self.corpus_size - } - - fn start_time(&mut self) -> time::Duration { - self.start_time - } } #[cfg(feature = "std")]