Configurable LLMP client timeout (#1838)

* LLMP client timeout is now configurable.

* fix example.

* Fix for no-std case.
Make default timeout private.

* Fix import.
This commit is contained in:
Romain Malmain 2024-02-06 18:35:27 +01:00 committed by GitHub
parent c3473e5631
commit 9b82af4539
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 110 additions and 27 deletions

View File

@ -86,10 +86,19 @@ where
/// ///
/// The port must not be bound yet to have a broker. /// The port must not be bound yet to have a broker.
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> { pub fn on_port(
shmem_provider: SP,
port: u16,
client_timeout: Option<Duration>,
) -> Result<Self, Error> {
Ok(Self { Ok(Self {
// TODO switch to false after solving the bug // TODO switch to false after solving the bug
llmp: LlmpBroker::with_keep_pages_attach_to_tcp(shmem_provider, port, true)?, llmp: LlmpBroker::with_keep_pages_attach_to_tcp(
shmem_provider,
port,
true,
client_timeout,
)?,
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD), compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData, phantom: PhantomData,

View File

@ -24,7 +24,7 @@ use std::net::SocketAddr;
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
use std::process::Stdio; use std::process::Stdio;
#[cfg(all(unix, feature = "std", feature = "fork"))] #[cfg(all(unix, feature = "std", feature = "fork"))]
use std::{fs::File, os::unix::io::AsRawFd}; use std::{fs::File, os::unix::io::AsRawFd, time::Duration};
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
use libafl_bolts::os::startable_self; use libafl_bolts::os::startable_self;
@ -496,10 +496,9 @@ where
S: State + HasExecutions, S: State + HasExecutions,
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
{ {
/// Launch the broker and the clients and fuzz
#[allow(clippy::similar_names)] #[allow(clippy::similar_names)]
#[allow(clippy::too_many_lines)] #[allow(clippy::too_many_lines)]
pub fn launch(&mut self) -> Result<(), Error> { fn launch_internal(&mut self, client_timeout: Option<Duration>) -> Result<(), Error> {
if self.cores.ids.is_empty() { if self.cores.ids.is_empty() {
return Err(Error::illegal_argument( return Err(Error::illegal_argument(
"No cores to spawn on given, cannot launch anything.", "No cores to spawn on given, cannot launch anything.",
@ -544,6 +543,7 @@ where
CentralizedLlmpEventBroker::on_port( CentralizedLlmpEventBroker::on_port(
self.shmem_provider.clone(), self.shmem_provider.clone(),
self.centralized_broker_port, self.centralized_broker_port,
client_timeout,
)?; )?;
broker.broker_loop()?; broker.broker_loop()?;
} }
@ -643,4 +643,14 @@ where
Ok(()) Ok(())
} }
/// Launch the broker and the clients and fuzz
pub fn launch(&mut self) -> Result<(), Error> {
self.launch_internal(None)
}
/// Launch the broker and the clients and fuzz with a given timeout for the clients
pub fn launch_with_client_timeout(&mut self, client_timeout: Duration) -> Result<(), Error> {
self.launch_internal(Some(client_timeout))
}
} }

View File

@ -109,10 +109,15 @@ where
/// ///
/// The port must not be bound yet to have a broker. /// The port must not be bound yet to have a broker.
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn on_port(shmem_provider: SP, monitor: MT, port: u16) -> Result<Self, Error> { pub fn on_port(
shmem_provider: SP,
monitor: MT,
port: u16,
client_timeout: Option<Duration>,
) -> Result<Self, Error> {
Ok(Self { Ok(Self {
monitor, monitor,
llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port)?, llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port, client_timeout)?,
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD), compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData, phantom: PhantomData,
@ -1172,8 +1177,10 @@ where
false false
} }
/// Launch the restarting manager fn launch_internal(
pub fn launch(&mut self) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> { &mut self,
client_timeout: Option<Duration>,
) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> {
// We start ourself as child process to actually fuzz // We start ourself as child process to actually fuzz
let (staterestorer, new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER) let (staterestorer, new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER)
.is_err() .is_err()
@ -1195,8 +1202,11 @@ where
// We get here if we are on Unix, or we are a broker on Windows (or without forks). // We get here if we are on Unix, or we are a broker on Windows (or without forks).
let (mgr, core_id) = match self.kind { let (mgr, core_id) = match self.kind {
ManagerKind::Any => { ManagerKind::Any => {
let connection = let connection = LlmpConnection::on_port(
LlmpConnection::on_port(self.shmem_provider.clone(), self.broker_port)?; self.shmem_provider.clone(),
self.broker_port,
client_timeout,
)?;
match connection { match connection {
LlmpConnection::IsBroker { broker } => { LlmpConnection::IsBroker { broker } => {
let event_broker = LlmpEventBroker::<S::Input, MT, SP>::new( let event_broker = LlmpEventBroker::<S::Input, MT, SP>::new(
@ -1224,6 +1234,7 @@ where
self.shmem_provider.clone(), self.shmem_provider.clone(),
self.monitor.take().unwrap(), self.monitor.take().unwrap(),
self.broker_port, self.broker_port,
client_timeout,
)?; )?;
broker_things(event_broker, self.remote_broker_addr)?; broker_things(event_broker, self.remote_broker_addr)?;
@ -1386,6 +1397,19 @@ where
Ok((state, mgr)) Ok((state, mgr))
} }
/// Launch the restarting manager
pub fn launch(&mut self) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> {
self.launch_internal(None)
}
/// Launch the restarting manager with a custom client timeout
pub fn launch_with_client_timeout(
&mut self,
client_timeout: Duration,
) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> {
self.launch_internal(Some(client_timeout))
}
} }
/// A manager-like llmp client that converts between input types /// A manager-like llmp client that converts between input types

View File

@ -155,7 +155,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
match mode.as_str() { match mode.as_str() {
"broker" => { "broker" => {
let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?; let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?, None)?;
broker.launch_tcp_listener_on(port)?; broker.launch_tcp_listener_on(port)?;
// Exit when we got at least _n_ nodes, and all of them quit. // Exit when we got at least _n_ nodes, and all of them quit.
broker.set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap()); broker.set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap());
@ -166,7 +166,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
); );
} }
"b2b" => { "b2b" => {
let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?; let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?, None)?;
broker.launch_tcp_listener_on(b2b_port)?; broker.launch_tcp_listener_on(b2b_port)?;
// connect back to the main broker. // connect back to the main broker.
broker.connect_b2b(("127.0.0.1", port))?; broker.connect_b2b(("127.0.0.1", port))?;

View File

@ -107,9 +107,9 @@ use crate::{
ClientId, Error, ClientId, Error,
}; };
/// The timeout after which a client will be considered stale, and removed. /// The default timeout in seconds after which a client will be considered stale, and removed.
#[cfg(feature = "std")] #[cfg(feature = "std")]
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60 * 5); const DEFAULT_CLIENT_TIMEOUT_SECS: u64 = 60 * 5;
/// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`] /// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`]
/// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages. /// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages.
@ -699,13 +699,17 @@ where
{ {
#[cfg(feature = "std")] #[cfg(feature = "std")]
/// Creates either a broker, if the tcp port is not bound, or a client, connected to this port. /// Creates either a broker, if the tcp port is not bound, or a client, connected to this port.
pub fn on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> { pub fn on_port(
shmem_provider: SP,
port: u16,
client_timeout: Option<Duration>,
) -> Result<Self, Error> {
match tcp_bind(port) { match tcp_bind(port) {
Ok(listener) => { Ok(listener) => {
// We got the port. We are the broker! :) // We got the port. We are the broker! :)
log::info!("We're the broker"); log::info!("We're the broker");
let mut broker = LlmpBroker::new(shmem_provider)?; let mut broker = LlmpBroker::new(shmem_provider, client_timeout)?;
let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
Ok(LlmpConnection::IsBroker { broker }) Ok(LlmpConnection::IsBroker { broker })
} }
@ -725,9 +729,13 @@ where
/// Creates a new broker on the given port /// Creates a new broker on the given port
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn broker_on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> { pub fn broker_on_port(
shmem_provider: SP,
port: u16,
client_timeout: Option<Duration>,
) -> Result<Self, Error> {
Ok(LlmpConnection::IsBroker { Ok(LlmpConnection::IsBroker {
broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port)?, broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port, client_timeout)?,
}) })
} }
@ -1961,6 +1969,9 @@ where
clients_to_remove: Vec<usize>, clients_to_remove: Vec<usize>,
/// The ShMemProvider to use /// The ShMemProvider to use
shmem_provider: SP, shmem_provider: SP,
#[cfg(feature = "std")]
/// The timeout after which a client will be considered stale, and removed.
client_timeout: Duration,
} }
/// A signal handler for the [`LlmpBroker`]. /// A signal handler for the [`LlmpBroker`].
@ -2009,16 +2020,28 @@ where
SP: ShMemProvider + 'static, SP: ShMemProvider + 'static,
{ {
/// Create and initialize a new [`LlmpBroker`] /// Create and initialize a new [`LlmpBroker`]
pub fn new(shmem_provider: SP) -> Result<Self, Error> { pub fn new(
shmem_provider: SP,
#[cfg(feature = "std")] client_timeout: Option<Duration>,
) -> Result<Self, Error> {
// Broker never cleans up the pages so that new // Broker never cleans up the pages so that new
// clients may join at any time // clients may join at any time
#[cfg(feature = "std")]
{
Self::with_keep_pages(shmem_provider, true, client_timeout)
}
#[cfg(not(feature = "std"))]
{
Self::with_keep_pages(shmem_provider, true) Self::with_keep_pages(shmem_provider, true)
} }
}
/// Create and initialize a new [`LlmpBroker`] telling if it has to keep pages forever /// Create and initialize a new [`LlmpBroker`] telling if it has to keep pages forever
pub fn with_keep_pages( pub fn with_keep_pages(
mut shmem_provider: SP, mut shmem_provider: SP,
keep_pages_forever: bool, keep_pages_forever: bool,
#[cfg(feature = "std")] client_timeout: Option<Duration>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Ok(LlmpBroker { Ok(LlmpBroker {
llmp_out: LlmpSender { llmp_out: LlmpSender {
@ -2039,6 +2062,12 @@ where
listeners: vec![], listeners: vec![],
exit_cleanly_after: None, exit_cleanly_after: None,
num_clients_total: 0, num_clients_total: 0,
#[cfg(feature = "std")]
client_timeout: if let Some(to) = client_timeout {
to
} else {
Duration::from_secs(DEFAULT_CLIENT_TIMEOUT_SECS)
},
}) })
} }
@ -2058,8 +2087,12 @@ where
/// Create a new [`LlmpBroker`] attaching to a TCP port /// Create a new [`LlmpBroker`] attaching to a TCP port
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result<Self, Error> { pub fn create_attach_to_tcp(
Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true) shmem_provider: SP,
port: u16,
client_timeout: Option<Duration>,
) -> Result<Self, Error> {
Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true, client_timeout)
} }
/// Create a new [`LlmpBroker`] attaching to a TCP port and telling if it has to keep pages forever /// Create a new [`LlmpBroker`] attaching to a TCP port and telling if it has to keep pages forever
@ -2068,10 +2101,15 @@ where
shmem_provider: SP, shmem_provider: SP,
port: u16, port: u16,
keep_pages_forever: bool, keep_pages_forever: bool,
client_timeout: Option<Duration>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
match tcp_bind(port) { match tcp_bind(port) {
Ok(listener) => { Ok(listener) => {
let mut broker = LlmpBroker::with_keep_pages(shmem_provider, keep_pages_forever)?; let mut broker = LlmpBroker::with_keep_pages(
shmem_provider,
keep_pages_forever,
client_timeout,
)?;
let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
Ok(broker) Ok(broker)
} }
@ -2233,7 +2271,7 @@ where
if !has_messages && !self.listeners.iter().any(|&x| x == client_id) { if !has_messages && !self.listeners.iter().any(|&x| x == client_id) {
let last_msg_time = self.llmp_clients[i].last_msg_time; let last_msg_time = self.llmp_clients[i].last_msg_time;
if last_msg_time < current_time if last_msg_time < current_time
&& current_time - last_msg_time > CLIENT_TIMEOUT && current_time - last_msg_time > self.client_timeout
{ {
self.clients_to_remove.push(i); self.clients_to_remove.push(i);
#[cfg(feature = "llmp_debug")] #[cfg(feature = "llmp_debug")]
@ -3257,13 +3295,15 @@ mod tests {
pub fn test_llmp_connection() { pub fn test_llmp_connection() {
#[allow(unused_variables)] #[allow(unused_variables)]
let shmem_provider = StdShMemProvider::new().unwrap(); let shmem_provider = StdShMemProvider::new().unwrap();
let mut broker = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() { let mut broker = match LlmpConnection::on_port(shmem_provider.clone(), 1337, None).unwrap()
{
IsClient { client: _ } => panic!("Could not bind to port as broker"), IsClient { client: _ } => panic!("Could not bind to port as broker"),
IsBroker { broker } => broker, IsBroker { broker } => broker,
}; };
// Add the first client (2nd, actually, because of the tcp listener client) // Add the first client (2nd, actually, because of the tcp listener client)
let mut client = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() { let mut client = match LlmpConnection::on_port(shmem_provider.clone(), 1337, None).unwrap()
{
IsBroker { broker: _ } => panic!("Second connect should be a client!"), IsBroker { broker: _ } => panic!("Second connect should be a client!"),
IsClient { client } => client, IsClient { client } => client,
}; };