LLMP Broker: timeouts for inactive clients (#1005)

* LLMP Timeouts

* Make broker timeouts optional

* fix warning

* fix warning
This commit is contained in:
Dominik Maier 2023-01-25 12:03:23 +01:00 committed by GitHub
parent 92c0c5eeab
commit e5c220519e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 122 additions and 11 deletions

View File

@ -12,7 +12,7 @@ edition = "2021"
categories = ["development-tools::testing", "emulators", "embedded", "os", "no-std"]
[features]
default = ["std", "derive", "llmp_compression", "rand_trait", "fork", "prelude", "gzip"]
default = ["std", "derive", "llmp_compression", "llmp_small_maps", "llmp_broker_timeouts", "rand_trait", "fork", "prelude", "gzip"]
std = ["serde_json", "serde_json/std", "hostname", "nix", "serde/std", "bincode", "wait-timeout", "regex", "byteorder", "once_cell", "uuid", "tui_monitor", "ctor", "backtrace", "uds"] # print, env, launcher ... support
derive = ["libafl_derive"] # provide derive(SerdeAny) macro.
fork = [] # uses the fork() syscall to spawn children, instead of launching a new command, if supported by the OS (has no effect on Windows, no_std).
@ -43,6 +43,7 @@ llmp_bind_public = [] # If set, llmp will bind to 0.0.0.0, allowing cross-device
llmp_compression = ["gzip"] # llmp compression using GZip
llmp_debug = [] # Enables debug output for LLMP
llmp_small_maps = [] # reduces initial map size for llmp
llmp_broker_timeouts = ["std"] # The broker loop will yield occasionally, even without status messages from client nodes
[build-dependencies]
rustversion = "1.0"

View File

@ -1930,16 +1930,17 @@ where
/// The broker walks all pages and looks for changes, then broadcasts them on
/// its own shared page, once.
#[inline]
pub fn once<F>(&mut self, on_new_msg: &mut F) -> Result<(), Error>
pub fn once<F>(&mut self, on_new_msg: &mut F) -> Result<bool, Error>
where
F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{
let mut new_messages = false;
for i in 0..self.llmp_clients.len() {
unsafe {
self.handle_new_msgs(i as u32, on_new_msg)?;
new_messages |= self.handle_new_msgs(i as u32, on_new_msg)?;
}
}
Ok(())
Ok(new_messages)
}
/// Internal function, returns true when shuttdown is requested by a `SIGINT` signal
@ -1958,9 +1959,66 @@ where
false
}
/// Loops infinitely, forwarding and handling all incoming messages from clients.
/// Never returns.
/// Will call `on_timeout` roughly after `timeout`
/// Panics on error.
/// 5 millis of sleep can't hurt to keep busywait not at 100%
#[cfg(feature = "std")]
pub fn loop_with_timeouts<F>(
&mut self,
on_new_msg_or_timeout: &mut F,
timeout: Duration,
sleep_time: Option<Duration>,
) where
F: FnMut(Option<(ClientId, Tag, Flags, &[u8])>) -> Result<LlmpMsgHookResult, Error>,
{
use super::current_milliseconds;
#[cfg(unix)]
if let Err(_e) = unsafe { setup_signal_handler(&mut LLMP_SIGHANDLER_STATE) } {
// We can live without a proper ctrl+c signal handler. Print and ignore.
#[cfg(feature = "std")]
println!("Failed to setup signal handlers: {_e}");
}
let timeout = timeout.as_millis() as u64;
let mut end_time = current_milliseconds() + timeout;
while !self.is_shutting_down() {
if current_milliseconds() > end_time {
on_new_msg_or_timeout(None).expect("An error occured in broker timeout. Exiting.");
end_time = current_milliseconds() + timeout;
}
if self
.once(&mut |client_id, tag, flags, buf| {
on_new_msg_or_timeout(Some((client_id, tag, flags, buf)))
})
.expect("An error occurred when brokering. Exiting.")
{
end_time = current_milliseconds() + timeout;
}
#[cfg(feature = "std")]
if let Some(time) = sleep_time {
thread::sleep(time);
}
#[cfg(not(feature = "std"))]
if let Some(time) = sleep_time {
panic!("Cannot sleep on no_std platform (requested {:?})", time);
}
}
self.llmp_out
.send_buf(LLMP_TAG_EXITING, &[])
.expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.");
}
/// Loops infinitely, forwarding and handling all incoming messages from clients.
/// Never returns. Panics on error.
/// 5 millis of sleep can't hurt to keep busywait not at 100%
/// On std, if you need to run code even if no update got sent, use `Self::loop_with_timeout` (needs the `std` feature).
pub fn loop_forever<F>(&mut self, on_new_msg: &mut F, sleep_time: Option<Duration>)
where
F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
@ -2338,13 +2396,19 @@ where
Ok(ret)
}
/// broker broadcast to its own page for all others to read */
/// Broker broadcast to its own page for all others to read
/// Returns `true` if new messages were broker-ed
#[inline]
#[allow(clippy::cast_ptr_alignment)]
unsafe fn handle_new_msgs<F>(&mut self, client_id: u32, on_new_msg: &mut F) -> Result<(), Error>
unsafe fn handle_new_msgs<F>(
&mut self,
client_id: u32,
on_new_msg: &mut F,
) -> Result<bool, Error>
where
F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{
let mut new_mesages = false;
let mut next_id = self.llmp_clients.len() as u32;
// TODO: We could memcpy a range of pending messages, instead of one by one.
@ -2354,11 +2418,13 @@ where
match client.recv()? {
None => {
// We're done handling this client
return Ok(());
return Ok(new_mesages);
}
Some(msg) => msg,
}
};
// We got a new message
new_mesages = true;
match (*msg).tag {
// first, handle the special, llmp-internal messages

View File

@ -38,7 +38,7 @@ use crate::{
};
use crate::{
bolts::{
llmp::{self, Flags, LlmpClient, LlmpClientDescription, Tag},
llmp::{self, LlmpClient, LlmpClientDescription, Tag},
shmem::ShMemProvider,
},
events::{
@ -123,12 +123,13 @@ where
}
/// Run forever in the broker
#[cfg(not(feature = "llmp_broker_timeouts"))]
pub fn broker_loop(&mut self) -> Result<(), Error> {
let monitor = &mut self.monitor;
#[cfg(feature = "llmp_compression")]
let compressor = &self.compressor;
self.llmp.loop_forever(
&mut |client_id: u32, tag: Tag, _flags: Flags, msg: &[u8]| {
&mut |client_id, tag, _flags, msg| {
if tag == LLMP_TAG_EVENT_TO_BOTH {
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
@ -156,6 +157,49 @@ where
Ok(())
}
/// Run forever in the broker
#[cfg(feature = "llmp_broker_timeouts")]
pub fn broker_loop(&mut self) -> Result<(), Error> {
let monitor = &mut self.monitor;
#[cfg(feature = "llmp_compression")]
let compressor = &self.compressor;
self.llmp.loop_with_timeouts(
&mut |msg_or_timeout| {
if let Some((client_id, tag, _flags, msg)) = msg_or_timeout {
if tag == LLMP_TAG_EVENT_TO_BOTH {
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
#[cfg(feature = "llmp_compression")]
let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = compressor.decompress(msg)?;
&compressed
} else {
msg
};
let event: Event<I> = postcard::from_bytes(event_bytes)?;
match Self::handle_in_broker(monitor, client_id, &event)? {
BrokerEventResult::Forward => {
Ok(llmp::LlmpMsgHookResult::ForwardToClients)
}
BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled),
}
} else {
Ok(llmp::LlmpMsgHookResult::ForwardToClients)
}
} else {
monitor.display("Timeout".into(), 0);
Ok(llmp::LlmpMsgHookResult::Handled)
}
},
Duration::from_millis(1000),
Some(Duration::from_millis(5)),
);
Ok(())
}
/// Handle arriving events in the broker
#[allow(clippy::unnecessary_wraps)]
fn handle_in_broker(
@ -453,7 +497,7 @@ where
event: Event<<Self::State as UsesInput>::Input>,
) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?;
let flags: Flags = LLMP_FLAG_INITIALIZED;
let flags = LLMP_FLAG_INITIALIZED;
match self.compressor.compress(&serialized)? {
Some(comp_buf) => {
@ -1303,7 +1347,7 @@ where
}
};
let serialized = postcard::to_allocvec(&converted_event)?;
let flags: Flags = LLMP_FLAG_INITIALIZED;
let flags = LLMP_FLAG_INITIALIZED;
match self.compressor.compress(&serialized)? {
Some(comp_buf) => {