From 9591ed995e84c1ef8b05f74b1b23c54e7f09a41c Mon Sep 17 00:00:00 2001 From: Dominik Maier Date: Tue, 20 Jul 2021 02:39:03 +0200 Subject: [PATCH] Panic on Too Slow Broker (#230) * panic on too slow broker * constant for max pending pages --- libafl/src/bolts/llmp.rs | 131 +++++++++++++++++++++++---------------- libafl/src/events/mod.rs | 11 +++- 2 files changed, 85 insertions(+), 57 deletions(-) diff --git a/libafl/src/bolts/llmp.rs b/libafl/src/bolts/llmp.rs index 022adc98cb..cf4c5eadb9 100644 --- a/libafl/src/bolts/llmp.rs +++ b/libafl/src/bolts/llmp.rs @@ -96,6 +96,10 @@ use nix::sys::socket::{self, sockopt::ReusePort}; #[cfg(all(unix, feature = "std"))] use std::os::unix::io::AsRawFd; +/// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`] +/// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages. +/// Instead of increasing this value, you may consider sending new messages at a lower rate, else your Sender will eventually `OOM`. +const LLMP_CFG_MAX_PENDING_UNREAD_PAGES: usize = 3; /// We'll start off with 256 megabyte maps per fuzzer client #[cfg(not(feature = "llmp_small_maps"))] const LLMP_CFG_INITIAL_MAP_SIZE: usize = 1 << 28; @@ -117,6 +121,8 @@ const LLMP_TAG_END_OF_PAGE: Tag = 0xAF1E0F1; const LLMP_TAG_NEW_SHM_CLIENT: Tag = 0xC11E471; /// The sender on this map is exiting (if broker exits, clients should exit gracefully); const LLMP_TAG_EXITING: Tag = 0x13C5171; +/// Client gave up as the receiver/broker was too slow +const LLMP_SLOW_RECEIVER_PANIC: Tag = 0x70051041; /// Unused... pub const LLMP_FLAG_INITIALIZED: Flags = 0x0; @@ -851,6 +857,13 @@ where } unmap_until_excl += 1; } + + if unmap_until_excl == 0 && self.out_maps.len() > LLMP_CFG_MAX_PENDING_UNREAD_PAGES { + // We send one last information to the broker before quitting. + self.send_buf(LLMP_SLOW_RECEIVER_PANIC, &[]).unwrap(); + panic!("The receiver/broker could not process our sent llmp messages in time. Either we're sending too many messages too fast, the broker got stuck, or it crashed. Giving up."); + } + // Remove all maps that the broker already mapped // simply removing them from the vec should then call drop and unmap them. self.out_maps.drain(0..unmap_until_excl); @@ -2231,64 +2244,72 @@ where } }; - if (*msg).tag == LLMP_TAG_NEW_SHM_CLIENT { - /* This client informs us about yet another new client - add it to the list! Also, no need to forward this msg. */ - let msg_buf_len_padded = (*msg).buf_len_padded; - if (*msg).buf_len < size_of::() as u64 { - #[cfg(feature = "std")] - println!("Ignoring broken CLIENT_ADDED msg due to incorrect size. Expected {} but got {}", - msg_buf_len_padded, - size_of::() - ); - #[cfg(not(feature = "std"))] - return Err(Error::Unknown(format!("Broken CLIENT_ADDED msg with incorrect size received. Expected {} but got {}", - msg_buf_len_padded, - size_of::() - ))); - } else { - let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; - - match self.shmem_provider.from_id_and_size( - ShMemId::from_slice(&(*pageinfo).shm_str), - (*pageinfo).map_size, - ) { - Ok(new_map) => { - let mut new_page = LlmpSharedMap::existing(new_map); - let id = next_id; - next_id += 1; - new_page.mark_save_to_unmap(); - self.llmp_clients.push(LlmpReceiver { - id, - current_recv_map: new_page, - last_msg_recvd: ptr::null_mut(), - shmem_provider: self.shmem_provider.clone(), - }); - } - Err(e) => { - #[cfg(feature = "std")] - println!("Error adding client! Ignoring: {:?}", e); - #[cfg(not(feature = "std"))] - return Err(Error::Unknown(format!( - "Error adding client! PANIC! {:?}", - e - ))); - } - }; + match (*msg).tag { + // first, handle the special, llmp-internal messages + LLMP_SLOW_RECEIVER_PANIC => { + return Err(Error::Unknown(format!("The broker was too slow to handle messages of client {} in time, so it quit. Either the client sent messages too fast, or we (the broker) got stuck!", client_id))); } - } else { - // The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary. - let mut should_forward_msg = true; + LLMP_TAG_NEW_SHM_CLIENT => { + /* This client informs us about yet another new client + add it to the list! Also, no need to forward this msg. */ + let msg_buf_len_padded = (*msg).buf_len_padded; + if (*msg).buf_len < size_of::() as u64 { + #[cfg(feature = "std")] + println!("Ignoring broken CLIENT_ADDED msg due to incorrect size. Expected {} but got {}", + msg_buf_len_padded, + size_of::() + ); + #[cfg(not(feature = "std"))] + return Err(Error::Unknown(format!("Broken CLIENT_ADDED msg with incorrect size received. Expected {} but got {}", + msg_buf_len_padded, + size_of::() + ))); + } else { + let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; - let map = &mut self.llmp_clients[client_id as usize].current_recv_map; - let msg_buf = (*msg).as_slice(map)?; - if let LlmpMsgHookResult::Handled = - (on_new_msg)(client_id, (*msg).tag, (*msg).flags, msg_buf)? - { - should_forward_msg = false; + match self.shmem_provider.from_id_and_size( + ShMemId::from_slice(&(*pageinfo).shm_str), + (*pageinfo).map_size, + ) { + Ok(new_map) => { + let mut new_page = LlmpSharedMap::existing(new_map); + let id = next_id; + next_id += 1; + new_page.mark_save_to_unmap(); + self.llmp_clients.push(LlmpReceiver { + id, + current_recv_map: new_page, + last_msg_recvd: ptr::null_mut(), + shmem_provider: self.shmem_provider.clone(), + }); + } + Err(e) => { + #[cfg(feature = "std")] + println!("Error adding client! Ignoring: {:?}", e); + #[cfg(not(feature = "std"))] + return Err(Error::Unknown(format!( + "Error adding client! PANIC! {:?}", + e + ))); + } + }; + } } - if should_forward_msg { - self.forward_msg(msg)?; + // handle all other messages + _ => { + // The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary. + let mut should_forward_msg = true; + + let map = &mut self.llmp_clients[client_id as usize].current_recv_map; + let msg_buf = (*msg).as_slice(map)?; + if let LlmpMsgHookResult::Handled = + (on_new_msg)(client_id, (*msg).tag, (*msg).flags, msg_buf)? + { + should_forward_msg = false; + } + if should_forward_msg { + self.forward_msg(msg)?; + } } } } diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs index 9c67f8949c..a884142799 100644 --- a/libafl/src/events/mod.rs +++ b/libafl/src/events/mod.rs @@ -13,7 +13,7 @@ use crate::{ executors::ExitKind, inputs::Input, observers::ObserversTuple, stats::UserStats, Error, }; -/// A per-fuzzer unique ID, usually starting with `0` and increasing +/// A per-fuzzer unique `ID`, usually starting with `0` and increasing /// by `1` in multiprocessed `EventManager`s, such as [`self::llmp::LlmpEventManager`]. #[derive(Clone, Copy, PartialEq, Eq)] pub struct EventManagerId { @@ -209,7 +209,14 @@ pub trait EventFirer where I: Input, { - /// Send off an event to the broker + /// Send off an [`Event`] to the broker + /// + /// For multi-processed managers, such as [`llmp::LlmpEventManager`], + /// this serializes the [`Event`] and commits it to the [`llmp`] page. + /// In this case, if you `fire` faster than the broker can consume + /// (for example for each [`Input`], on multiple cores) + /// the [`llmp`] [`ShMem`] may fill up and the client will eventually OOM or [`panic`]. + /// This should not happen for a normal use-cases. fn fire(&mut self, state: &mut S, event: Event) -> Result<(), Error>; /// Serialize all observers for this type and manager