Fix inconsistent settings of client_timeout (#1897)

* a

* fix client timeout

* revert

* more

* std

* import

* import

* sdt

* FMT

* backtick again
This commit is contained in:
Dongjia "toka" Zhang 2024-03-05 17:58:44 +01:00 committed by GitHub
parent 1a0e692f33
commit e3f837d712
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 55 additions and 61 deletions

View File

@ -87,11 +87,7 @@ where
/// ///
/// The port must not be bound yet to have a broker. /// The port must not be bound yet to have a broker.
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn on_port( pub fn on_port(shmem_provider: SP, port: u16, client_timeout: Duration) -> Result<Self, Error> {
shmem_provider: SP,
port: u16,
client_timeout: Option<Duration>,
) -> Result<Self, Error> {
Ok(Self { Ok(Self {
// TODO switch to false after solving the bug // TODO switch to false after solving the bug
llmp: LlmpBroker::with_keep_pages_attach_to_tcp( llmp: LlmpBroker::with_keep_pages_attach_to_tcp(

View File

@ -18,13 +18,14 @@ use core::marker::PhantomData;
use core::{ use core::{
fmt::{self, Debug, Formatter}, fmt::{self, Debug, Formatter},
num::NonZeroUsize, num::NonZeroUsize,
time::Duration,
}; };
#[cfg(feature = "std")] #[cfg(feature = "std")]
use std::net::SocketAddr; use std::net::SocketAddr;
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
use std::process::Stdio; use std::process::Stdio;
#[cfg(all(unix, feature = "std", feature = "fork"))] #[cfg(all(unix, feature = "std", feature = "fork"))]
use std::{fs::File, os::unix::io::AsRawFd, time::Duration}; use std::{fs::File, os::unix::io::AsRawFd};
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
use libafl_bolts::os::startable_self; use libafl_bolts::os::startable_self;
@ -35,6 +36,7 @@ use libafl_bolts::{
}; };
use libafl_bolts::{ use libafl_bolts::{
core_affinity::{CoreId, Cores}, core_affinity::{CoreId, Cores},
llmp::DEFAULT_CLIENT_TIMEOUT_SECS,
shmem::ShMemProvider, shmem::ShMemProvider,
}; };
#[cfg(feature = "std")] #[cfg(feature = "std")]
@ -117,6 +119,9 @@ where
/// Then, clients launched by this [`Launcher`] can connect to the original `broker`. /// Then, clients launched by this [`Launcher`] can connect to the original `broker`.
#[builder(default = true)] #[builder(default = true)]
spawn_broker: bool, spawn_broker: bool,
/// The timeout duration used for llmp client timeout
#[builder(default = DEFAULT_CLIENT_TIMEOUT_SECS)]
client_timeout: Duration,
/// Tell the manager to serialize or not the state on restart /// Tell the manager to serialize or not the state on restart
#[builder(default = true)] #[builder(default = true)]
serialize_state: bool, serialize_state: bool,
@ -229,6 +234,7 @@ where
}) })
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout)
.build() .build()
.launch()?; .launch()?;
@ -252,6 +258,7 @@ where
.exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap())) .exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap()))
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout)
.build() .build()
.launch()?; .launch()?;
@ -303,6 +310,7 @@ where
}) })
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout)
.build() .build()
.launch()?; .launch()?;
@ -372,6 +380,7 @@ where
.exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap())) .exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap()))
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout)
.build() .build()
.launch()?; .launch()?;
@ -455,6 +464,9 @@ where
/// Tell the manager to serialize or not the state on restart /// Tell the manager to serialize or not the state on restart
#[builder(default = true)] #[builder(default = true)]
serialize_state: bool, serialize_state: bool,
/// The duration for the llmp client timeout
#[builder(default = DEFAULT_CLIENT_TIMEOUT_SECS)]
client_timeout: Duration,
#[builder(setter(skip), default = PhantomData)] #[builder(setter(skip), default = PhantomData)]
phantom_data: PhantomData<(&'a S, &'a SP)>, phantom_data: PhantomData<(&'a S, &'a SP)>,
} }
@ -498,7 +510,8 @@ where
{ {
#[allow(clippy::similar_names)] #[allow(clippy::similar_names)]
#[allow(clippy::too_many_lines)] #[allow(clippy::too_many_lines)]
fn launch_internal(&mut self, client_timeout: Option<Duration>) -> Result<(), Error> { /// launch the broker and the client and fuzz
pub fn launch(&mut self) -> Result<(), Error> {
if self.cores.ids.is_empty() { if self.cores.ids.is_empty() {
return Err(Error::illegal_argument( return Err(Error::illegal_argument(
"No cores to spawn on given, cannot launch anything.", "No cores to spawn on given, cannot launch anything.",
@ -545,7 +558,7 @@ where
CentralizedLlmpEventBroker::on_port( CentralizedLlmpEventBroker::on_port(
self.shmem_provider.clone(), self.shmem_provider.clone(),
self.centralized_broker_port, self.centralized_broker_port,
client_timeout, self.client_timeout,
)?; )?;
broker.broker_loop()?; broker.broker_loop()?;
} }
@ -592,6 +605,7 @@ where
}) })
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout)
.build() .build()
.launch()?; .launch()?;
@ -621,6 +635,7 @@ where
.exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap())) .exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap()))
.configuration(self.configuration) .configuration(self.configuration)
.serialize_state(self.serialize_state) .serialize_state(self.serialize_state)
.client_timeout(self.client_timeout)
.build() .build()
.launch()?; .launch()?;
@ -645,14 +660,4 @@ where
Ok(()) Ok(())
} }
/// Launch the broker and the clients and fuzz
pub fn launch(&mut self) -> Result<(), Error> {
self.launch_internal(None)
}
/// Launch the broker and the clients and fuzz with a given timeout for the clients
pub fn launch_with_client_timeout(&mut self, client_timeout: Duration) -> Result<(), Error> {
self.launch_internal(Some(client_timeout))
}
} }

View File

@ -13,6 +13,8 @@ use std::net::{SocketAddr, ToSocketAddrs};
use libafl_bolts::core_affinity::CoreId; use libafl_bolts::core_affinity::CoreId;
#[cfg(feature = "adaptive_serialization")] #[cfg(feature = "adaptive_serialization")]
use libafl_bolts::current_time; use libafl_bolts::current_time;
#[cfg(feature = "std")]
use libafl_bolts::llmp::DEFAULT_CLIENT_TIMEOUT_SECS;
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
use libafl_bolts::os::startable_self; use libafl_bolts::os::startable_self;
#[cfg(all(unix, feature = "std", not(miri)))] #[cfg(all(unix, feature = "std", not(miri)))]
@ -110,7 +112,7 @@ where
shmem_provider: SP, shmem_provider: SP,
monitor: MT, monitor: MT,
port: u16, port: u16,
client_timeout: Option<Duration>, client_timeout: Duration,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Ok(Self { Ok(Self {
monitor, monitor,
@ -1155,6 +1157,9 @@ where
/// Tell the manager to serialize or not the state on restart /// Tell the manager to serialize or not the state on restart
#[builder(default = true)] #[builder(default = true)]
serialize_state: bool, serialize_state: bool,
/// The timeout duration used for llmp client timeout
#[builder(default = DEFAULT_CLIENT_TIMEOUT_SECS)]
client_timeout: Duration,
#[builder(setter(skip), default = PhantomData)] #[builder(setter(skip), default = PhantomData)]
phantom_data: PhantomData<S>, phantom_data: PhantomData<S>,
} }
@ -1180,10 +1185,8 @@ where
false false
} }
fn launch_internal( /// Launch the broker and the clients and fuzz
&mut self, pub fn launch(&mut self) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> {
client_timeout: Option<Duration>,
) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> {
// We start ourself as child process to actually fuzz // We start ourself as child process to actually fuzz
let (staterestorer, new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER) let (staterestorer, new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER)
.is_err() .is_err()
@ -1208,7 +1211,7 @@ where
let connection = LlmpConnection::on_port( let connection = LlmpConnection::on_port(
self.shmem_provider.clone(), self.shmem_provider.clone(),
self.broker_port, self.broker_port,
client_timeout, self.client_timeout,
)?; )?;
match connection { match connection {
LlmpConnection::IsBroker { broker } => { LlmpConnection::IsBroker { broker } => {
@ -1237,7 +1240,7 @@ where
self.shmem_provider.clone(), self.shmem_provider.clone(),
self.monitor.take().unwrap(), self.monitor.take().unwrap(),
self.broker_port, self.broker_port,
client_timeout, self.client_timeout,
)?; )?;
broker_things(event_broker, self.remote_broker_addr)?; broker_things(event_broker, self.remote_broker_addr)?;
@ -1400,19 +1403,6 @@ where
Ok((state, mgr)) Ok((state, mgr))
} }
/// Launch the restarting manager
pub fn launch(&mut self) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> {
self.launch_internal(None)
}
/// Launch the restarting manager with a custom client timeout
pub fn launch_with_client_timeout(
&mut self,
client_timeout: Duration,
) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> {
self.launch_internal(Some(client_timeout))
}
} }
/// A manager-like llmp client that converts between input types /// A manager-like llmp client that converts between input types

View File

@ -3,7 +3,7 @@ This shows how llmp can be used directly, without libafl abstractions
*/ */
extern crate alloc; extern crate alloc;
#[cfg(all(feature = "std", not(target_os = "haiku")))] #[cfg(not(target_os = "haiku"))]
use core::time::Duration; use core::time::Duration;
#[cfg(all(feature = "std", not(target_os = "haiku")))] #[cfg(all(feature = "std", not(target_os = "haiku")))]
use std::{num::NonZeroUsize, thread, time}; use std::{num::NonZeroUsize, thread, time};
@ -155,7 +155,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
match mode.as_str() { match mode.as_str() {
"broker" => { "broker" => {
let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?, None)?; let mut broker =
llmp::LlmpBroker::new(StdShMemProvider::new()?, Duration::from_secs(5))?;
broker.launch_tcp_listener_on(port)?; broker.launch_tcp_listener_on(port)?;
// Exit when we got at least _n_ nodes, and all of them quit. // Exit when we got at least _n_ nodes, and all of them quit.
broker.set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap()); broker.set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap());
@ -166,7 +167,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
); );
} }
"b2b" => { "b2b" => {
let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?, None)?; let mut broker =
llmp::LlmpBroker::new(StdShMemProvider::new()?, Duration::from_secs(5))?;
broker.launch_tcp_listener_on(b2b_port)?; broker.launch_tcp_listener_on(b2b_port)?;
// connect back to the main broker. // connect back to the main broker.
broker.connect_b2b(("127.0.0.1", port))?; broker.connect_b2b(("127.0.0.1", port))?;

View File

@ -105,8 +105,7 @@ use crate::{
}; };
/// The default timeout in seconds after which a client will be considered stale, and removed. /// The default timeout in seconds after which a client will be considered stale, and removed.
#[cfg(feature = "std")] pub const DEFAULT_CLIENT_TIMEOUT_SECS: Duration = Duration::from_secs(300);
const DEFAULT_CLIENT_TIMEOUT_SECS: u64 = 60 * 5;
/// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`] /// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`]
/// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages. /// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages.
@ -696,11 +695,7 @@ where
{ {
#[cfg(feature = "std")] #[cfg(feature = "std")]
/// Creates either a broker, if the tcp port is not bound, or a client, connected to this port. /// Creates either a broker, if the tcp port is not bound, or a client, connected to this port.
pub fn on_port( pub fn on_port(shmem_provider: SP, port: u16, client_timeout: Duration) -> Result<Self, Error> {
shmem_provider: SP,
port: u16,
client_timeout: Option<Duration>,
) -> Result<Self, Error> {
match tcp_bind(port) { match tcp_bind(port) {
Ok(listener) => { Ok(listener) => {
// We got the port. We are the broker! :) // We got the port. We are the broker! :)
@ -729,7 +724,7 @@ where
pub fn broker_on_port( pub fn broker_on_port(
shmem_provider: SP, shmem_provider: SP,
port: u16, port: u16,
client_timeout: Option<Duration>, client_timeout: Duration,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Ok(LlmpConnection::IsBroker { Ok(LlmpConnection::IsBroker {
broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port, client_timeout)?, broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port, client_timeout)?,
@ -2038,7 +2033,7 @@ where
/// Create and initialize a new [`LlmpBroker`] /// Create and initialize a new [`LlmpBroker`]
pub fn new( pub fn new(
shmem_provider: SP, shmem_provider: SP,
#[cfg(feature = "std")] client_timeout: Option<Duration>, #[cfg(feature = "std")] client_timeout: Duration,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
// Broker never cleans up the pages so that new // Broker never cleans up the pages so that new
// clients may join at any time // clients may join at any time
@ -2057,7 +2052,7 @@ where
pub fn with_keep_pages( pub fn with_keep_pages(
mut shmem_provider: SP, mut shmem_provider: SP,
keep_pages_forever: bool, keep_pages_forever: bool,
#[cfg(feature = "std")] client_timeout: Option<Duration>, #[cfg(feature = "std")] client_timeout: Duration,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Ok(LlmpBroker { Ok(LlmpBroker {
llmp_out: LlmpSender { llmp_out: LlmpSender {
@ -2079,11 +2074,7 @@ where
exit_cleanly_after: None, exit_cleanly_after: None,
num_clients_total: 0, num_clients_total: 0,
#[cfg(feature = "std")] #[cfg(feature = "std")]
client_timeout: if let Some(to) = client_timeout { client_timeout,
to
} else {
Duration::from_secs(DEFAULT_CLIENT_TIMEOUT_SECS)
},
}) })
} }
@ -2106,7 +2097,7 @@ where
pub fn create_attach_to_tcp( pub fn create_attach_to_tcp(
shmem_provider: SP, shmem_provider: SP,
port: u16, port: u16,
client_timeout: Option<Duration>, client_timeout: Duration,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true, client_timeout) Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true, client_timeout)
} }
@ -2117,7 +2108,7 @@ where
shmem_provider: SP, shmem_provider: SP,
port: u16, port: u16,
keep_pages_forever: bool, keep_pages_forever: bool,
client_timeout: Option<Duration>, client_timeout: Duration,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
match tcp_bind(port) { match tcp_bind(port) {
Ok(listener) => { Ok(listener) => {
@ -3302,7 +3293,7 @@ mod tests {
LlmpClient, LlmpClient,
LlmpConnection::{self, IsBroker, IsClient}, LlmpConnection::{self, IsBroker, IsClient},
LlmpMsgHookResult::ForwardToClients, LlmpMsgHookResult::ForwardToClients,
Tag, Tag, DEFAULT_CLIENT_TIMEOUT_SECS,
}; };
use crate::shmem::{ShMemProvider, StdShMemProvider}; use crate::shmem::{ShMemProvider, StdShMemProvider};
@ -3312,14 +3303,24 @@ mod tests {
pub fn test_llmp_connection() { pub fn test_llmp_connection() {
#[allow(unused_variables)] #[allow(unused_variables)]
let shmem_provider = StdShMemProvider::new().unwrap(); let shmem_provider = StdShMemProvider::new().unwrap();
let mut broker = match LlmpConnection::on_port(shmem_provider.clone(), 1337, None).unwrap() let mut broker = match LlmpConnection::on_port(
shmem_provider.clone(),
1337,
DEFAULT_CLIENT_TIMEOUT_SECS,
)
.unwrap()
{ {
IsClient { client: _ } => panic!("Could not bind to port as broker"), IsClient { client: _ } => panic!("Could not bind to port as broker"),
IsBroker { broker } => broker, IsBroker { broker } => broker,
}; };
// Add the first client (2nd, actually, because of the tcp listener client) // Add the first client (2nd, actually, because of the tcp listener client)
let mut client = match LlmpConnection::on_port(shmem_provider.clone(), 1337, None).unwrap() let mut client = match LlmpConnection::on_port(
shmem_provider.clone(),
1337,
DEFAULT_CLIENT_TIMEOUT_SECS,
)
.unwrap()
{ {
IsBroker { broker: _ } => panic!("Second connect should be a client!"), IsBroker { broker: _ } => panic!("Second connect should be a client!"),
IsClient { client } => client, IsClient { client } => client,