llmp store to env and restore from env works
This commit is contained in:
parent 85ad4c43eb
commit 1d5a9d2a35
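In short: a connected LlmpClient can now persist its state (shared map ids plus the last-message offsets of its sender and receiver) to environment variables, and another process can reattach to exactly that state. Below is a minimal sketch of the round trip, based on the test added in this commit; the surrounding broker/client setup is assumed to already exist, and the tag value is only illustrative:

    // `client` is assumed to be an already-connected LlmpClient<AflShmem>.
    let tag: Tag = 0x1337; // hypothetical tag value, any u32 works
    client.send_buf(tag, &[1u8]).unwrap();

    // Store the client's maps and message offsets to "_ENV_TEST_SENDER*" / "_ENV_TEST_RECEIVER*" env vars ...
    client.to_env("_ENV_TEST").unwrap();

    // ... then recreate a client from those env vars (e.g. in a freshly spawned process) and keep sending.
    let mut client = LlmpClient::<AflShmem>::on_existing_from_env("_ENV_TEST").unwrap();
    client.send_buf(tag, &[1u8]).unwrap();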
@@ -51,6 +51,7 @@ Then register some clientloops using llmp_broker_register_threaded_clientloop
use alloc::vec::Vec;
use core::{
    cmp::max,
    fmt::Debug,
    mem::size_of,
    ptr, slice,
    sync::atomic::{compiler_fence, Ordering},
@@ -58,6 +59,7 @@ use core::{
};
#[cfg(feature = "std")]
use std::{
    env,
    io::{Read, Write},
    net::{TcpListener, TcpStream},
    thread,
@@ -83,6 +85,12 @@ const LLMP_TAG_END_OF_PAGE: u32 = 0xAF1E0F1;
/// A new client for this broker got added.
const LLMP_TAG_NEW_SHM_CLIENT: u32 = 0xC11E471;

/// An env var of this value indicates that the set value was a NULL PTR
const NULL_ENV_STR: &str = "_NULL";

/// Magic indicating that a page got initialized correctly
const PAGE_INITIALIZED_MAGIC: u64 = 0x1A1A1A1A1A1A1AF1;

/// Size of a new page message, header, payload, and alignment
const EOP_MSG_SIZE: usize =
    llmp_align(size_of::<LlmpMsg>() + size_of::<LlmpPayloadSharedMapInfo>());
@@ -92,6 +100,132 @@ const LLMP_PAGE_HEADER_LEN: usize = size_of::<LlmpPage>();
/// TAGs used throughout llmp
pub type Tag = u32;

/// Get sharedmem from a page
#[inline]
unsafe fn shmem2page_mut<SH: ShMem>(afl_shmem: &mut SH) -> *mut LlmpPage {
    afl_shmem.map_mut().as_mut_ptr() as *mut LlmpPage
}

/// Get sharedmem from a page
#[inline]
unsafe fn shmem2page<SH: ShMem>(afl_shmem: &SH) -> *const LlmpPage {
    afl_shmem.map().as_ptr() as *const LlmpPage
}

/// Return, if a msg is contained in the current page
#[inline]
unsafe fn llmp_msg_in_page(page: *const LlmpPage, msg: *const LlmpMsg) -> bool {
    /* DBG("llmp_msg_in_page %p within %p-%p\n", msg, page, page + page->size_total); */
    return (page as *const u8) < msg as *const u8
        && (page as *const u8).offset((*page).size_total as isize) > msg as *const u8;
}

/// align to LLMP_PREF_ALIGNNMENT=64 bytes
#[inline]
const fn llmp_align(to_align: usize) -> usize {
    // check if we need to align first
    if LLMP_PREF_ALIGNNMENT == 0 {
        return to_align;
    }
    // Then do the alignment
    let modulo = to_align % LLMP_PREF_ALIGNNMENT;
    if modulo == 0 {
        to_align
    } else {
        to_align + LLMP_PREF_ALIGNNMENT - modulo
    }
}

/// Reads the stored message offset for the given env_name (by appending _OFFSET).
/// If the content of the env is _NULL, returns None
#[cfg(feature = "std")]
#[inline]
fn msg_offset_from_env(env_name: &str) -> Result<Option<u64>, AflError> {
    let msg_offset_str = env::var(&format!("{}_OFFSET", env_name))?;
    Ok(if msg_offset_str == NULL_ENV_STR {
        None
    } else {
        Some(msg_offset_str.parse()?)
    })
}

/// In case we don't have enough space, make sure the next page will be large
/// enough. For now, we want to have at least enough space to store 2 of the
/// largest messages we encountered (plus one new_page message).
#[inline]
fn new_map_size(max_alloc: usize) -> usize {
    next_pow2(max(
        max_alloc * 2 + EOP_MSG_SIZE + LLMP_PAGE_HEADER_LEN,
        LLMP_PREF_INITIAL_MAP_SIZE,
    ) as u64) as usize
}

/// Initialize a new llmp_page. size should be relative to
/// llmp_page->messages
unsafe fn _llmp_page_init<SH: ShMem>(shmem: &mut SH, sender: u32) {
    let map_size = shmem.map().len();
    let page = shmem2page_mut(shmem);
    if (*page).magic == PAGE_INITIALIZED_MAGIC {
        panic!(
            "Tried to initialize page {:?} twice (for shmem {:?})",
            page, shmem
        );
    };
    (*page).magic = PAGE_INITIALIZED_MAGIC;
    (*page).sender = sender;
    ptr::write_volatile(&mut (*page).current_msg_id, 0);
    (*page).max_alloc_size = 0;
    // Don't forget to subtract our own header size
    (*page).size_total = map_size - LLMP_PAGE_HEADER_LEN;
    (*page).size_used = 0;
    (*(*page).messages.as_mut_ptr()).message_id = 0;
    (*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET;
    ptr::write_volatile(&mut (*page).save_to_unmap, 0);
    ptr::write_volatile(&mut (*page).sender_dead, 0);
}

/// Get the next pointer and make sure it's in the current page, and has enough space.
#[inline]
unsafe fn llmp_next_msg_ptr_checked<SH: ShMem>(
    map: &mut LlmpSharedMap<SH>,
    last_msg: *const LlmpMsg,
    alloc_size: usize,
) -> Result<*mut LlmpMsg, AflError> {
    let page = map.page_mut();
    let map_size = map.shmem.map().len();
    let msg_begin_min = (page as *const u8).offset(size_of::<LlmpPage>() as isize);
    // We still need space for this msg (alloc_size).
    let msg_begin_max = (page as *const u8).offset((map_size - alloc_size) as isize);
    let next = _llmp_next_msg_ptr(last_msg);
    let next_ptr = next as *const u8;
    if next_ptr >= msg_begin_min && next_ptr <= msg_begin_max {
        Ok(next)
    } else {
        Err(AflError::IllegalState(format!(
            "Inconsistent data on sharedmap, or Bug (next_ptr was {:x}, sharedmap page was {:x})",
            next_ptr as usize, page as usize
        )))
    }
}

/// Pointer to the message behind the last message
#[inline]
unsafe fn _llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg {
    /* DBG("_llmp_next_msg_ptr %p %lu + %lu\n", last_msg, last_msg->buf_len_padded, sizeof(llmp_message)); */
    return (last_msg as *mut u8)
        .offset(size_of::<LlmpMsg>() as isize)
        .offset((*last_msg).buf_len_padded as isize) as *mut LlmpMsg;
}

#[derive(Copy, Clone, Debug)]
/// Result of an LLMP Message hook
pub enum LlmpMsgHookResult {
    /// This has been handled in the broker. No need to forward.
    Handled,
    /// Forward this to the clients. We are not done here.
    ForwardToClients,
}

/// Message sent over the "wire"
#[derive(Copy, Clone, Debug)]
#[repr(C, packed)]
@@ -152,6 +286,7 @@ impl LlmpMsg {
}

/// An Llmp instance
#[derive(Clone, Debug)]
pub enum LlmpConnection<SH>
where
    SH: ShMem,
@@ -205,132 +340,41 @@ where
#[derive(Copy, Clone, Debug)]
#[repr(C, packed)]
pub struct LlmpPage {
    /// to check if this page got initialized properly
    pub magic: u64,
    /// The id of the sender
    pub sender: u32,
    /// Set to != 1 by the receiver, once it got mapped
    /// It's not safe for the sender to unmap this page before
    /// (The os may have tidied up the memory when the receiver starts to map)
    pub save_to_unmap: u16,
    /// Not used at the moment (would indicate that the sender is no longer there)
    pub sender_dead: u16,
    /// The current message ID
    pub current_msg_id: u64,
    /// How much space is available on this page in bytes
    pub size_total: usize,
    /// How much space is used on this page in bytes
    pub size_used: usize,
    /// The maximum amount of bytes that ever got allocated on this page in one go
    /// An indicator of what to use as size for future pages
    pub max_alloc_size: usize,
    /// Pointer to the messages, from here on.
    pub messages: [LlmpMsg; 0],
}

#[derive(Copy, Clone, Debug)]
/// Result of an LLMP Message hook
pub enum LlmpMsgHookResult {
    /// This has been handled in the broker. No need to forward.
    Handled,
    /// Forward this to the clients. We are not done here.
    ForwardToClients,
}

/// Message payload when a client got added (LLMP_TAG_CLIENT_ADDED_V1)
/// This is an internal message!
/// LLMP_TAG_END_OF_PAGE_V1
#[derive(Copy, Clone, Debug)]
#[repr(C, packed)]
struct LlmpPayloadSharedMapInfo {
    /// The map size
    pub map_size: usize,
    /// The id of this map, as 0-terminated c string of at most 19 chars
    pub shm_str: [u8; 20],
}

/// Get sharedmem from a page
#[inline]
unsafe fn shmem2page_mut<SH: ShMem>(afl_shmem: &mut SH) -> *mut LlmpPage {
    afl_shmem.map_mut().as_mut_ptr() as *mut LlmpPage
}

/// Get sharedmem from a page
#[inline]
unsafe fn shmem2page<SH: ShMem>(afl_shmem: &SH) -> *const LlmpPage {
    afl_shmem.map().as_ptr() as *const LlmpPage
}

/// Return, if a msg is contained in the current page
#[inline]
unsafe fn llmp_msg_in_page(page: *mut LlmpPage, msg: *mut LlmpMsg) -> bool {
    /* DBG("llmp_msg_in_page %p within %p-%p\n", msg, page, page + page->size_total); */
    return (page as *mut u8) < msg as *mut u8
        && (page as *mut u8).offset((*page).size_total as isize) > msg as *mut u8;
}

/// align to LLMP_PREF_ALIGNNMENT=64 bytes
#[inline]
const fn llmp_align(to_align: usize) -> usize {
    // check if we need to align first
    if LLMP_PREF_ALIGNNMENT == 0 {
        return to_align;
    }
    // Then do the alignment
    let modulo = to_align % LLMP_PREF_ALIGNNMENT;
    if modulo == 0 {
        to_align
    } else {
        to_align + LLMP_PREF_ALIGNNMENT - modulo
    }
}

/// In case we don't have enough space, make sure the next page will be large
/// enough. For now, we want to have at least enough space to store 2 of the
/// largest messages we encountered (plus one new_page message).
#[inline]
fn new_map_size(max_alloc: usize) -> usize {
    next_pow2(max(
        max_alloc * 2 + EOP_MSG_SIZE + LLMP_PAGE_HEADER_LEN,
        LLMP_PREF_INITIAL_MAP_SIZE,
    ) as u64) as usize
}

/// Initialize a new llmp_page. size should be relative to
/// llmp_page->messages
unsafe fn _llmp_page_init<SH: ShMem>(shmem: &mut SH, sender: u32) {
    let map_size = shmem.map().len();
    let page = shmem2page_mut(shmem);
    (*page).sender = sender;
    ptr::write_volatile(&mut (*page).current_msg_id, 0);
    (*page).max_alloc_size = 0;
    // Don't forget to subtract our own header size
    (*page).size_total = map_size - LLMP_PAGE_HEADER_LEN;
    (*page).size_used = 0;
    (*(*page).messages.as_mut_ptr()).message_id = 0;
    (*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET;
    ptr::write_volatile(&mut (*page).save_to_unmap, 0);
    ptr::write_volatile(&mut (*page).sender_dead, 0);
}

/// Get the next pointer and make sure it's in the current page, and has enough space.
#[inline]
unsafe fn llmp_next_msg_ptr_checked<SH: ShMem>(
    map: &mut LlmpSharedMap<SH>,
    last_msg: *const LlmpMsg,
    alloc_size: usize,
) -> Result<*mut LlmpMsg, AflError> {
    let page = map.page_mut();
    let map_size = map.shmem.map().len();
    let msg_begin_min = (page as *const u8).offset(size_of::<LlmpPage>() as isize);
    // We still need space for this msg (alloc_size).
    let msg_begin_max = (page as *const u8).offset((map_size - alloc_size) as isize);
    let next = _llmp_next_msg_ptr(last_msg);
    let next_ptr = next as *const u8;
    if next_ptr >= msg_begin_min && next_ptr <= msg_begin_max {
        Ok(next)
    } else {
        Err(AflError::IllegalState(format!(
            "Inconsistent data on sharedmap, or Bug (next_ptr was {:x}, sharedmap page was {:x})",
            next_ptr as usize, page as usize
        )))
    }
}

/// Pointer to the message behind the last message
#[inline]
unsafe fn _llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg {
    /* DBG("_llmp_next_msg_ptr %p %lu + %lu\n", last_msg, last_msg->buf_len_padded, sizeof(llmp_message)); */
    return (last_msg as *mut u8)
        .offset(size_of::<LlmpMsg>() as isize)
        .offset((*last_msg).buf_len_padded as isize) as *mut LlmpMsg;
}

/// Sending end on a (unidirectional) sharedmap channel
#[derive(Clone, Debug)]
pub struct LlmpSender<SH>
@@ -341,7 +385,7 @@ where
    pub id: u32,
    /// Ref to the last message this sender sent on the last page.
    /// If null, a new page (just) started.
    pub last_msg_sent: *mut LlmpMsg,
    pub last_msg_sent: *const LlmpMsg,
    /// A vec of page wrappers, each containing an initialized AfShmem
    pub out_maps: Vec<LlmpSharedMap<SH>>,
    /// If true, pages will never be pruned.
@@ -356,6 +400,44 @@ impl<SH> LlmpSender<SH>
where
    SH: ShMem,
{
    /// Reattach to a vacant out_map, to which a previous sender stored the information in an env before.
    #[cfg(feature = "std")]
    pub fn on_existing_from_env(env_name: &str) -> Result<Self, AflError> {
        let msg_sent_offset = msg_offset_from_env(env_name)?;
        Self::on_existing_map(SH::existing_from_env(env_name)?, msg_sent_offset)
    }

    /// Store the info for this sender to env.
    /// A new client can reattach to it using on_existing_from_env
    #[cfg(feature = "std")]
    pub fn to_env(&self, env_name: &str) -> Result<(), AflError> {
        let current_out_map = self.out_maps.last().unwrap();
        // TODO: Make sure somebody else has mapped this
        // current_out_map.await_read_blocking();

        current_out_map.shmem.write_to_env(env_name)?;
        current_out_map.msg_to_env(self.last_msg_sent, env_name)
    }

    /// Waits for this sender to be safe to unmap.
    /// If a receiver is involved, this function should always be called.
    pub fn await_save_to_unmap_blocking(&self) {
        loop {
            if self.save_to_unmap() {
                return;
            }
        }
    }

    /// If we are allowed to unmap this client
    pub fn save_to_unmap(&self) -> bool {
        let current_out_map = self.out_maps.last().unwrap();
        unsafe {
            compiler_fence(Ordering::SeqCst);
            ptr::read_volatile(&(*current_out_map.page()).save_to_unmap) != 0
        }
    }

    /// Reattach to a vacant out_map.
    /// It is essential, that the receiver (or someone else) keeps a pointer to this map
    /// else reattach will get a new, empty page, from the OS, or fail.
@@ -363,7 +445,7 @@ where
        current_out_map: SH,
        last_msg_sent_offset: Option<u64>,
    ) -> Result<Self, AflError> {
        let mut out_map = LlmpSharedMap::new(0, current_out_map);
        let mut out_map = LlmpSharedMap::existing(current_out_map);
        let last_msg_sent = match last_msg_sent_offset {
            Some(offset) => out_map.msg_from_offset(offset)?,
            None => 0 as *mut LlmpMsg,
@@ -500,6 +582,7 @@ where
        /* DBG("XXX ret %p - page->messages %p = %lu != %lu, will add %lu -> %p\n", ret, page->messages,
        (c_ulong)((u8 *)ret - (u8 *)page->messages), page->size_used, complete_msg_size, ((u8 *)ret) + complete_msg_size);
        */

        if last_msg.is_null() && (*page).size_used != 0
            || ((ret as usize) - (*page).messages.as_mut_ptr() as usize) != (*page).size_used
        {
@@ -638,7 +721,7 @@ where
{
    pub id: u32,
    /// Pointer to the last msg this receiver received
    pub last_msg_recvd: *mut LlmpMsg,
    pub last_msg_recvd: *const LlmpMsg,
    /// current page. After EOP, this gets replaced with the new one
    pub current_recv_map: LlmpSharedMap<SH>,
}
@@ -648,6 +731,24 @@ impl<SH> LlmpReceiver<SH>
where
    SH: ShMem,
{
    /// Reattach to a vacant recv_map, to which a previous sender stored the information in an env before.
    #[cfg(feature = "std")]
    pub fn on_existing_from_env(env_name: &str) -> Result<Self, AflError> {
        Self::on_existing_map(
            SH::existing_from_env(env_name)?,
            msg_offset_from_env(env_name)?,
        )
    }

    /// Store the info for this receiver to env.
    /// A new client can reattach to it using on_existing_from_env
    #[cfg(feature = "std")]
    pub fn to_env(&self, env_name: &str) -> Result<(), AflError> {
        let current_out_map = &self.current_recv_map;
        current_out_map.shmem.write_to_env(env_name)?;
        current_out_map.msg_to_env(self.last_msg_recvd, env_name)
    }

    /// Create a Receiver, reattaching to an existing sender map.
    /// It is essential, that the sender (or someone else) keeps a pointer to the sender_map
    /// else reattach will get a new, empty page, from the OS, or fail.
@@ -655,7 +756,7 @@ where
        current_sender_map: SH,
        last_msg_recvd_offset: Option<u64>,
    ) -> Result<Self, AflError> {
        let mut current_recv_map = LlmpSharedMap::new(0, current_sender_map);
        let mut current_recv_map = LlmpSharedMap::existing(current_sender_map);
        let last_msg_recvd = match last_msg_recvd_offset {
            Some(offset) => current_recv_map.msg_from_offset(offset)?,
            None => 0 as *mut LlmpMsg,
@@ -676,7 +777,7 @@ where
        compiler_fence(Ordering::SeqCst);
        let page = self.current_recv_map.page_mut();
        let last_msg = self.last_msg_recvd;
        let current_msg_id = ptr::read_volatile(&mut (*page).current_msg_id);
        let current_msg_id = ptr::read_volatile(&(*page).current_msg_id);

        // Read the message from the page
        let ret = if current_msg_id == 0 {
@@ -765,7 +866,7 @@ where
        }
        loop {
            compiler_fence(Ordering::SeqCst);
            if ptr::read_volatile(&mut (*page).current_msg_id) != current_msg_id {
            if ptr::read_volatile(&(*page).current_msg_id) != current_msg_id {
                return match self.recv()? {
                    Some(msg) => Ok(msg),
                    None => panic!("BUG: blocking llmp message should never be NULL"),
@@ -831,9 +932,15 @@ where

    /// Maps and wraps an existing shared map
    pub fn existing(existing_map: SH) -> Self {
        Self {
        let ret = Self {
            shmem: existing_map,
        };
        unsafe {
            if (*ret.page()).magic != PAGE_INITIALIZED_MAGIC {
                panic!("Map was not previously initialized at {:?}", &ret.shmem);
            }
        }
        ret
    }

    /// Get the unsafe ptr to this page, situated on the shared map
@@ -848,9 +955,9 @@ where

    /// Gets the offset of a message on this here page.
    /// Will return IllegalArgument error if msg is not on page.
    pub fn msg_to_offset(&mut self, msg: *mut LlmpMsg) -> Result<u64, AflError> {
    pub fn msg_to_offset(&self, msg: *const LlmpMsg) -> Result<u64, AflError> {
        unsafe {
            let page = self.page_mut();
            let page = self.page();
            if llmp_msg_in_page(page, msg) {
                // Cast both sides to u8 arrays, get the offset, then cast the return isize to u64
                Ok((msg as *const u8).offset_from((*page).messages.as_ptr() as *const u8) as u64)
@@ -863,6 +970,31 @@ where
        }
    }

    /// Retrieve the stored msg from env_name + _OFFSET.
    /// It will restore the stored offset by env_name and return the message.
    #[cfg(feature = "std")]
    pub fn msg_from_env(&mut self, map_env_name: &str) -> Result<*mut LlmpMsg, AflError> {
        match msg_offset_from_env(map_env_name)? {
            Some(offset) => self.msg_from_offset(offset),
            None => Ok(0 as *mut LlmpMsg),
        }
    }

    /// Store this msg offset to env_name + _OFFSET env variable.
    /// It can be restored using msg_from_env with the same env_name later.
    #[cfg(feature = "std")]
    pub fn msg_to_env(&self, msg: *const LlmpMsg, map_env_name: &str) -> Result<(), AflError> {
        if msg.is_null() {
            env::set_var(&format!("{}_OFFSET", map_env_name), NULL_ENV_STR)
        } else {
            env::set_var(
                &format!("{}_OFFSET", map_env_name),
                format!("{}", self.msg_to_offset(msg)?),
            )
        };
        Ok(())
    }

    /// Gets this message from this page, at the indicated offset.
    /// Will return IllegalArgument error if the offset is out of bounds.
    pub fn msg_from_offset(&mut self, offset: u64) -> Result<*mut LlmpMsg, AflError> {
@@ -886,7 +1018,6 @@ where

/// The broker (node 0)
#[derive(Clone, Debug)]
#[repr(C)]
pub struct LlmpBroker<SH>
where
    SH: ShMem,
@@ -1186,9 +1317,9 @@ where
    SH: ShMem,
{
    /// Outgoing channel to the broker
    pub llmp_out: LlmpSender<SH>,
    pub sender: LlmpSender<SH>,
    /// Incoming (broker) broadcast map
    pub llmp_in: LlmpReceiver<SH>,
    pub receiver: LlmpReceiver<SH>,
}

/// `n` clients connect to a broker. They share an outgoing map with the broker,
@@ -1207,15 +1338,41 @@ where
        last_msg_recvd_offset: Option<u64>,
    ) -> Result<Self, AflError> {
        Ok(Self {
            llmp_in: LlmpReceiver::on_existing_map(current_broker_map, last_msg_recvd_offset)?,
            llmp_out: LlmpSender::on_existing_map(current_out_map, last_msg_sent_offset)?,
            receiver: LlmpReceiver::on_existing_map(current_broker_map, last_msg_recvd_offset)?,
            sender: LlmpSender::on_existing_map(current_out_map, last_msg_sent_offset)?,
        })
    }

    /// Recreate this client from a previous client.to_env
    pub fn on_existing_from_env(env_name: &str) -> Result<Self, AflError> {
        Ok(Self {
            sender: LlmpSender::on_existing_from_env(&format!("{}_SENDER", env_name))?,
            receiver: LlmpReceiver::on_existing_from_env(&format!("{}_RECEIVER", env_name))?,
        })
    }

    /// Write the current state to env.
    /// A new client can attach to exactly the same state by calling on_existing_map.
    pub fn to_env(&self, env_name: &str) -> Result<(), AflError> {
        self.sender.to_env(&format!("{}_SENDER", env_name))?;
        self.receiver.to_env(&format!("{}_RECEIVER", env_name))
    }

    /// Waits for the sender to be safe to unmap.
    /// If a receiver is involved on the other side, this function should always be called.
    pub fn await_save_to_unmap_blocking(&self) {
        self.sender.await_save_to_unmap_blocking();
    }

    /// If we are allowed to unmap this client
    pub fn save_to_unmap(&self) -> bool {
        self.sender.save_to_unmap()
    }

    /// Creates a new LlmpClient
    pub fn new(initial_broker_map: LlmpSharedMap<SH>) -> Result<Self, AflError> {
        Ok(Self {
            llmp_out: LlmpSender {
            sender: LlmpSender {
                id: 0,
                last_msg_sent: 0 as *mut LlmpMsg,
                out_maps: vec![LlmpSharedMap::new(
@@ -1225,7 +1382,7 @@ where
                // drop pages to the broker if it already read them
                keep_pages_forever: false,
            },
            llmp_in: LlmpReceiver {
            receiver: LlmpReceiver {
                id: 0,
                current_recv_map: initial_broker_map,
                last_msg_recvd: 0 as *mut LlmpMsg,
@@ -1235,12 +1392,12 @@ where

    /// Commits a msg to the client's out map
    pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> {
        self.llmp_out.send(msg)
        self.sender.send(msg)
    }

    /// Allocates a message of the given size, tags it, and sends it off.
    pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), AflError> {
        self.llmp_out.send_buf(tag, buf)
        self.sender.send_buf(tag, buf)
    }

    /// Informs the broker about a new client in town, with the given map id
@@ -1266,33 +1423,33 @@ where
    /// Returns null if no message is available
    #[inline]
    pub unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, AflError> {
        self.llmp_in.recv()
        self.receiver.recv()
    }

    /// A client blocks/spins until the next message gets posted to the page,
    /// then returns that message.
    #[inline]
    pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, AflError> {
        self.llmp_in.recv_blocking()
        self.receiver.recv_blocking()
    }

    /// The current page could have changed in recv (EOP)
    /// Alloc the next message, internally handling end of page by allocating a new one.
    #[inline]
    pub unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> {
        self.llmp_out.alloc_next(buf_len)
        self.sender.alloc_next(buf_len)
    }

    /// Returns the next message, tag, buf, if available, else None
    #[inline]
    pub fn recv_buf(&mut self) -> Result<Option<(u32, u32, &[u8])>, AflError> {
        self.llmp_in.recv_buf()
        self.receiver.recv_buf()
    }

    /// Receives a buf from the broker, looping until a message becomes available
    #[inline]
    pub fn recv_buf_blocking(&mut self) -> Result<(u32, u32, &[u8]), AflError> {
        self.llmp_in.recv_buf_blocking()
        self.receiver.recv_buf_blocking()
    }

    #[cfg(feature = "std")]
@@ -1315,7 +1472,7 @@ where
            LLMP_PREF_INITIAL_MAP_SIZE,
        )?))?;

        stream.write(ret.llmp_out.out_maps.first().unwrap().shmem.shm_slice())?;
        stream.write(ret.sender.out_maps.first().unwrap().shmem.shm_slice())?;
        Ok(ret)
    }
}
@@ -1326,6 +1483,7 @@ mod tests {
    #[cfg(feature = "std")]
    use std::{thread::sleep, time::Duration};

    use super::LlmpClient;
    #[cfg(feature = "std")]
    use super::{
        LlmpConnection::{self, IsBroker, IsClient},
@@ -1359,6 +1517,19 @@ mod tests {
        let arr: [u8; 1] = [1u8];
        // Send stuff
        client.send_buf(tag, &arr).unwrap();

        client.to_env("_ENV_TEST").unwrap();
        dbg!(std::env::vars());

        for (key, value) in std::env::vars_os() {
            println!("{:?}: {:?}", key, value);
        }

        /* recreate the client from env, check if it still works */
        client = LlmpClient::<AflShmem>::on_existing_from_env("_ENV_TEST").unwrap();

        client.send_buf(tag, &arr).unwrap();

        // Forward stuff to clients
        broker
            .once(&mut |_sender_id, _tag, _msg| Ok(ForwardToClients))
@@ -2,6 +2,7 @@
// too.)

use alloc::string::{String, ToString};
use core::fmt::Debug;
#[cfg(feature = "std")]
use core::{mem::size_of, slice};
#[cfg(feature = "std")]
@@ -58,7 +59,7 @@ struct shmid_ds {
}

/// A Shared map
pub trait ShMem: Sized {
pub trait ShMem: Sized + Debug {
    /// Creates a new variable with the given name, stringified to 20 bytes.
    fn existing_from_shm_slice(map_str_bytes: &[u8; 20], map_size: usize)
        -> Result<Self, AflError>;