Panic on Too Slow Broker (#230)

* panic on too slow broker

* constant for max pending pages
Dominik Maier 2021-07-20 02:39:03 +02:00 committed by GitHub
parent b0cb74324c
commit 9591ed995e
2 changed files with 85 additions and 57 deletions

@@ -96,6 +96,10 @@ use nix::sys::socket::{self, sockopt::ReusePort};
#[cfg(all(unix, feature = "std"))]
use std::os::unix::io::AsRawFd;
/// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`].
/// Usually, this value should not exceed `1`, otherwise the broker cannot keep up with the amount of incoming messages.
/// Instead of increasing this value, consider sending new messages at a lower rate, otherwise your sender will eventually `OOM`.
const LLMP_CFG_MAX_PENDING_UNREAD_PAGES: usize = 3;
/// We'll start off with 256 megabyte maps per fuzzer client
#[cfg(not(feature = "llmp_small_maps"))]
const LLMP_CFG_INITIAL_MAP_SIZE: usize = 1 << 28;
@@ -117,6 +121,8 @@ const LLMP_TAG_END_OF_PAGE: Tag = 0xAF1E0F1;
const LLMP_TAG_NEW_SHM_CLIENT: Tag = 0xC11E471;
/// The sender on this map is exiting (if the broker exits, clients should exit gracefully).
const LLMP_TAG_EXITING: Tag = 0x13C5171;
/// Client gave up as the receiver/broker was too slow
const LLMP_SLOW_RECEIVER_PANIC: Tag = 0x70051041;
/// Unused...
pub const LLMP_FLAG_INITIALIZED: Flags = 0x0;
@@ -851,6 +857,13 @@ where
}
unmap_until_excl += 1;
}
if unmap_until_excl == 0 && self.out_maps.len() > LLMP_CFG_MAX_PENDING_UNREAD_PAGES {
// We send one last message to the broker before quitting.
self.send_buf(LLMP_SLOW_RECEIVER_PANIC, &[]).unwrap();
panic!("The receiver/broker could not process our sent llmp messages in time. Either we're sending too many messages too fast, the broker got stuck, or it crashed. Giving up.");
}
// Remove all maps that the broker already mapped
// simply removing them from the vec should then call drop and unmap them.
self.out_maps.drain(0..unmap_until_excl);
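As the doc comment on LLMP_CFG_MAX_PENDING_UNREAD_PAGES suggests, a sender that keeps running into this limit should lower its send rate rather than raise the constant. A minimal, hedged sketch of such a throttled send loop follows; `sender`, `MY_TAG`, and `payloads` are assumptions for illustration, only `send_buf` is taken from this file:

use std::{thread, time::Duration};

// Hypothetical client-side loop: bound the send rate so the broker can keep up.
for payload in &payloads {
    sender.send_buf(MY_TAG, payload)?;
    // Crude back-off: give the broker a moment to map and read the page
    // before allocating the next message.
    thread::sleep(Duration::from_millis(1));
}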
@@ -2231,64 +2244,72 @@ where
}
};
if (*msg).tag == LLMP_TAG_NEW_SHM_CLIENT {
/* This client informs us about yet another new client;
add it to the list! Also, no need to forward this msg. */
let msg_buf_len_padded = (*msg).buf_len_padded;
if (*msg).buf_len < size_of::<LlmpPayloadSharedMapInfo>() as u64 {
#[cfg(feature = "std")]
println!("Ignoring broken CLIENT_ADDED msg due to incorrect size. Expected {} but got {}",
msg_buf_len_padded,
size_of::<LlmpPayloadSharedMapInfo>()
);
#[cfg(not(feature = "std"))]
return Err(Error::Unknown(format!("Broken CLIENT_ADDED msg with incorrect size received. Expected {} but got {}",
msg_buf_len_padded,
size_of::<LlmpPayloadSharedMapInfo>()
)));
} else {
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
match self.shmem_provider.from_id_and_size(
ShMemId::from_slice(&(*pageinfo).shm_str),
(*pageinfo).map_size,
) {
Ok(new_map) => {
let mut new_page = LlmpSharedMap::existing(new_map);
let id = next_id;
next_id += 1;
new_page.mark_save_to_unmap();
self.llmp_clients.push(LlmpReceiver {
id,
current_recv_map: new_page,
last_msg_recvd: ptr::null_mut(),
shmem_provider: self.shmem_provider.clone(),
});
}
Err(e) => {
#[cfg(feature = "std")]
println!("Error adding client! Ignoring: {:?}", e);
#[cfg(not(feature = "std"))]
return Err(Error::Unknown(format!(
"Error adding client! PANIC! {:?}",
e
)));
}
};
match (*msg).tag {
// first, handle the special, llmp-internal messages
LLMP_SLOW_RECEIVER_PANIC => {
return Err(Error::Unknown(format!("The broker was too slow to handle messages of client {} in time, so it quit. Either the client sent messages too fast, or we (the broker) got stuck!", client_id)));
}
} else {
// The message is not specifically for us. Let the user handle it, then forward it to the clients, if necessary.
let mut should_forward_msg = true;
LLMP_TAG_NEW_SHM_CLIENT => {
/* This client informs us about yet another new client;
add it to the list! Also, no need to forward this msg. */
let msg_buf_len_padded = (*msg).buf_len_padded;
if (*msg).buf_len < size_of::<LlmpPayloadSharedMapInfo>() as u64 {
#[cfg(feature = "std")]
println!("Ignoring broken CLIENT_ADDED msg due to incorrect size. Expected {} but got {}",
msg_buf_len_padded,
size_of::<LlmpPayloadSharedMapInfo>()
);
#[cfg(not(feature = "std"))]
return Err(Error::Unknown(format!("Broken CLIENT_ADDED msg with incorrect size received. Expected {} but got {}",
msg_buf_len_padded,
size_of::<LlmpPayloadSharedMapInfo>()
)));
} else {
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
let map = &mut self.llmp_clients[client_id as usize].current_recv_map;
let msg_buf = (*msg).as_slice(map)?;
if let LlmpMsgHookResult::Handled =
(on_new_msg)(client_id, (*msg).tag, (*msg).flags, msg_buf)?
{
should_forward_msg = false;
match self.shmem_provider.from_id_and_size(
ShMemId::from_slice(&(*pageinfo).shm_str),
(*pageinfo).map_size,
) {
Ok(new_map) => {
let mut new_page = LlmpSharedMap::existing(new_map);
let id = next_id;
next_id += 1;
new_page.mark_save_to_unmap();
self.llmp_clients.push(LlmpReceiver {
id,
current_recv_map: new_page,
last_msg_recvd: ptr::null_mut(),
shmem_provider: self.shmem_provider.clone(),
});
}
Err(e) => {
#[cfg(feature = "std")]
println!("Error adding client! Ignoring: {:?}", e);
#[cfg(not(feature = "std"))]
return Err(Error::Unknown(format!(
"Error adding client! PANIC! {:?}",
e
)));
}
};
}
}
if should_forward_msg {
self.forward_msg(msg)?;
// handle all other messages
_ => {
// The message is not specifically for us. Let the user handle it, then forward it to the clients, if necessary.
let mut should_forward_msg = true;
let map = &mut self.llmp_clients[client_id as usize].current_recv_map;
let msg_buf = (*msg).as_slice(map)?;
if let LlmpMsgHookResult::Handled =
(on_new_msg)(client_id, (*msg).tag, (*msg).flags, msg_buf)?
{
should_forward_msg = false;
}
if should_forward_msg {
self.forward_msg(msg)?;
}
}
}
}
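For context, the `on_new_msg` hook called above has roughly this shape. A hedged sketch, not taken from this patch: `MY_CONTROL_TAG` and the client id type are assumptions, and `LlmpMsgHookResult::ForwardToClients` is assumed to be the counterpart of the `Handled` variant used above:

// Hypothetical broker hook: consume our own control messages, forward everything else.
let mut on_new_msg = |client_id: u32, tag: Tag, _flags: Flags, _buf: &[u8]| {
    if tag == MY_CONTROL_TAG {
        // Handled here; the broker will not forward this message to the clients.
        println!("control message from client {}", client_id);
        Ok(LlmpMsgHookResult::Handled)
    } else {
        // Anything else gets forwarded to all registered clients.
        Ok(LlmpMsgHookResult::ForwardToClients)
    }
};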

@@ -13,7 +13,7 @@ use crate::{
executors::ExitKind, inputs::Input, observers::ObserversTuple, stats::UserStats, Error,
};
/// A per-fuzzer unique ID, usually starting with `0` and increasing
/// A per-fuzzer unique `ID`, usually starting with `0` and increasing
/// by `1` in multiprocessed `EventManager`s, such as [`self::llmp::LlmpEventManager`].
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct EventManagerId {
@@ -209,7 +209,14 @@ pub trait EventFirer<I, S>
where
I: Input,
{
/// Send off an event to the broker
/// Send off an [`Event`] to the broker
///
/// For multi-processed managers, such as [`llmp::LlmpEventManager`],
/// this serializes the [`Event`] and commits it to the [`llmp`] page.
/// In this case, if you `fire` events faster than the broker can consume them
/// (for example, one for each [`Input`], on multiple cores),
/// the [`llmp`] [`ShMem`] may fill up and the client will eventually OOM or [`panic`].
/// This should not happen for normal use-cases.
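/// # Example
///
/// A minimal usage sketch; `mgr`, `state`, and `event` are assumed to already exist,
/// where `mgr` is any [`EventFirer`] implementation:
///
/// ```ignore
/// mgr.fire(&mut state, event)?;
/// ```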
fn fire(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error>;
/// Serialize all observers for this type and manager