Remove shmem associated type (#2870)

* reduce shm trait bound

* Rename to SendExiting

* alpha beta gamma

* alphabet

* work

* std only

---------

Co-authored-by: Romain Malmain <romain.malmain@pm.me>
Dominik Maier 2025-01-20 20:28:19 +01:00 committed by GitHub
parent 72adb483b5
commit 7e18887a32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 524 additions and 484 deletions
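The core API change below splits the old `ManagerExit` trait in two: `SendExiting` tells the restarter/broker that this client is exiting and needs no restart, while `AwaitRestartSafe` blocks until shared pages may safely be unmapped. A minimal sketch of the downstream migration, using the import paths visible in the diffs below (`shutdown` is a hypothetical helper, not part of the commit):

use libafl::{events::SendExiting, Error};

// Before this commit: `use libafl::events::{ManagerExit, ...};`
// After: bind only on the half of the old trait you actually need.
fn shutdown<EM: SendExiting>(mgr: &mut EM) -> Result<(), Error> {
    // Tell the restarter/broker that this client is done for good.
    mgr.send_exiting()
}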

View File

@ -7,7 +7,7 @@ use std::{env, fmt::Write, io, path::PathBuf, process, ptr::NonNull};
use clap::{builder::Str, Parser};
use libafl::{
corpus::{Corpus, InMemoryOnDiskCorpus, NopCorpus},
events::{EventRestarter, ManagerExit, SimpleRestartingEventManager},
events::{EventRestarter, SendExiting, SimpleRestartingEventManager},
executors::ExitKind,
feedbacks::MaxMapFeedback,
fuzzer::StdFuzzer,

View File

@ -9,7 +9,7 @@ use clap::{builder::Str, Parser};
use libafl::{
corpus::{Corpus, InMemoryCorpus},
events::{
launcher::Launcher, ClientDescription, EventConfig, LlmpRestartingEventManager, ManagerExit,
launcher::Launcher, ClientDescription, EventConfig, LlmpRestartingEventManager, SendExiting,
},
executors::ExitKind,
fuzzer::StdFuzzer,

View File

@ -18,27 +18,28 @@ use libafl_bolts::{
};
use libafl_bolts::{
llmp::{LlmpClient, LlmpClientDescription, Tag},
shmem::{NopShMem, NopShMemProvider, ShMem, ShMemProvider},
shmem::{ShMem, ShMemProvider},
tuples::{Handle, MatchNameRef},
ClientId,
};
use serde::{de::DeserializeOwned, Serialize};
use super::{CanSerializeObserver, ManagerExit, NopEventManager};
use super::AwaitRestartSafe;
#[cfg(feature = "llmp_compression")]
use crate::events::llmp::COMPRESS_THRESHOLD;
use crate::{
common::HasMetadata,
events::{
serialize_observers_adaptive, std_maybe_report_progress, std_report_progress,
AdaptiveSerializer, Event, EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId,
EventProcessor, EventRestarter, HasEventManagerId, LogSeverity, ProgressReporter,
AdaptiveSerializer, CanSerializeObserver, Event, EventConfig, EventFirer,
EventManagerHooksTuple, EventManagerId, EventProcessor, EventRestarter, HasEventManagerId,
LogSeverity, ProgressReporter, SendExiting,
},
executors::HasObservers,
fuzzer::{EvaluatorObservers, ExecutionProcessor},
inputs::{Input, NopInput},
inputs::Input,
observers::TimeObserver,
state::{HasExecutions, HasLastReportTime, MaybeHasClientPerfMonitor, NopState, Stoppable},
state::{HasExecutions, HasLastReportTime, MaybeHasClientPerfMonitor, Stoppable},
Error,
};
@ -58,16 +59,7 @@ pub struct CentralizedEventManager<EM, EMH, I, S, SHM, SP> {
phantom: PhantomData<(I, S)>,
}
impl
CentralizedEventManager<
NopEventManager,
(),
NopInput,
NopState<NopInput>,
NopShMem,
NopShMemProvider,
>
{
impl CentralizedEventManager<(), (), (), (), (), ()> {
/// Creates a builder for [`CentralizedEventManager`]
#[must_use]
pub fn builder() -> CentralizedEventManagerBuilder {
@ -291,7 +283,7 @@ impl<EM, EMH, I, OT, S, SHM, SP> CanSerializeObserver<OT>
for CentralizedEventManager<EM, EMH, I, S, SHM, SP>
where
EM: AdaptiveSerializer,
OT: Serialize + MatchNameRef,
OT: MatchNameRef + Serialize,
{
fn serialize_observers(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error> {
serialize_observers_adaptive::<EM, OT>(
@ -303,9 +295,9 @@ where
}
}
impl<EM, EMH, I, S, SHM, SP> ManagerExit for CentralizedEventManager<EM, EMH, I, S, SHM, SP>
impl<EM, EMH, I, S, SHM, SP> SendExiting for CentralizedEventManager<EM, EMH, I, S, SHM, SP>
where
EM: ManagerExit,
EM: SendExiting,
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
@ -313,7 +305,13 @@ where
self.client.sender_mut().send_exiting()?;
self.inner.send_exiting()
}
}
impl<EM, EMH, I, S, SHM, SP> AwaitRestartSafe for CentralizedEventManager<EM, EMH, I, S, SHM, SP>
where
SHM: ShMem,
EM: AwaitRestartSafe,
{
#[inline]
fn await_restart_safe(&mut self) {
self.client.await_safe_to_unmap_blocking();

View File

@ -18,7 +18,7 @@ use libafl_bolts::{
use libafl_bolts::{
current_time,
llmp::{LlmpClient, LlmpClientDescription, LLMP_FLAG_FROM_MM},
shmem::{NopShMem, NopShMemProvider, ShMem, ShMemProvider},
shmem::{NopShMem, ShMem, ShMemProvider},
tuples::Handle,
ClientId,
};
@ -38,18 +38,18 @@ use crate::events::{serialize_observers_adaptive, CanSerializeObserver};
use crate::{
events::{
llmp::{LLMP_TAG_EVENT_TO_BOTH, _LLMP_TAG_EVENT_TO_BROKER},
std_maybe_report_progress, std_on_restart, std_report_progress, AdaptiveSerializer, Event,
EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, EventProcessor,
EventRestarter, HasEventManagerId, ManagerExit, ProgressReporter,
std_maybe_report_progress, std_on_restart, std_report_progress, AdaptiveSerializer,
AwaitRestartSafe, Event, EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId,
EventProcessor, EventRestarter, HasEventManagerId, ProgressReporter, SendExiting,
},
executors::HasObservers,
fuzzer::{EvaluatorObservers, ExecutionProcessor},
inputs::{Input, NopInput},
inputs::Input,
observers::TimeObserver,
stages::HasCurrentStageId,
state::{
HasCurrentTestcase, HasExecutions, HasImported, HasLastReportTime, HasSolutions,
MaybeHasClientPerfMonitor, NopState, Stoppable,
MaybeHasClientPerfMonitor, Stoppable,
},
Error, HasMetadata,
};
@ -62,7 +62,6 @@ const INITIAL_EVENT_BUFFER_SIZE: usize = 1024 * 4;
pub struct LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
/// We only send 1 testcase for every `throttle` second
pub(crate) throttle: Option<Duration>,
@ -82,11 +81,11 @@ where
serializations_cnt: usize,
should_serialize_cnt: usize,
pub(crate) time_ref: Option<Handle<TimeObserver>>,
phantom: PhantomData<(I, S)>,
event_buffer: Vec<u8>,
phantom: PhantomData<(I, S)>,
}
impl LlmpEventManager<(), NopState<NopInput>, NopInput, NopShMem, NopShMemProvider> {
impl LlmpEventManager<(), (), (), NopShMem, ()> {
/// Creates a builder for [`LlmpEventManager`]
#[must_use]
pub fn builder() -> LlmpEventManagerBuilder<()> {
@ -143,7 +142,6 @@ impl<EMH> LlmpEventManagerBuilder<EMH> {
) -> Result<LlmpEventManager<EMH, I, S, SHM, SP>, Error>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
Ok(LlmpEventManager {
throttle: self.throttle,
@ -158,8 +156,8 @@ impl<EMH> LlmpEventManagerBuilder<EMH> {
serializations_cnt: 0,
should_serialize_cnt: 0,
time_ref,
phantom: PhantomData,
event_buffer: Vec::with_capacity(INITIAL_EVENT_BUFFER_SIZE),
phantom: PhantomData,
})
}
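The pattern repeated throughout this file shows in the dropped `SP: ShMemProvider<ShMem = SHM>` lines: impl blocks that only hold or describe the shared map now require just `SHM: ShMem`, and the provider bound survives only where new maps are actually produced. A sketch of the idea under those assumptions (`Manager` is a hypothetical stand-in, not a type from this commit):

use core::marker::PhantomData;

use libafl_bolts::shmem::{ShMem, ShMemProvider};

/// Hypothetical stand-in for the event managers touched here.
pub struct Manager<SHM, SP> {
    phantom: PhantomData<(SHM, SP)>,
}

// Reading or describing the map needs no provider bound at all...
impl<SHM, SP> Manager<SHM, SP>
where
    SHM: ShMem,
{
    pub fn describe(&self) {}
}

// ...only paths that hand out fresh maps keep the full bound.
impl<SHM, SP> Manager<SHM, SP>
where
    SHM: ShMem,
    SP: ShMemProvider<ShMem = SHM>,
{
    pub fn reallocate(&mut self) {}
}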
@ -217,11 +215,10 @@ impl<EMH> LlmpEventManagerBuilder<EMH> {
}
#[cfg(feature = "std")]
impl<EMH, I, OT, S, SHM, SP> CanSerializeObserver<OT> for LlmpEventManager<EMH, I, S, SHM, SP>
impl<EMH, I, S, OT, SHM, SP> CanSerializeObserver<OT> for LlmpEventManager<EMH, I, S, SHM, SP>
where
OT: Serialize + MatchNameRef,
OT: MatchNameRef + Serialize,
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
fn serialize_observers(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error> {
serialize_observers_adaptive::<Self, OT>(self, observers, 2, 80)
@ -231,7 +228,6 @@ where
impl<EMH, I, S, SHM, SP> AdaptiveSerializer for LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
fn serialization_time(&self) -> Duration {
self.serialization_time
@ -267,7 +263,7 @@ where
impl<EMH, I, S, SHM, SP> Debug for LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
SP: Debug,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut debug_struct = f.debug_struct("LlmpEventManager");
@ -277,7 +273,6 @@ where
let debug = debug.field("compressor", &self.compressor);
debug
.field("configuration", &self.configuration)
.field("phantom", &self.phantom)
.finish_non_exhaustive()
}
}
@ -285,7 +280,6 @@ where
impl<EMH, I, S, SHM, SP> Drop for LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
/// LLMP clients will have to wait until their pages are mapped by somebody.
fn drop(&mut self) {
@ -296,7 +290,6 @@ where
impl<EMH, I, S, SHM, SP> LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
/// Calling this function will tell the llmp broker that this client is exiting
/// This should be called from the restarter, not from the actual fuzzer client
@ -330,7 +323,12 @@ where
log::debug!("Asking he broker to be disconnected");
Ok(())
}
}
impl<EMH, I, S, SHM, SP> LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
/// Describe the client event manager's LLMP parts in a restorable fashion
pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
self.llmp.describe()
@ -347,7 +345,6 @@ where
impl<EMH, I, S, SHM, SP> LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
// Handle arriving events in the client
fn handle_in_client<E, Z>(
@ -516,23 +513,26 @@ impl<EMH, I, S, SHM, SP> EventRestarter<S> for LlmpEventManager<EMH, I, S, SHM,
where
S: HasCurrentStageId,
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
fn on_restart(&mut self, state: &mut S) -> Result<(), Error> {
std_on_restart(self, state)
}
}
impl<EMH, I, S, SHM, SP> ManagerExit for LlmpEventManager<EMH, I, S, SHM, SP>
impl<EMH, I, S, SHM, SP> SendExiting for LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
fn send_exiting(&mut self) -> Result<(), Error> {
self.llmp.sender_mut().send_exiting()
}
}
impl<EMH, I, S, SHM, SP> AwaitRestartSafe for LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
/// The LLMP client needs to wait until a broker has mapped all pages before shutting down.
/// Otherwise, the OS may already have removed the shared maps.
fn await_restart_safe(&mut self) {
@ -621,7 +621,6 @@ where
impl<EMH, I, S, SHM, SP> HasEventManagerId for LlmpEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
/// Gets the id assigned to this staterestorer.
fn mgr_id(&self) -> EventManagerId {

View File

@ -35,10 +35,10 @@ use crate::{
common::HasMetadata,
events::{
launcher::ClientDescription, serialize_observers_adaptive, std_maybe_report_progress,
std_report_progress, AdaptiveSerializer, CanSerializeObserver, Event, EventConfig,
EventFirer, EventManagerHooksTuple, EventManagerId, EventProcessor, EventRestarter,
HasEventManagerId, LlmpEventManager, LlmpShouldSaveState, ManagerExit, ProgressReporter,
StdLlmpEventHook,
std_report_progress, AdaptiveSerializer, AwaitRestartSafe, CanSerializeObserver, Event,
EventConfig, EventFirer, EventManagerHooksTuple, EventManagerId, EventProcessor,
EventRestarter, HasEventManagerId, LlmpEventManager, LlmpShouldSaveState, ProgressReporter,
SendExiting, StdLlmpEventHook,
},
executors::HasObservers,
fuzzer::{EvaluatorObservers, ExecutionProcessor},
@ -58,7 +58,6 @@ use crate::{
pub struct LlmpRestartingEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
/// The embedded LLMP event manager
llmp_mgr: LlmpEventManager<EMH, I, S, SHM, SP>,
@ -71,7 +70,6 @@ where
impl<EMH, I, S, SHM, SP> AdaptiveSerializer for LlmpRestartingEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
fn serialization_time(&self) -> Duration {
self.llmp_mgr.serialization_time()
@ -151,9 +149,8 @@ where
impl<EMH, I, OT, S, SHM, SP> CanSerializeObserver<OT>
for LlmpRestartingEventManager<EMH, I, S, SHM, SP>
where
OT: Serialize + MatchNameRef,
OT: MatchNameRef + Serialize,
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
fn serialize_observers(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error> {
serialize_observers_adaptive::<Self, OT>(self, observers, 2, 80)
@ -187,7 +184,7 @@ where
}
}
impl<EMH, I, S, SHM, SP> ManagerExit for LlmpRestartingEventManager<EMH, I, S, SHM, SP>
impl<EMH, I, S, SHM, SP> SendExiting for LlmpRestartingEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
@ -198,7 +195,12 @@ where
// This way, the broker can clean up the pages, and eventually exit.
self.llmp_mgr.send_exiting()
}
}
impl<EMH, I, S, SHM, SP> AwaitRestartSafe for LlmpRestartingEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
/// The LLMP client needs to wait until a broker has mapped all pages before shutting down.
/// Otherwise, the OS may already have removed the shared maps.
#[inline]
@ -331,9 +333,9 @@ pub fn setup_restarting_mgr_std<I, MT, S>(
Error,
>
where
I: DeserializeOwned,
MT: Monitor + Clone,
S: Serialize + DeserializeOwned,
I: DeserializeOwned,
{
RestartingMgr::builder()
.shmem_provider(StdShMemProvider::new()?)

View File

@ -564,11 +564,9 @@ pub trait EventRestarter<S> {
/// Default implementation of [`EventRestarter::on_restart`] for implementors with the given
/// constraints
pub fn std_on_restart<S>(
restarter: &mut (impl EventRestarter<S> + ManagerExit),
state: &mut S,
) -> Result<(), Error>
pub fn std_on_restart<EM, S>(restarter: &mut EM, state: &mut S) -> Result<(), Error>
where
EM: EventRestarter<S> + AwaitRestartSafe,
S: HasCurrentStageId,
{
state.on_restart()?;
@ -582,11 +580,15 @@ pub trait CanSerializeObserver<OT> {
fn serialize_observers(&mut self, observers: &OT) -> Result<Option<Vec<u8>>, Error>;
}
/// Routines called before exiting
pub trait ManagerExit {
/// Send that we're about to exit
pub trait SendExiting {
/// Send information that this client is exiting.
/// No need to restart us any longer, and no need to print an error, either.
fn send_exiting(&mut self) -> Result<(), Error>;
}
/// Wait until it's safe to restart
pub trait AwaitRestartSafe {
/// Block until we are safe to exit, usually called inside `on_restart`.
fn await_restart_safe(&mut self);
}
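For managers with no broker and no shared pages, both traits are trivial, as the `NopEventManager` impls in the next hunk show. A hedged sketch for a custom manager (`MyManager` is hypothetical):

use libafl::{
    events::{AwaitRestartSafe, SendExiting},
    Error,
};

struct MyManager;

impl SendExiting for MyManager {
    fn send_exiting(&mut self) -> Result<(), Error> {
        // Nothing to notify; an LLMP-backed manager would message its broker here.
        Ok(())
    }
}

impl AwaitRestartSafe for MyManager {
    fn await_restart_safe(&mut self) {
        // Nothing to wait on; an LLMP-backed manager blocks here until the
        // broker has mapped all of its pages.
    }
}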
@ -640,12 +642,15 @@ where
}
}
impl ManagerExit for NopEventManager {
impl SendExiting for NopEventManager {
/// Send information that this client is exiting.
/// No need to restart us any longer, and no need to print an error, either.
fn send_exiting(&mut self) -> Result<(), Error> {
Ok(())
}
}
impl AwaitRestartSafe for NopEventManager {
/// Block until we are safe to exit, usually called inside `on_restart`.
fn await_restart_safe(&mut self) {}
}
@ -761,15 +766,20 @@ where
}
}
impl<EM, M> ManagerExit for MonitorTypedEventManager<EM, M>
impl<EM, M> SendExiting for MonitorTypedEventManager<EM, M>
where
EM: ManagerExit,
EM: SendExiting,
{
#[inline]
fn send_exiting(&mut self) -> Result<(), Error> {
self.inner.send_exiting()
}
}
impl<EM, M> AwaitRestartSafe for MonitorTypedEventManager<EM, M>
where
EM: AwaitRestartSafe,
{
#[inline]
fn await_restart_safe(&mut self) {
self.inner.await_restart_safe();

View File

@ -22,14 +22,14 @@ use libafl_bolts::{
use serde::de::DeserializeOwned;
use serde::Serialize;
use super::{std_on_restart, ProgressReporter};
use super::{std_on_restart, AwaitRestartSafe, ProgressReporter};
#[cfg(all(unix, feature = "std", not(miri)))]
use crate::events::EVENTMGR_SIGHANDLER_STATE;
use crate::{
events::{
std_maybe_report_progress, std_report_progress, BrokerEventResult, CanSerializeObserver,
Event, EventFirer, EventManagerId, EventProcessor, EventRestarter, HasEventManagerId,
ManagerExit,
SendExiting,
},
monitors::Monitor,
stages::HasCurrentStageId,
@ -90,11 +90,13 @@ where
}
}
impl<I, MT, S> ManagerExit for SimpleEventManager<I, MT, S> {
impl<I, MT, S> SendExiting for SimpleEventManager<I, MT, S> {
fn send_exiting(&mut self) -> Result<(), Error> {
Ok(())
}
}
impl<I, MT, S> AwaitRestartSafe for SimpleEventManager<I, MT, S> {
fn await_restart_safe(&mut self) {}
}
@ -341,7 +343,7 @@ where
}
#[cfg(feature = "std")]
impl<I, MT, S, SHM, SP> ManagerExit for SimpleRestartingEventManager<I, MT, S, SHM, SP>
impl<I, MT, S, SHM, SP> SendExiting for SimpleRestartingEventManager<I, MT, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
@ -350,6 +352,10 @@ where
self.staterestorer.send_exiting();
Ok(())
}
}
#[cfg(feature = "std")]
impl<I, MT, S, SHM, SP> AwaitRestartSafe for SimpleRestartingEventManager<I, MT, S, SHM, SP> {
/// Block until we are safe to exit, usually called inside `on_restart`.
#[inline]
fn await_restart_safe(&mut self) {}

View File

@ -38,7 +38,7 @@ use tokio::{
};
use typed_builder::TypedBuilder;
use super::{std_maybe_report_progress, std_report_progress, ManagerExit};
use super::{std_maybe_report_progress, std_report_progress, AwaitRestartSafe, SendExiting};
#[cfg(feature = "share_objectives")]
use crate::corpus::{Corpus, Testcase};
#[cfg(all(unix, not(miri)))]
@ -776,14 +776,16 @@ where
}
}
impl<EMH, I, S> ManagerExit for TcpEventManager<EMH, I, S> {
impl<EMH, I, S> AwaitRestartSafe for TcpEventManager<EMH, I, S> {
/// The TCP client needs to wait until a broker has mapped all pages before shutting down.
/// Otherwise, the OS may already have removed the shared maps.
fn await_restart_safe(&mut self) {
// wait until we can drop the message safely.
//self.tcp.await_safe_to_unmap_blocking();
}
}
impl<EMH, I, S> SendExiting for TcpEventManager<EMH, I, S> {
fn send_exiting(&mut self) -> Result<(), Error> {
//TODO: Should not be needed since TCP does that for us
//self.tcp.sender.send_exiting()
@ -866,7 +868,7 @@ where
}
}
impl<EMH, I, S, SHM, SP> ManagerExit for TcpRestartingEventManager<EMH, I, S, SHM, SP>
impl<EMH, I, S, SHM, SP> SendExiting for TcpRestartingEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
@ -877,7 +879,12 @@ where
// This way, the broker can clean up the pages, and eventually exit.
self.tcp_mgr.send_exiting()
}
}
impl<EMH, I, S, SHM, SP> AwaitRestartSafe for TcpRestartingEventManager<EMH, I, S, SHM, SP>
where
SHM: ShMem,
{
/// The TCP client needs to wait until a broker has mapped all pages before shutting down.
/// Otherwise, the OS may already have removed the shared maps.
#[inline]

View File

@ -950,47 +950,6 @@ where
})
}
/// ID of this sender.
#[must_use]
pub fn id(&self) -> ClientId {
self.id
}
/// Completely reset the current sender map.
/// Afterwards, no receiver should read from it at a different location.
/// This is only useful if all connected llmp parties start over, for example after a crash.
///
/// # Safety
/// Only safe if you really really restart the page on everything connected
/// No receiver should read from this page at a different location.
pub unsafe fn reset(&mut self) {
llmp_page_init(
&mut self.out_shmems.last_mut().unwrap().shmem,
self.id,
true,
);
self.last_msg_sent = ptr::null_mut();
}
/// Reads the stored sender / client id for the given `env_name` (by appending `_CLIENT_ID`).
/// If the content of the env is `_NULL`, returns [`Option::None`].
#[cfg(feature = "std")]
#[inline]
fn client_id_from_env(env_name: &str) -> Result<Option<ClientId>, Error> {
let client_id_str = env::var(format!("{env_name}_CLIENT_ID"))?;
Ok(if client_id_str == _NULL_ENV_STR {
None
} else {
Some(ClientId(client_id_str.parse()?))
})
}
/// Writes the `id` to an env var
#[cfg(feature = "std")]
fn client_id_to_env(env_name: &str, id: ClientId) {
env::set_var(format!("{env_name}_CLIENT_ID"), format!("{}", id.0));
}
/// Reattach to a vacant `out_shmem` to which a previous sender stored the information in an env before.
#[cfg(feature = "std")]
pub fn on_existing_from_env(mut shmem_provider: SP, env_name: &str) -> Result<Self, Error> {
@ -1010,56 +969,6 @@ where
Ok(ret)
}
/// Store the info for this sender to env.
/// A new client can reattach to it using [`LlmpSender::on_existing_from_env()`].
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
let current_out_shmem = self.out_shmems.last().unwrap();
current_out_shmem.shmem.write_to_env(env_name)?;
Self::client_id_to_env(env_name, self.id);
unsafe { current_out_shmem.msg_to_env(self.last_msg_sent, env_name) }
}
/// Waits for this sender to be safe to unmap.
/// If a receiver is involved, this function should always be called.
pub fn await_safe_to_unmap_blocking(&self) {
#[cfg(feature = "std")]
let mut ctr = 0_u16;
loop {
if self.safe_to_unmap() {
return;
}
hint::spin_loop();
// We log that we're looping -> see when we're blocking.
#[cfg(feature = "std")]
{
ctr = ctr.wrapping_add(1);
if ctr == 0 {
log::info!("Awaiting safe_to_unmap_blocking");
}
}
}
}
/// If we are allowed to unmap this client
pub fn safe_to_unmap(&self) -> bool {
let current_out_shmem = self.out_shmems.last().unwrap();
unsafe {
// log::info!("Reading safe_to_unmap from {:?}", current_out_shmem.page() as *const _);
(*current_out_shmem.page())
.receivers_joined_count
.load(Ordering::Relaxed)
>= 1
}
}
/// For debug purposes: Mark safe to unmap, even though it might not have been read by a receiver yet.
/// # Safety
/// If this method is called, the page may be unmapped before it is read by any receiver.
pub unsafe fn mark_safe_to_unmap(&mut self) {
(*self.out_shmems.last_mut().unwrap().page_mut()).receiver_joined();
}
/// Reattach to a vacant `out_shmem`.
/// It is essential that the receiver (or someone else) keeps a pointer to this map,
/// else reattach will get a new, empty page from the OS, or fail.
@ -1135,169 +1044,6 @@ where
}
}
/// Internal: special allocation function for `EOP` messages (and nothing else!)
/// The normal alloc will fail if there is not enough space for `buf_len_padded + EOP`
/// So if [`alloc_next`] fails, create new page if necessary, use this function,
/// place `EOP`, commit `EOP`, reset, alloc again on the new space.
unsafe fn alloc_eop(&mut self) -> Result<*mut LlmpMsg, Error> {
let map = self.out_shmems.last_mut().unwrap();
let page = map.page_mut();
let last_msg = self.last_msg_sent;
assert!((*page).size_used + EOP_MSG_SIZE <= (*page).size_total,
"PROGRAM ABORT : BUG: EOP does not fit in page! page {page:?}, size_current {:?}, size_total {:?}",
&raw const (*page).size_used, &raw const (*page).size_total);
let ret: *mut LlmpMsg = if last_msg.is_null() {
(*page).messages.as_mut_ptr()
} else {
llmp_next_msg_ptr_checked(map, last_msg, EOP_MSG_SIZE)?
};
assert!(
(*ret).tag != LLMP_TAG_UNINITIALIZED,
"Did not call send() on last message!"
);
(*ret).buf_len = size_of::<LlmpPayloadSharedMapInfo>() as u64;
// We don't need to pad the EOP message: it'll always be the last in this page.
(*ret).buf_len_padded = (*ret).buf_len;
(*ret).message_id = if last_msg.is_null() {
MessageId(1)
} else {
MessageId((*last_msg).message_id.0 + 1)
};
(*ret).tag = LLMP_TAG_END_OF_PAGE;
(*page).size_used += EOP_MSG_SIZE;
Ok(ret)
}
/// Internal: will return a ptr to the next msg buf, or None if the map is full.
/// Never call [`alloc_next`] without either sending or cancelling the last allocated message for this page!
/// There can only ever be up to one message allocated per page at each given time.
unsafe fn alloc_next_if_space(&mut self, buf_len: usize) -> Option<*mut LlmpMsg> {
let map = self.out_shmems.last_mut().unwrap();
let page = map.page_mut();
let last_msg = self.last_msg_sent;
assert!(
!self.has_unsent_message,
"Called alloc without calling send inbetween"
);
#[cfg(feature = "llmp_debug")]
log::info!(
"Allocating {} bytes on page {:?} / map {:?} (last msg: {:?})",
buf_len,
page,
&map.shmem.id().as_str(),
last_msg
);
let msg_start = (*page).messages.as_mut_ptr() as usize + (*page).size_used;
// Make sure the end of our msg is aligned.
let buf_len_padded = llmp_align(msg_start + buf_len + size_of::<LlmpMsg>())
- msg_start
- size_of::<LlmpMsg>();
#[cfg(feature = "llmp_debug")]
log::trace!(
"{page:?} {:?} size_used={:x} buf_len_padded={:x} EOP_MSG_SIZE={:x} size_total={}",
&(*page),
(*page).size_used,
buf_len_padded,
EOP_MSG_SIZE,
(*page).size_total
);
// For future allocs, keep track of the maximum (aligned) alloc size we used
(*page).max_alloc_size = max(
(*page).max_alloc_size,
size_of::<LlmpMsg>() + buf_len_padded,
);
// We need enough space for the current page size_used + payload + padding
if (*page).size_used + size_of::<LlmpMsg>() + buf_len_padded + EOP_MSG_SIZE
> (*page).size_total
{
#[cfg(feature = "llmp_debug")]
log::info!("LLMP: Page full.");
/* We're full. */
return None;
}
let ret = msg_start as *mut LlmpMsg;
/* We need to start with 1 for ids, as current message id is initialized
* with 0... */
(*ret).message_id = if last_msg.is_null() {
MessageId(1)
} else if (*page).current_msg_id.load(Ordering::Relaxed) == (*last_msg).message_id.0 {
MessageId((*last_msg).message_id.0 + 1)
} else {
/* Oops, wrong usage! */
panic!("BUG: The current message never got committed using send! (page->current_msg_id {:?}, last_msg->message_id: {:?})", &raw const (*page).current_msg_id, (*last_msg).message_id);
};
(*ret).buf_len = buf_len as u64;
(*ret).buf_len_padded = buf_len_padded as u64;
(*page).size_used += size_of::<LlmpMsg>() + buf_len_padded;
(*llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET;
(*ret).tag = LLMP_TAG_UNINITIALIZED;
self.has_unsent_message = true;
Some(ret)
}
/// Commit the message last allocated by [`alloc_next`] to the queue.
/// After committing, the msg shall no longer be altered!
/// It will be read by the consuming threads (`broker->clients` or `client->broker`)
/// If `overwrite_client_id` is `false`, the message's `sender` won't be touched (for broker forwarding)
#[inline(never)] // Not inlined to make cpu-level reordering (hopefully?) improbable
unsafe fn send(&mut self, msg: *mut LlmpMsg, overwrite_client_id: bool) -> Result<(), Error> {
// log::info!("Sending msg {:?}", msg);
assert!(self.last_msg_sent != msg, "Message sent twice!");
assert!(
(*msg).tag != LLMP_TAG_UNSET,
"No tag set on message with id {:?}",
(*msg).message_id
);
// A client gets the sender id assigned to it by the broker during the initial handshake.
if overwrite_client_id {
(*msg).sender = self.id;
}
let page = self.out_shmems.last_mut().unwrap().page_mut();
if msg.is_null() || !llmp_msg_in_page(page, msg) {
return Err(Error::unknown(format!(
"Llmp Message {msg:?} is null or not in current page"
)));
}
let mid = (*page).current_msg_id.load(Ordering::Relaxed) + 1;
(*msg).message_id.0 = mid;
// Make sure all things have been written to the page, and commit the message to the page
(*page)
.current_msg_id
.store((*msg).message_id.0, Ordering::Release);
self.last_msg_sent = msg;
self.has_unsent_message = false;
log::debug!(
"[{} - {:#x}] Send message with id {}",
self.id.0,
ptr::from_ref::<Self>(self) as u64,
mid
);
Ok(())
}
/// Grab an unused `LlmpSharedMap` from `unused_shmem_cache` or allocate a new map,
/// if no suitable maps could be found.
unsafe fn new_or_unused_shmem(
@ -1546,20 +1292,6 @@ where
}
}
/// Describe this [`LlmpSender`] in a way that it can be restored later, using [`Self::on_existing_from_description`].
pub fn describe(&self) -> Result<LlmpDescription, Error> {
let map = self.out_shmems.last().unwrap();
let last_message_offset = if self.last_msg_sent.is_null() {
None
} else {
Some(unsafe { map.msg_to_offset(self.last_msg_sent) }?)
};
Ok(LlmpDescription {
shmem: map.shmem.description(),
last_message_offset,
})
}
/// Create this client on an existing map from the given description.
/// Acquired with [`self.describe`].
pub fn on_existing_from_description(
@ -1581,6 +1313,279 @@ where
}
}
impl<SHM, SP> LlmpSender<SHM, SP>
where
SHM: ShMem,
{
/// ID of this sender.
#[must_use]
pub fn id(&self) -> ClientId {
self.id
}
/// Completely reset the current sender map.
/// Afterwards, no receiver should read from it at a different location.
/// This is only useful if all connected llmp parties start over, for example after a crash.
///
/// # Safety
/// Only safe if you really really restart the page on everything connected
/// No receiver should read from this page at a different location.
pub unsafe fn reset(&mut self) {
llmp_page_init(
&mut self.out_shmems.last_mut().unwrap().shmem,
self.id,
true,
);
self.last_msg_sent = ptr::null_mut();
}
/// Reads the stored sender / client id for the given `env_name` (by appending `_CLIENT_ID`).
/// If the content of the env is `_NULL`, returns [`Option::None`].
#[cfg(feature = "std")]
#[inline]
fn client_id_from_env(env_name: &str) -> Result<Option<ClientId>, Error> {
let client_id_str = env::var(format!("{env_name}_CLIENT_ID"))?;
Ok(if client_id_str == _NULL_ENV_STR {
None
} else {
Some(ClientId(client_id_str.parse()?))
})
}
/// Writes the `id` to an env var
#[cfg(feature = "std")]
fn client_id_to_env(env_name: &str, id: ClientId) {
env::set_var(format!("{env_name}_CLIENT_ID"), format!("{}", id.0));
}
/// Store the info for this sender to env.
/// A new client can reattach to it using [`LlmpSender::on_existing_from_env()`].
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
let current_out_shmem = self.out_shmems.last().unwrap();
current_out_shmem.shmem.write_to_env(env_name)?;
Self::client_id_to_env(env_name, self.id);
unsafe { current_out_shmem.msg_to_env(self.last_msg_sent, env_name) }
}
/// Waits for this sender to be safe to unmap.
/// If a receiver is involved, this function should always be called.
pub fn await_safe_to_unmap_blocking(&self) {
#[cfg(feature = "std")]
let mut ctr = 0_u16;
loop {
if self.safe_to_unmap() {
return;
}
hint::spin_loop();
// We log that we're looping -> see when we're blocking.
#[cfg(feature = "std")]
{
ctr = ctr.wrapping_add(1);
if ctr == 0 {
log::info!("Awaiting safe_to_unmap_blocking");
}
}
}
}
/// If we are allowed to unmap this client
pub fn safe_to_unmap(&self) -> bool {
let current_out_shmem = self.out_shmems.last().unwrap();
unsafe {
// log::info!("Reading safe_to_unmap from {:?}", current_out_shmem.page() as *const _);
(*current_out_shmem.page())
.receivers_joined_count
.load(Ordering::Relaxed)
>= 1
}
}
/// For debug purposes: Mark safe to unmap, even though it might not have been read by a receiver yet.
/// # Safety
/// If this method is called, the page may be unmapped before it is read by any receiver.
pub unsafe fn mark_safe_to_unmap(&mut self) {
(*self.out_shmems.last_mut().unwrap().page_mut()).receiver_joined();
}
/// Internal: special allocation function for `EOP` messages (and nothing else!)
/// The normal alloc will fail if there is not enough space for `buf_len_padded + EOP`
/// So if [`alloc_next`] fails, create new page if necessary, use this function,
/// place `EOP`, commit `EOP`, reset, alloc again on the new space.
unsafe fn alloc_eop(&mut self) -> Result<*mut LlmpMsg, Error> {
let map = self.out_shmems.last_mut().unwrap();
let page = map.page_mut();
let last_msg = self.last_msg_sent;
assert!((*page).size_used + EOP_MSG_SIZE <= (*page).size_total,
"PROGRAM ABORT : BUG: EOP does not fit in page! page {page:?}, size_current {:?}, size_total {:?}",
&raw const (*page).size_used, &raw const (*page).size_total);
let ret: *mut LlmpMsg = if last_msg.is_null() {
(*page).messages.as_mut_ptr()
} else {
llmp_next_msg_ptr_checked(map, last_msg, EOP_MSG_SIZE)?
};
assert!(
(*ret).tag != LLMP_TAG_UNINITIALIZED,
"Did not call send() on last message!"
);
(*ret).buf_len = size_of::<LlmpPayloadSharedMapInfo>() as u64;
// We don't need to pad the EOP message: it'll always be the last in this page.
(*ret).buf_len_padded = (*ret).buf_len;
(*ret).message_id = if last_msg.is_null() {
MessageId(1)
} else {
MessageId((*last_msg).message_id.0 + 1)
};
(*ret).tag = LLMP_TAG_END_OF_PAGE;
(*page).size_used += EOP_MSG_SIZE;
Ok(ret)
}
/// Internal: will return a ptr to the next msg buf, or None if the map is full.
/// Never call [`alloc_next`] without either sending or cancelling the last allocated message for this page!
/// There can only ever be up to one message allocated per page at each given time.
unsafe fn alloc_next_if_space(&mut self, buf_len: usize) -> Option<*mut LlmpMsg> {
let map = self.out_shmems.last_mut().unwrap();
let page = map.page_mut();
let last_msg = self.last_msg_sent;
assert!(
!self.has_unsent_message,
"Called alloc without calling send inbetween"
);
#[cfg(feature = "llmp_debug")]
log::info!(
"Allocating {} bytes on page {:?} / map {:?} (last msg: {:?})",
buf_len,
page,
&map.shmem.id().as_str(),
last_msg
);
let msg_start = (*page).messages.as_mut_ptr() as usize + (*page).size_used;
// Make sure the end of our msg is aligned.
let buf_len_padded = llmp_align(msg_start + buf_len + size_of::<LlmpMsg>())
- msg_start
- size_of::<LlmpMsg>();
#[cfg(feature = "llmp_debug")]
log::trace!(
"{page:?} {:?} size_used={:x} buf_len_padded={:x} EOP_MSG_SIZE={:x} size_total={}",
&(*page),
(*page).size_used,
buf_len_padded,
EOP_MSG_SIZE,
(*page).size_total
);
// For future allocs, keep track of the maximum (aligned) alloc size we used
(*page).max_alloc_size = max(
(*page).max_alloc_size,
size_of::<LlmpMsg>() + buf_len_padded,
);
// We need enough space for the current page size_used + payload + padding
if (*page).size_used + size_of::<LlmpMsg>() + buf_len_padded + EOP_MSG_SIZE
> (*page).size_total
{
#[cfg(feature = "llmp_debug")]
log::info!("LLMP: Page full.");
/* We're full. */
return None;
}
let ret = msg_start as *mut LlmpMsg;
/* We need to start with 1 for ids, as current message id is initialized
* with 0... */
(*ret).message_id = if last_msg.is_null() {
MessageId(1)
} else if (*page).current_msg_id.load(Ordering::Relaxed) == (*last_msg).message_id.0 {
MessageId((*last_msg).message_id.0 + 1)
} else {
/* Oops, wrong usage! */
panic!("BUG: The current message never got committed using send! (page->current_msg_id {:?}, last_msg->message_id: {:?})", &raw const (*page).current_msg_id, (*last_msg).message_id);
};
(*ret).buf_len = buf_len as u64;
(*ret).buf_len_padded = buf_len_padded as u64;
(*page).size_used += size_of::<LlmpMsg>() + buf_len_padded;
(*llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET;
(*ret).tag = LLMP_TAG_UNINITIALIZED;
self.has_unsent_message = true;
Some(ret)
}
/// Commit the message last allocated by [`alloc_next`] to the queue.
/// After committing, the msg shall no longer be altered!
/// It will be read by the consuming threads (`broker->clients` or `client->broker`)
/// If `overwrite_client_id` is `false`, the message's `sender` won't be touched (for broker forwarding)
#[inline(never)] // Not inlined to make cpu-level reordering (hopefully?) improbable
unsafe fn send(&mut self, msg: *mut LlmpMsg, overwrite_client_id: bool) -> Result<(), Error> {
// log::info!("Sending msg {:?}", msg);
assert!(self.last_msg_sent != msg, "Message sent twice!");
assert!(
(*msg).tag != LLMP_TAG_UNSET,
"No tag set on message with id {:?}",
(*msg).message_id
);
// A client gets the sender id assigned to it by the broker during the initial handshake.
if overwrite_client_id {
(*msg).sender = self.id;
}
let page = self.out_shmems.last_mut().unwrap().page_mut();
if msg.is_null() || !llmp_msg_in_page(page, msg) {
return Err(Error::unknown(format!(
"Llmp Message {msg:?} is null or not in current page"
)));
}
let mid = (*page).current_msg_id.load(Ordering::Relaxed) + 1;
(*msg).message_id.0 = mid;
// Make sure all things have been written to the page, and commit the message to the page
(*page)
.current_msg_id
.store((*msg).message_id.0, Ordering::Release);
self.last_msg_sent = msg;
self.has_unsent_message = false;
log::debug!(
"[{} - {:#x}] Send message with id {}",
self.id.0,
ptr::from_ref::<Self>(self) as u64,
mid
);
Ok(())
}
/// Describe this [`LlmpSender`] in a way that it can be restored later, using [`Self::on_existing_from_description`].
pub fn describe(&self) -> Result<LlmpDescription, Error> {
let map = self.out_shmems.last().unwrap();
let last_message_offset = if self.last_msg_sent.is_null() {
None
} else {
Some(unsafe { map.msg_to_offset(self.last_msg_sent) }?)
};
Ok(LlmpDescription {
shmem: map.shmem.description(),
last_message_offset,
})
}
}
/// Receiving end of a (unidirectional) sharedmap channel
#[derive(Debug)]
pub struct LlmpReceiver<SHM, SP> {
@ -1615,15 +1620,6 @@ where
)
}
/// Store the info for this receiver to env.
/// A new client can reattach to it using [`LlmpReceiver::on_existing_from_env()`]
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
let current_out_shmem = &self.current_recv_shmem;
current_out_shmem.shmem.write_to_env(env_name)?;
unsafe { current_out_shmem.msg_to_env(self.last_msg_recvd, env_name) }
}
/// Create a Receiver, reattaching to an existing sender map.
/// It is essential that the sender (or someone else) keeps a pointer to the `sender_shmem`,
/// else reattach will get a new, empty page from the OS, or fail.
@ -1863,6 +1859,33 @@ where
}
}
/// Create this client on an existing map from the given description, acquired with `self.describe`.
pub fn on_existing_from_description(
mut shmem_provider: SP,
description: &LlmpDescription,
) -> Result<Self, Error> {
Self::on_existing_shmem(
shmem_provider.clone(),
shmem_provider.shmem_from_description(description.shmem)?,
description.last_message_offset,
)
}
}
/// Receiving end of an llmp channel
impl<SHM, SP> LlmpReceiver<SHM, SP>
where
SHM: ShMem,
{
/// Store the info for this receiver to env.
/// A new client can reattach to it using [`LlmpReceiver::on_existing_from_env()`]
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
let current_out_shmem = &self.current_recv_shmem;
current_out_shmem.shmem.write_to_env(env_name)?;
unsafe { current_out_shmem.msg_to_env(self.last_msg_recvd, env_name) }
}
/// Describe this client in a way that it can be restored later with [`Self::on_existing_from_description`]
pub fn describe(&self) -> Result<LlmpDescription, Error> {
let map = &self.current_recv_shmem;
@ -1876,18 +1899,6 @@ where
last_message_offset,
})
}
/// Create this client on an existing map from the given description, acquired with `self.describe`.
pub fn on_existing_from_description(
mut shmem_provider: SP,
description: &LlmpDescription,
) -> Result<Self, Error> {
Self::on_existing_shmem(
shmem_provider.clone(),
shmem_provider.shmem_from_description(description.shmem)?,
description.last_message_offset,
)
}
}
/// A page wrapper
@ -3441,124 +3452,6 @@ where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
/// Reattach to a vacant client map.
/// It is essential that the broker (or someone else) kept a pointer to the `out_shmem`,
/// else reattach will get a new, empty page from the OS, or fail
#[allow(clippy::needless_pass_by_value)] // no longer necessary on nightly
pub fn on_existing_shmem(
shmem_provider: SP,
_current_out_shmem: SHM,
_last_msg_sent_offset: Option<u64>,
current_broker_shmem: SHM,
last_msg_recvd_offset: Option<u64>,
) -> Result<Self, Error> {
Ok(Self {
receiver: LlmpReceiver::on_existing_shmem(
shmem_provider.clone(),
current_broker_shmem.clone(),
last_msg_recvd_offset,
)?,
sender: LlmpSender::on_existing_shmem(
shmem_provider,
current_broker_shmem,
last_msg_recvd_offset,
)?,
})
}
/// Recreate this client from a previous [`client.to_env()`]
#[cfg(feature = "std")]
pub fn on_existing_from_env(shmem_provider: SP, env_name: &str) -> Result<Self, Error> {
Ok(Self {
sender: LlmpSender::on_existing_from_env(
shmem_provider.clone(),
&format!("{env_name}_SENDER"),
)?,
receiver: LlmpReceiver::on_existing_from_env(
shmem_provider,
&format!("{env_name}_RECEIVER"),
)?,
})
}
/// Write the current state to env.
/// A new client can attach to exactly the same state by calling [`LlmpClient::on_existing_from_env()`].
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
self.sender.to_env(&format!("{env_name}_SENDER"))?;
self.receiver.to_env(&format!("{env_name}_RECEIVER"))
}
/// Describe this client in a way that it can be recreated, for example after a crash
pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
Ok(LlmpClientDescription {
sender: self.sender.describe()?,
receiver: self.receiver.describe()?,
})
}
/// Create an existing client from description
pub fn existing_client_from_description(
shmem_provider: SP,
description: &LlmpClientDescription,
) -> Result<Self, Error> {
Ok(Self {
sender: LlmpSender::on_existing_from_description(
shmem_provider.clone(),
&description.sender,
)?,
receiver: LlmpReceiver::on_existing_from_description(
shmem_provider,
&description.receiver,
)?,
})
}
/// Outgoing channel to the broker
#[must_use]
pub fn sender(&self) -> &LlmpSender<SHM, SP> {
&self.sender
}
/// Outgoing channel to the broker (mut)
#[must_use]
pub fn sender_mut(&mut self) -> &mut LlmpSender<SHM, SP> {
&mut self.sender
}
/// Incoming (broker) broadcast map
#[must_use]
pub fn receiver(&self) -> &LlmpReceiver<SHM, SP> {
&self.receiver
}
/// Incoming (broker) broadcast map (mut)
#[must_use]
pub fn receiver_mut(&mut self) -> &mut LlmpReceiver<SHM, SP> {
&mut self.receiver
}
/// Waits for the sender to be safe to unmap.
/// If a receiver is involved on the other side, this function should always be called.
pub fn await_safe_to_unmap_blocking(&self) {
self.sender.await_safe_to_unmap_blocking();
}
/// If we are allowed to unmap this client
pub fn safe_to_unmap(&self) -> bool {
self.sender.safe_to_unmap()
}
/// For debug purposes: mark the client as safe to unmap, even though it might not have been read.
///
/// # Safety
/// This should only be called in a debug scenario.
/// Calling this in other contexts may lead to a premature page unmap and result in a crash in another process,
/// or an unexpected read from an empty page in a receiving process.
pub unsafe fn mark_safe_to_unmap(&mut self) {
self.sender.mark_safe_to_unmap();
}
/// Creates a new [`LlmpClient`]
pub fn new(
mut shmem_provider: SP,
@ -3603,11 +3496,61 @@ where
Ok(Self { sender, receiver })
}
/// Commits a msg to the client's out map
/// # Safety
/// Needs to be called with a proper msg pointer
pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
self.sender.send(msg, true)
/// Reattach to a vacant client map.
/// It is essential that the broker (or someone else) kept a pointer to the `out_shmem`,
/// else reattach will get a new, empty page from the OS, or fail
#[allow(clippy::needless_pass_by_value)] // no longer necessary on nightly
pub fn on_existing_shmem(
shmem_provider: SP,
_current_out_shmem: SHM,
_last_msg_sent_offset: Option<u64>,
current_broker_shmem: SHM,
last_msg_recvd_offset: Option<u64>,
) -> Result<Self, Error> {
Ok(Self {
receiver: LlmpReceiver::on_existing_shmem(
shmem_provider.clone(),
current_broker_shmem.clone(),
last_msg_recvd_offset,
)?,
sender: LlmpSender::on_existing_shmem(
shmem_provider,
current_broker_shmem,
last_msg_recvd_offset,
)?,
})
}
/// Recreate this client from a previous [`client.to_env()`]
#[cfg(feature = "std")]
pub fn on_existing_from_env(shmem_provider: SP, env_name: &str) -> Result<Self, Error> {
Ok(Self {
sender: LlmpSender::on_existing_from_env(
shmem_provider.clone(),
&format!("{env_name}_SENDER"),
)?,
receiver: LlmpReceiver::on_existing_from_env(
shmem_provider,
&format!("{env_name}_RECEIVER"),
)?,
})
}
/// Create an existing client from description
pub fn existing_client_from_description(
shmem_provider: SP,
description: &LlmpClientDescription,
) -> Result<Self, Error> {
Ok(Self {
sender: LlmpSender::on_existing_from_description(
shmem_provider.clone(),
&description.sender,
)?,
receiver: LlmpReceiver::on_existing_from_description(
shmem_provider,
&description.receiver,
)?,
})
}
/// Allocates a message of the given size, tags it, and sends it off.
@ -3751,6 +3694,81 @@ where
}
}
impl<SHM, SP> LlmpClient<SHM, SP>
where
SHM: ShMem,
{
/// Waits for the sender to be safe to unmap.
/// If a receiver is involved on the other side, this function should always be called.
pub fn await_safe_to_unmap_blocking(&self) {
self.sender.await_safe_to_unmap_blocking();
}
/// If we are allowed to unmap this client
pub fn safe_to_unmap(&self) -> bool {
self.sender.safe_to_unmap()
}
/// For debug purposes: mark the client as safe to unmap, even though it might not have been read.
///
/// # Safety
/// This should only be called in a debug scenario.
/// Calling this in other contexts may lead to a premature page unmap and result in a crash in another process,
/// or an unexpected read from an empty page in a receiving process.
pub unsafe fn mark_safe_to_unmap(&mut self) {
self.sender.mark_safe_to_unmap();
}
/// Commits a msg to the client's out map
/// # Safety
/// Needs to be called with a proper msg pointer
pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
self.sender.send(msg, true)
}
/// Write the current state to env.
/// A new client can attach to exactly the same state by calling [`LlmpClient::on_existing_from_env()`].
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
self.sender.to_env(&format!("{env_name}_SENDER"))?;
self.receiver.to_env(&format!("{env_name}_RECEIVER"))
}
/// Describe this client in a way that it can be recreated, for example after a crash
pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
Ok(LlmpClientDescription {
sender: self.sender.describe()?,
receiver: self.receiver.describe()?,
})
}
}
impl<SHM, SP> LlmpClient<SHM, SP> {
/// Outgoing channel to the broker
#[must_use]
pub fn sender(&self) -> &LlmpSender<SHM, SP> {
&self.sender
}
/// Outgoing channel to the broker (mut)
#[must_use]
pub fn sender_mut(&mut self) -> &mut LlmpSender<SHM, SP> {
&mut self.sender
}
/// Incoming (broker) broadcast map
#[must_use]
pub fn receiver(&self) -> &LlmpReceiver<SHM, SP> {
&self.receiver
}
/// Incoming (broker) broadcast map (mut)
#[must_use]
pub fn receiver_mut(&mut self) -> &mut LlmpReceiver<SHM, SP> {
&mut self.receiver
}
}
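The `describe()`/`on_existing_from_description()` pairs that move between impl blocks above are the restart story: a client snapshots its LLMP state before a restart, and the re-executed process reattaches to the same pages. A rough round-trip sketch, assuming a std shmem provider and a broker already listening on the given TCP port:

use libafl_bolts::{
    llmp::LlmpClient,
    shmem::{ShMemProvider, StdShMemProvider},
    Error,
};

fn describe_roundtrip() -> Result<(), Error> {
    let provider = StdShMemProvider::new()?;
    // Attach to a broker assumed to be listening on TCP port 1337.
    let client = LlmpClient::create_attach_to_tcp(provider.clone(), 1337)?;
    // Snapshot the client in a restorable form...
    let description = client.describe()?;
    // ...and recreate it later, e.g. in a freshly exec'd child process.
    let _restored = LlmpClient::existing_client_from_description(provider, &description)?;
    Ok(())
}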
#[cfg(test)]
#[cfg(all(unix, feature = "std", not(target_os = "haiku")))]
mod tests {

View File

@ -9,7 +9,7 @@ use std::{
use libafl::{
corpus::Corpus,
events::{ManagerExit, SimpleRestartingEventManager},
events::{SendExiting, SimpleRestartingEventManager},
executors::{ExitKind, InProcessExecutor},
feedback_and_fast, feedback_or_fast,
feedbacks::{CrashFeedback, MinMapFeedback, TimeoutFeedback},