Address comments from #2302 (#2322)

* Address comments from #2302

* secure?

* cleanup

* early exit ftw

* address clippy

* Fix all the things
Dominik Maier 2024-06-18 14:58:37 +01:00 committed by GitHub
parent 5519ee7d77
commit e64f0fb536
GPG Key ID: B5690EEEBB952194
14 changed files with 1581 additions and 146 deletions

View File

@@ -58,6 +58,12 @@ handle_sigpipe = []
 #! ## Additional Components
+## Enables `TcpEventManager`, a simple EventManager proxying everything via TCP. This uses `tokio`.
+tcp_manager = ["tokio", "std"]
+## Enables compression for the TCP manager
+tcp_compression = ["tcp_manager", "libafl_bolts/gzip"]
 ## Enable multi-machine support
 multi_machine = ["tokio", "std", "enumflags2", "ahash/std"]

View File

@@ -14,7 +14,6 @@ use libafl_bolts::{
 shmem::ShMemProvider,
 ClientId, Error,
 };
-use log::debug;
 use tokio::{
 net::ToSocketAddrs,
 runtime::Runtime,
@@ -102,7 +101,7 @@ where
 A: Clone + Display + ToSocketAddrs + Send + Sync + 'static,
 I: Input + Send + Sync + 'static,
 {
-/// Should not be created alone. Use [`TcpMultiMachineBuilder`] instead.
+/// Should not be created alone. Use [`TcpMultiMachineHooksBuilder`] instead.
 pub(crate) fn new(
 shared_state: Arc<RwLock<TcpMultiMachineState<A>>>,
 rt: Arc<Runtime>,
@@ -120,8 +119,13 @@
 A: Clone + Display + ToSocketAddrs + Send + Sync + 'static,
 I: Input + Send + Sync + 'static,
 {
-/// Should not be created alone. Use [`TcpMultiMachineBuilder`] instead.
-pub(crate) fn new(
+/// Should not be created alone. Use [`TcpMultiMachineHooksBuilder`] instead.
+///
+/// # Safety
+/// For [`Self::on_new_message`], this struct assumes that the `msg` parameter
+/// (or rather, the memory it points to), lives sufficiently long
+/// for an async background task to process it.
+pub(crate) unsafe fn new(
 shared_state: Arc<RwLock<TcpMultiMachineState<A>>>,
 rt: Arc<Runtime>,
 ) -> Self {
@@ -200,7 +204,7 @@ where
 // TODO: do not copy here
 state_wr_lock.add_past_msg(msg);
-debug!("Sending msg...");
+log::debug!("Sending msg...");
 state_wr_lock
 .send_interesting_event_to_nodes(&mm_msg)
@@ -239,7 +243,7 @@ where
 .receive_new_messages_from_nodes(&mut incoming_msgs)
 .await?;
-debug!("received {} new incoming msg(s)", incoming_msgs.len());
+log::debug!("received {} new incoming msg(s)", incoming_msgs.len());
 let msgs_to_forward: Result<Vec<(Tag, Flags, Vec<u8>)>, Error> = incoming_msgs
 .into_iter()

View File

@@ -22,7 +22,6 @@ use libafl_bolts::{
 tuples::Handle,
 ClientId,
 };
-use log::debug;
 use serde::{Deserialize, Serialize};
 use super::NopEventManager;
@@ -551,7 +550,7 @@ where
 };
 let event: Event<<<Self as UsesState>::State as UsesInput>::Input> =
 postcard::from_bytes(event_bytes)?;
-debug!("Processor received message {}", event.name_detailed());
+log::debug!("Processor received message {}", event.name_detailed());
 self.handle_in_main(fuzzer, executor, state, client_id, event)?;
 count += 1;
 }
@@ -574,7 +573,7 @@ where
 Z: ExecutionProcessor<E::Observers, State = <Self as UsesState>::State>
 + EvaluatorObservers<E::Observers>,
 {
-debug!("handle_in_main!");
+log::debug!("handle_in_main!");
 let event_name = event.name_detailed();
@@ -591,7 +590,7 @@ where
 #[cfg(feature = "multi_machine")]
 node_id,
 } => {
-debug!(
+log::debug!(
 "Received {} from {client_id:?} ({client_config:?}, forward {forward_id:?})",
 event_name
 );
@@ -604,7 +603,7 @@
 {
 state.scalability_monitor_mut().testcase_with_observers += 1;
 }
-debug!(
+log::debug!(
 "[{}] Running fuzzer with event {}",
 process::id(),
 event_name
@@ -622,7 +621,7 @@
 {
 state.scalability_monitor_mut().testcase_without_observers += 1;
 }
-debug!(
+log::debug!(
 "[{}] Running fuzzer with event {}",
 process::id(),
 event_name
@@ -652,7 +651,7 @@
 self.hooks.on_fire_all(state, client_id, &event)?;
-debug!(
+log::debug!(
 "[{}] Adding received Testcase {} as item #{item}...",
 process::id(),
 event_name
@@ -660,7 +659,7 @@
 self.inner.fire(state, event)?;
 } else {
-debug!("[{}] {} was discarded...)", process::id(), event_name);
+log::debug!("[{}] {} was discarded...)", process::id(), event_name);
 }
 }
 _ => {

View File

@@ -46,8 +46,6 @@ use libafl_bolts::{
 shmem::ShMemProvider,
 tuples::{tuple_list, Handle},
 };
-#[cfg(all(unix, feature = "std", feature = "fork"))]
-use log::debug;
 #[cfg(feature = "std")]
 use typed_builder::TypedBuilder;
@@ -57,7 +55,7 @@ use super::StdLlmpEventHook;
 #[cfg(all(unix, feature = "std", feature = "fork", feature = "multi_machine"))]
 use crate::events::multi_machine::NodeDescriptor;
 #[cfg(all(unix, feature = "std", feature = "fork", feature = "multi_machine"))]
-use crate::events::multi_machine::TcpMultiMachineBuilder;
+use crate::events::multi_machine::TcpMultiMachineHooks;
 #[cfg(all(unix, feature = "std", feature = "fork"))]
 use crate::events::{centralized::CentralizedEventManager, CentralizedLlmpHook};
 #[cfg(all(unix, feature = "std", feature = "fork"))]
@@ -657,7 +655,7 @@ where
 let num_cores = core_ids.len();
 let mut handles = vec![];
-debug!("spawning on cores: {:?}", self.cores);
+log::debug!("spawning on cores: {:?}", self.cores);
 self.opened_stdout_file = self
 .stdout_file
@@ -700,7 +698,7 @@ where
 if index == 1 {
 // Main client
-debug!("Running main client on PID {}", std::process::id());
+log::debug!("Running main client on PID {}", std::process::id());
 let (state, mgr) =
 main_inner_mgr_builder.take().unwrap()(self, *bind_to)?;
@@ -721,7 +719,7 @@ where
 self.main_run_client.take().unwrap()(state, c_mgr, *bind_to)
 } else {
 // Secondary clients
-debug!("Running secondary client on PID {}", std::process::id());
+log::debug!("Running secondary client on PID {}", std::process::id());
 let (state, mgr) =
 secondary_inner_mgr_builder.take().unwrap()(self, *bind_to)?;
@@ -744,11 +742,18 @@
 #[cfg(feature = "multi_machine")]
 // Create this after forks, to avoid problems with tokio runtime
-let (multi_machine_sender_hook, multi_machine_receiver_hook) =
-TcpMultiMachineBuilder::build::<
-SocketAddr,
-<<EM as UsesState>::State as UsesInput>::Input,
->(self.multi_machine_node_descriptor.clone())?;
+// # Safety
+// The `multi_machine_receiver_hook` needs messages to outlive the receiver.
+// The underlying memory region for incoming messages lives longer than the async thread processing them.
+let TcpMultiMachineHooks {
+sender: multi_machine_sender_hook,
+receiver: multi_machine_receiver_hook,
+} = unsafe {
+TcpMultiMachineHooks::builder()
+.node_descriptor(self.multi_machine_node_descriptor.clone())
+.build::<<<EM as UsesState>::State as UsesInput>::Input>()?
+};
 let mut brokers = Brokers::new();
@@ -812,7 +817,7 @@
 brokers.add(Box::new(broker));
 }
-debug!(
+log::debug!(
 "Brokers have been initialized on port {}.",
 std::process::id()
 );

View File

@@ -25,7 +25,6 @@ use libafl_bolts::{
 llmp::{recv_tcp_msg, send_tcp_msg, TcpRequest, TcpResponse},
 IP_LOCALHOST,
 };
-use log::debug;
 use serde::{Deserialize, Serialize};
 #[cfg(feature = "llmp_compression")]
@@ -370,7 +369,7 @@ where
 Ok(_) => (),
 Err(e) => log::error!("Failed to send tcp message {:#?}", e),
 }
-debug!("Asking he broker to be disconnected");
+log::debug!("Asking he broker to be disconnected");
 Ok(())
 }
@@ -423,11 +422,11 @@
 ..
 } => {
 #[cfg(feature = "std")]
-debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id());
+log::debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id());
 if self.always_interesting {
 let item = fuzzer.add_input(state, executor, self, input)?;
-debug!("Added received Testcase as item #{item}");
+log::debug!("Added received Testcase as item #{item}");
 } else {
 let res = if client_config.match_with(&self.configuration)
 && observers_buf.is_some()
@@ -455,9 +454,9 @@
 )?
 };
 if let Some(item) = res.1 {
-debug!("Added received Testcase {evt_name} as item #{item}");
+log::debug!("Added received Testcase {evt_name} as item #{item}");
 } else {
-debug!("Testcase {evt_name} was discarded");
+log::debug!("Testcase {evt_name} was discarded");
 }
 }
 }
@@ -620,7 +619,7 @@
 msg
 };
 let event: Event<S::Input> = postcard::from_bytes(event_bytes)?;
-debug!("Received event in normal llmp {}", event.name_detailed());
+log::debug!("Received event in normal llmp {}", event.name_detailed());
 self.handle_in_client(fuzzer, executor, state, client_id, event)?;
 count += 1;
 }

View File

@@ -13,7 +13,6 @@ use libafl_bolts::{
 shmem::{NopShMemProvider, ShMemProvider},
 ClientId,
 };
-use log::debug;
 use serde::Deserialize;
 use crate::{
@@ -303,7 +302,7 @@ where
 Event::NewTestcase {
 input, forward_id, ..
 } => {
-debug!("Received new Testcase to convert from {client_id:?} (forward {forward_id:?}, forward {forward_id:?})");
+log::debug!("Received new Testcase to convert from {client_id:?} (forward {forward_id:?}, forward {forward_id:?})");
 let Some(converter) = self.converter_back.as_mut() else {
 return Ok(());
@@ -377,7 +376,7 @@
 };
 let event: Event<DI> = postcard::from_bytes(event_bytes)?;
-debug!("Processor received message {}", event.name_detailed());
+log::debug!("Processor received message {}", event.name_detailed());
 self.handle_in_client(fuzzer, executor, state, manager, client_id, event)?;
 count += 1;
 }

View File

@@ -31,8 +31,6 @@ use libafl_bolts::{
 use libafl_bolts::{
 llmp::LlmpConnection, os::CTRL_C_EXIT, shmem::StdShMemProvider, staterestore::StateRestorer,
 };
-#[cfg(all(unix, feature = "fork"))]
-use log::debug;
 use serde::{Deserialize, Serialize};
 #[cfg(feature = "std")]
 use typed_builder::TypedBuilder;
@@ -564,7 +562,7 @@ where
 handle.status()
 }
 ForkResult::Child => {
-debug!(
+log::debug!(
 "{} has been forked into {}",
 std::os::unix::process::parent_id(),
 std::process::id()

View File

@@ -16,6 +16,9 @@ pub mod launcher;
 #[allow(clippy::ignored_unit_patterns)]
 pub mod llmp;
 pub use llmp::*;
+#[cfg(feature = "tcp_manager")]
+#[allow(clippy::ignored_unit_patterns)]
+pub mod tcp;
 pub mod broker_hooks;
 use alloc::{

View File

@@ -14,9 +14,8 @@ use std::{
 use enumflags2::{bitflags, BitFlags};
 #[cfg(feature = "llmp_compression")]
-use libafl_bolts::bolts_prelude::GzipCompressor;
+use libafl_bolts::compress::GzipCompressor;
 use libafl_bolts::{current_time, ownedref::OwnedRef, Error};
-use log::{debug, error};
 use serde::{Deserialize, Serialize};
 use tokio::{
 io::{AsyncReadExt, AsyncWriteExt},
@@ -30,7 +29,7 @@ use typed_builder::TypedBuilder;
 use crate::{
 events::{Event, TcpMultiMachineLlmpReceiverHook, TcpMultiMachineLlmpSenderHook},
-inputs::Input,
+inputs::{Input, NopInput},
 };
 const MAX_NB_RECEIVED_AT_ONCE: usize = 10;
@@ -154,31 +153,85 @@ pub struct NodeDescriptor<A> {
 pub flags: BitFlags<NodePolicy>, // The policy for shared messages between nodes.
 }
-/// A Multi-machine `broker_hooks` builder.
+/// A set of multi-machine `broker_hooks`.
+///
+/// Beware, the hooks should run in the same process as the one this function is called.
+/// This is because we spawn a tokio runtime underneath.
+/// Check `<https://github.com/tokio-rs/tokio/issues/4301>` for more details.
+///
+/// Use `TcpMultiMachineHooks::builder()` to initialize the hooks.
+///
+/// # Safety
+/// The [`TcpMultiMachineLlmpReceiverHook`] assumes that the `msg` parameter
+/// passed to the `on_new_message` method (or rather, the memory it points to),
+/// lives sufficiently long for an async background task to process it.
 #[derive(Debug)]
-pub struct TcpMultiMachineBuilder {
-_private: (),
+pub struct TcpMultiMachineHooks<A, I>
+where
+I: Input,
+{
+/// The sender hooks
+pub sender: TcpMultiMachineLlmpSenderHook<A, I>,
+/// The hooks
+pub receiver: TcpMultiMachineLlmpReceiverHook<A, I>,
 }
-impl TcpMultiMachineBuilder {
-/// Build a new couple [`TcpMultiMachineLlmpSenderHook`] / [`TcpMultiMachineLlmpReceiverHook`] from a [`NodeDescriptor`].
+impl TcpMultiMachineHooks<(), NopInput> {
+/// Create the builder to build a new [`TcpMultiMachineHooks`]
+/// containing a sender and a receiver from a [`NodeDescriptor`].
+#[must_use]
+pub fn builder() -> TcpMultiMachineHooksBuilder<()> {
+TcpMultiMachineHooksBuilder::<()> {
+node_descriptor: None,
+}
+}
+}
+/// A Multi-machine `broker_hooks` builder.
+#[derive(Debug)]
+pub struct TcpMultiMachineHooksBuilder<A> {
+node_descriptor: Option<NodeDescriptor<A>>,
+}
+impl<A> TcpMultiMachineHooksBuilder<A> {
+/// Set the multi machine [`NodeDescriptor`] used by the resulting [`TcpMultiMachineHooks`].
+pub fn node_descriptor<A2>(
+self,
+node_descriptor: NodeDescriptor<A2>,
+) -> TcpMultiMachineHooksBuilder<A2>
+where
+A2: Clone + Display + ToSocketAddrs + Send + Sync + 'static,
+{
+TcpMultiMachineHooksBuilder::<A2> {
+node_descriptor: Some(node_descriptor),
+}
+}
+}
+impl<A> TcpMultiMachineHooksBuilder<A>
+where
+A: Clone + Display + ToSocketAddrs + Send + Sync + 'static,
+{
+/// Build a new [`TcpMultiMachineHooks`] containing a sender and a receiver from a [`NodeDescriptor`].
 /// Everything is initialized and ready to be used.
 /// Beware, the hooks should run in the same process as the one this function is called.
 /// This is because we spawn a tokio runtime underneath.
 /// Check `<https://github.com/tokio-rs/tokio/issues/4301>` for more details.
-pub fn build<A, I>(
-node_descriptor: NodeDescriptor<A>,
-) -> Result<
-(
-TcpMultiMachineLlmpSenderHook<A, I>,
-TcpMultiMachineLlmpReceiverHook<A, I>,
-),
-Error,
->
+///
+/// # Safety
+/// The returned [`TcpMultiMachineLlmpReceiverHook`] assumes that the `msg` parameter
+/// passed to the `on_new_message` method (or rather, the memory it points to),
+/// lives sufficiently long for an async background task to process it.
+pub unsafe fn build<I>(mut self) -> Result<TcpMultiMachineHooks<A, I>, Error>
 where
-A: Clone + Display + ToSocketAddrs + Send + Sync + 'static,
 I: Input + Send + Sync + 'static,
 {
+let node_descriptor = self.node_descriptor.take().ok_or_else(|| {
+Error::illegal_state(
+"The node descriptor can never be `None` at this point in the code",
+)
+})?;
 // Create the state of the hook. This will be shared with the background server, so we wrap
 // it with concurrent-safe objects
 let state = Arc::new(RwLock::new(TcpMultiMachineState {
@@ -197,10 +250,10 @@ impl TcpMultiMachineBuilder {
 TcpMultiMachineState::init::<I>(&state.clone(), &rt.clone())?;
 }
-Ok((
-TcpMultiMachineLlmpSenderHook::new(state.clone(), rt.clone()),
-TcpMultiMachineLlmpReceiverHook::new(state, rt),
-))
+Ok(TcpMultiMachineHooks {
+sender: TcpMultiMachineLlmpSenderHook::new(state.clone(), rt.clone()),
+receiver: TcpMultiMachineLlmpReceiverHook::new(state, rt),
+})
 }
 }
@@ -230,10 +283,10 @@ where
 let timeout = current_time() + parent_lock.node_descriptor.timeout;
 parent_lock.parent = loop {
-debug!("Trying to connect to parent @ {}..", parent_addr);
+log::debug!("Trying to connect to parent @ {}..", parent_addr);
 match TcpStream::connect(parent_addr).await {
 Ok(stream) => {
-debug!("Connected to parent @ {}", parent_addr);
+log::debug!("Connected to parent @ {}", parent_addr);
 break Some(stream);
 }
@@ -256,7 +309,7 @@ where
 let bg_state = self_mutex.clone();
 let _handle: JoinHandle<Result<(), Error>> = rt.spawn(async move {
 let addr = format!("0.0.0.0:{listening_port}");
-debug!("Starting background child task on {addr}...");
+log::debug!("Starting background child task on {addr}...");
 let listener = TcpListener::bind(addr).await.map_err(|e| {
 Error::os_error(e, format!("Error while binding to port {listening_port}"))
 })?;
@@ -264,31 +317,30 @@ where
 // The main listening loop. Should never fail.
 'listening: loop {
-debug!("listening for children on {:?}...", listener);
+log::debug!("listening for children on {:?}...", listener);
 match listener.accept().await {
 Ok((mut stream, addr)) => {
-debug!("{} joined the children.", addr);
+log::debug!("{} joined the children.", addr);
 let mut state_guard = state.write().await;
 if let Err(e) = state_guard
 .send_old_events_to_stream::<I>(&mut stream)
 .await
 {
-error!("Error while send old messages: {:?}.", e);
-error!("The loop will resume");
+log::error!("Error while send old messages: {e:?}.");
+log::error!("The loop will resume");
 continue 'listening;
 }
 state_guard.children.insert(NodeId::new(), stream);
-debug!(
-"[pid {}]{} added the child. nb children: {}",
+log::debug!(
+"[pid {}]{addr} added the child. nb children: {}",
 process::id(),
-addr,
 state_guard.children.len()
 );
 }
 Err(e) => {
-error!("Error while accepting child {:?}.", e);
+log::error!("Error while accepting child {e:?}.");
 }
 }
 }
@@ -318,7 +370,7 @@ where
 ) -> Result<Option<MultiMachineMsg<'a, I>>, Error> {
 // 0. Check if we should try to fetch something from the stream
 let mut dummy_byte: [u8; 1] = [0u8];
-debug!("Starting read msg...");
+log::debug!("Starting read msg...");
 let n_read = match stream.try_read(&mut dummy_byte) {
 Ok(n) => n,
@@ -328,14 +380,14 @@ where
 Err(e) => return Err(Error::os_error(e, "try read failed")),
 };
-debug!("msg read.");
+log::debug!("msg read.");
 if n_read == 0 {
-debug!("No dummy byte received...");
+log::debug!("No dummy byte received...");
 return Ok(None); // Nothing to read from this stream
 }
-debug!("Received dummy byte!");
+log::debug!("Received dummy byte!");
 // we should always read the dummy byte at this point.
 assert_eq!(u8::from_le_bytes(dummy_byte), DUMMY_BYTE);
@@ -368,15 +420,15 @@ where
 let msg_len = u32::to_le_bytes(serialized_msg.len() as u32);
 // 0. Write the dummy byte
-debug!("Sending dummy byte");
+log::debug!("Sending dummy byte");
 stream.write_all(&[DUMMY_BYTE]).await?;
 // 1. Write msg size
-debug!("Sending msg len");
+log::debug!("Sending msg len");
 stream.write_all(&msg_len).await?;
 // 2. Write msg
-debug!("Sending msg");
+log::debug!("Sending msg");
 stream.write_all(serialized_msg).await?;
 Ok(())
@@ -386,17 +438,17 @@ where
 &mut self,
 stream: &mut TcpStream,
 ) -> Result<(), Error> {
-debug!("Send old events to new child...");
+log::debug!("Send old events to new child...");
 for old_msg in &self.old_msgs {
 let event_ref: MultiMachineMsg<I> =
 MultiMachineMsg::llmp_msg(OwnedRef::Ref(old_msg.as_slice()));
-debug!("Sending an old message...");
+log::debug!("Sending an old message...");
 Self::write_msg(stream, &event_ref).await?;
-debug!("Old message sent.");
+log::debug!("Old message sent.");
 }
-debug!("Sent {} old messages.", self.old_msgs.len());
+log::debug!("Sent {} old messages.", self.old_msgs.len());
 Ok(())
 }
@@ -405,7 +457,7 @@ where
 &mut self,
 msg: &MultiMachineMsg<'a, I>,
 ) -> Result<(), Error> {
-debug!("Sending interesting events to nodes...");
+log::debug!("Sending interesting events to nodes...");
 if self
 .node_descriptor
@@ -413,10 +465,12 @@ where
 .intersects(NodePolicy::SendToParent)
 {
 if let Some(parent) = &mut self.parent {
-debug!("Sending to parent...");
+log::debug!("Sending to parent...");
 if let Err(e) = Self::write_msg(parent, msg).await {
-error!("The parent disconnected. We won't try to communicate with it again.");
-error!("Error: {:?}", e);
+log::error!(
+"The parent disconnected. We won't try to communicate with it again."
+);
+log::error!("Error: {e:?}");
 self.parent.take();
 }
 }
@@ -429,17 +483,19 @@ where
 {
 let mut ids_to_remove: Vec<NodeId> = Vec::new();
 for (child_id, child_stream) in &mut self.children {
-debug!("Sending to child...");
+log::debug!("Sending to child...");
 if (Self::write_msg(child_stream, msg).await).is_err() {
 // most likely the child disconnected. drop the connection later on and continue.
-debug!("The child disconnected. We won't try to communicate with it again.");
+log::debug!(
+"The child disconnected. We won't try to communicate with it again."
+);
 ids_to_remove.push(*child_id);
 }
 }
 // Garbage collect disconnected children
 for id_to_remove in &ids_to_remove {
-debug!("Child {:?} has been garbage collected.", id_to_remove);
+log::debug!("Child {:?} has been garbage collected.", id_to_remove);
 self.children.remove(id_to_remove);
 }
 }
@@ -453,7 +509,7 @@ where
 &mut self,
 msgs: &mut Vec<MultiMachineMsg<'a, I>>,
 ) -> Result<(), Error> {
-debug!("Checking for new events from other nodes...");
+log::debug!("Checking for new events from other nodes...");
 let mut nb_received = 0usize;
 // Our (potential) parent could have something for us
@@ -464,10 +520,10 @@ where
 return Ok(());
 }
-debug!("Receiving from parent...");
+log::debug!("Receiving from parent...");
 match Self::read_msg(parent).await {
 Ok(Some(msg)) => {
-debug!("Received event from parent");
+log::debug!("Received event from parent");
 // The parent has something for us, we store it
 msgs.push(msg);
 nb_received += 1;
@@ -475,13 +531,13 @@ where
 Ok(None) => {
 // nothing from the parent, we continue
-debug!("Nothing from parent");
+log::debug!("Nothing from parent");
 break;
 }
 Err(Error::OsError(_, _, _)) => {
 // most likely the parent disconnected. drop the connection
-debug!(
+log::debug!(
 "The parent disconnected. We won't try to communicate with it again."
 );
 self.parent.take();
@@ -489,7 +545,7 @@
 }
 Err(e) => {
-debug!("An error occurred and was not expected.");
+log::debug!("An error occurred and was not expected.");
 return Err(e);
 }
 }
@@ -498,7 +554,7 @@ where
 // What about the (potential) children?
 let mut ids_to_remove: Vec<NodeId> = Vec::new();
-debug!(
+log::debug!(
 "[pid {}] Nb children: {}",
 process::id(),
 self.children.len()
@@ -510,34 +566,34 @@ where
 return Ok(());
 }
-debug!("Receiving from child {:?}...", child_id);
+log::debug!("Receiving from child {:?}...", child_id);
 match Self::read_msg(child_stream).await {
 Ok(Some(msg)) => {
 // The parent has something for us, we store it
-debug!("Received event from child!");
+log::debug!("Received event from child!");
 msgs.push(msg);
 nb_received += 1;
 }
 Ok(None) => {
 // nothing from the parent, we continue
-debug!("Nothing from child");
+log::debug!("Nothing from child");
 break;
 }
 Err(Error::OsError(e, _, _)) => {
 // most likely the parent disconnected. drop the connection
-error!(
+log::error!(
 "The parent disconnected. We won't try to communicate with it again."
 );
-error!("Error: {:?}", e);
+log::error!("Error: {e:?}");
 ids_to_remove.push(*child_id);
 break;
 }
 Err(e) => {
 // Other error
-debug!("An error occurred and was not expected.");
+log::debug!("An error occurred and was not expected.");
 return Err(e);
 }
 }
@@ -546,7 +602,7 @@ where
 // Garbage collect disconnected children
 for id_to_remove in &ids_to_remove {
-debug!("Child {:?} has been garbage collected.", id_to_remove);
+log::debug!("Child {:?} has been garbage collected.", id_to_remove);
 self.children.remove(id_to_remove);
 }
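For orientation, here is a minimal usage sketch of the reworked builder API shown in the hunks above. The import paths, the `String` address type, and the `BytesInput` input type are illustrative assumptions rather than something taken from this diff; the `unsafe` block mirrors the documented requirement that incoming messages outlive the async background task processing them.

    use libafl::events::multi_machine::{NodeDescriptor, TcpMultiMachineHooks};
    use libafl::inputs::BytesInput;
    use libafl_bolts::Error;

    fn build_hooks(descriptor: NodeDescriptor<String>) -> Result<(), Error> {
        // SAFETY: the receiver hook expects the memory behind incoming `msg`
        // pointers to stay alive until the async background task has processed it.
        let TcpMultiMachineHooks { sender, receiver } = unsafe {
            TcpMultiMachineHooks::builder()
                .node_descriptor(descriptor)
                .build::<BytesInput>()?
        };
        // `sender` and `receiver` would then be installed as LLMP broker hooks.
        let _ = (sender, receiver);
        Ok(())
    }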

libafl/src/events/tcp.rs (new file, 1364 lines)

File diff suppressed because it is too large.

View File

@@ -32,7 +32,6 @@ Welcome to `LibAFL`
 clippy::similar_names,
 clippy::too_many_lines,
 clippy::into_iter_without_iter, // broken
-clippy::type_complexity,
 )]
 #![cfg_attr(not(test), warn(
 missing_debug_implementations,

View File

@@ -31,7 +31,6 @@
 clippy::ptr_cast_constness,
 clippy::negative_feature_names,
 clippy::too_many_lines,
-clippy::if_not_else
 )]
 #![cfg_attr(not(test), warn(
 missing_debug_implementations,

View File

@@ -87,7 +87,6 @@ use std::{
 #[cfg(all(debug_assertions, feature = "llmp_debug", feature = "std"))]
 use backtrace::Backtrace;
-use log::debug;
 #[cfg(all(unix, feature = "std"))]
 #[cfg(not(any(target_os = "solaris", target_os = "illumos")))]
 use nix::sys::socket::{self, sockopt::ReusePort};
@@ -1288,7 +1287,7 @@ where
 self.last_msg_sent = msg;
 self.has_unsent_message = false;
-debug!(
+log::debug!(
 "[{} - {:#x}] Send message with id {}",
 self.id.0, self as *const Self as u64, mid
 );
@@ -1702,7 +1701,7 @@ where
 return Err(Error::illegal_state("Unexpected message in map (out of map bounds) - buggy client or tampered shared map detected!"));
 }
-debug!(
+log::debug!(
 "[{} - {:#x}] Received message with ID {}...",
 self.id.0,
 self as *const Self as u64,
@@ -2369,7 +2368,16 @@ impl Brokers {
 loop {
 self.llmp_brokers.retain_mut(|broker| {
-if !broker.is_shutting_down() {
+if broker.is_shutting_down() {
+broker.send_buf(LLMP_TAG_EXITING, &[]).expect(
+"Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.",
+);
+return false
+}
 if current_milliseconds() > end_time {
 broker
 .on_timeout()
@@ -2403,13 +2411,6 @@
 }
 true
-} else {
-broker.send_buf(LLMP_TAG_EXITING, &[]).expect(
-"Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.",
-);
-false
-}
 });
 if self.llmp_brokers.is_empty() {
@@ -2769,7 +2770,7 @@ where
 self.inner.forward_msg(msg)?;
 }
-debug!("New msg vector: {}", new_msgs.len());
+log::debug!("New msg vector: {}", new_msgs.len());
 for (new_msg_tag, new_msg_flag, new_msg) in new_msgs {
 self.inner.llmp_out.send_buf_with_flags(
 new_msg_tag,
@@ -3696,7 +3697,7 @@ where
 break stream;
 }
-debug!("Connection Refused. Retrying...");
+log::debug!("Connection Refused. Retrying...");
 #[cfg(feature = "std")]
 thread::sleep(Duration::from_millis(50));
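The `Brokers` change above (the "early exit ftw" step of the commit message) flips the shutdown check into an early return inside `retain_mut`, so the exit message is sent before any timeout handling instead of living in a trailing `else` branch. A self-contained sketch of that shape, using a stand-in `Broker` type rather than the real LLMP broker:

    struct Broker {
        shutting_down: bool,
    }

    impl Broker {
        fn is_shutting_down(&self) -> bool {
            self.shutting_down
        }
        fn notify_exit(&mut self) {
            // The real code sends an LLMP_TAG_EXITING message here.
        }
        fn serve_once(&mut self) -> bool {
            // Returns whether the broker should be kept for the next iteration.
            true
        }
    }

    fn run_brokers(mut brokers: Vec<Broker>) {
        loop {
            brokers.retain_mut(|broker| {
                if broker.is_shutting_down() {
                    // Early exit: announce shutdown, then drop this broker.
                    broker.notify_exit();
                    return false;
                }
                // Regular servicing path, no longer nested in an `else` branch.
                broker.serve_once()
            });
            if brokers.is_empty() {
                break;
            }
        }
    }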

View File

@@ -27,6 +27,7 @@ mod bindings {
 pub use bindings::*;
 #[cfg(any(feature = "clippy", not(target_os = "linux")))]
+#[allow(dead_code)]
 #[rustfmt::skip]
 mod x86_64_stub_bindings;
@@ -126,6 +127,7 @@ pub type GuestHwAddrInfo = crate::qemu_plugin_hwaddr;
 #[derive(Debug)]
 #[repr(C)]
+#[cfg(target_os = "linux")]
 #[cfg_attr(feature = "python", pyclass(unsendable))]
 pub struct MapInfo {
 start: GuestAddr,
@@ -213,6 +215,7 @@ extern_c_checked! {
 pub fn libafl_qemu_gdb_reply(buf: *const u8, len: usize);
 }
+#[cfg(target_os = "linux")]
 #[cfg_attr(feature = "python", pymethods)]
 impl MapInfo {
 #[must_use]