Streamline ShMem API (#472)

* from warning

* fix latest clippy

* clippy fixes++

* renamed shmem parameters

* renamed map to shmem

* make forkserver executor work for any (non-system) shmem

* Mem -> ShMem

* rework windows

* fix nit

* fix symbolic
This commit is contained in:
Dominik Maier 2022-01-17 18:28:26 +01:00 committed by GitHub
parent ac43997950
commit 4f6f76e857
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 422 additions and 351 deletions

View File

@ -26,10 +26,10 @@ Next, we'll take a look at the `ForkserverExecutor`. In this case, it is `afl-cc
As you can see from the forkserver example, As you can see from the forkserver example,
```rust,ignore ```rust,ignore
//Coverage map shared between observer and executor //Coverage map shared between observer and executor
let mut shmem = StdShMemProvider::new().unwrap().new_map(MAP_SIZE).unwrap(); let mut shmem = StdShMemProvider::new().unwrap().new_shmem(MAP_SIZE).unwrap();
//let the forkserver know the shmid //let the forkserver know the shmid
shmem.write_to_env("__AFL_SHM_ID").unwrap(); shmem.write_to_env("__AFL_SHM_ID").unwrap();
let mut shmem_map = shmem.map_mut(); let mut shmem_buf = shmem.as_mut_slice();
``` ```
Here we make a shared memory region; `shmem`, and write this to environmental variable `__AFL_SHM_ID`. Then the instrumented binary, or the forkserver, finds this shared memory region (from the aforementioned env var) to record its coverage. On your fuzzer side, you can pass this shmem map to your `Observer` to obtain coverage feedbacks combined with any `Feedback`. Here we make a shared memory region; `shmem`, and write this to environmental variable `__AFL_SHM_ID`. Then the instrumented binary, or the forkserver, finds this shared memory region (from the aforementioned env var) to record its coverage. On your fuzzer side, you can pass this shmem map to your `Observer` to obtain coverage feedbacks combined with any `Feedback`.
@ -47,11 +47,11 @@ On your fuzzer side, you can allocate a shared memory region and make the `EDGES
```rust,ignore ```rust,ignore
let mut shmem; let mut shmem;
unsafe{ unsafe{
shmem = StdShMemProvider::new().unwrap().new_map(MAX_EDGES_NUM).unwrap(); shmem = StdShMemProvider::new().unwrap().new_shmem(MAX_EDGES_NUM).unwrap();
} }
let shmem_map = shmem.map_mut(); let shmem_buf = shmem.as_mut_slice();
unsafe{ unsafe{
EDGES_PTR = shmem_map.as_ptr(); EDGES_PTR = shmem_buf.as_ptr();
} }
``` ```
Again, you can pass this shmem map to your `Observer` and `Feedback` to obtain coverage feedbacks. Again, you can pass this shmem map to your `Observer` and `Feedback` to obtain coverage feedbacks.

View File

@ -21,16 +21,16 @@ The broker can also intercept and filter the messages it receives instead of for
A common use-case for messages filtered by the broker are the status messages sent from each client to the broker directly. A common use-case for messages filtered by the broker are the status messages sent from each client to the broker directly.
The broker used this information to paint a simple UI, with up-to-date information about all clients, however the other clients don't need to receive this information. The broker used this information to paint a simple UI, with up-to-date information about all clients, however the other clients don't need to receive this information.
### Speedy Local Messages via Shared Maps ### Speedy Local Messages via Shared Memory
Throughout LibAFL, we use a wrapper around different operating system's shared maps, called `ShMem`. Throughout LibAFL, we use a wrapper around different operating system's shared maps, called `ShMem`.
Shared maps are the backbone of `LLMP`. Shared maps, called shared memory for the sake of not colliding with Rust's `map()` functions, are the backbone of `LLMP`.
Each client, usually a fuzzer trying to share stats and new testcases, maps an outgoing `ShMem` map. Each client, usually a fuzzer trying to share stats and new testcases, maps an outgoing `ShMem` map.
With very few exceptions, only this client writes to this map, therefore, we do not run in race conditions and can live without locks. With very few exceptions, only this client writes to this map, therefore, we do not run in race conditions and can live without locks.
The broker reads from all client's `ShMem` maps. The broker reads from all client's `ShMem` maps.
It checks all incoming client maps periodically, and then forwards new messages to its outgoing broadcast-`ShMem`, mapped by all connected clients. It checks all incoming client maps periodically and then forwards new messages to its outgoing broadcast-`ShMem`, mapped by all connected clients.
To send new messages, a client places a new message at the end of their map, and then updates a static field to notify the broker. To send new messages, a client places a new message at the end of their shared memory and then updates a static field to notify the broker.
Once the outgoing map is full, the sender allocates a new `ShMem` using the respective `ShMemProvider`. Once the outgoing map is full, the sender allocates a new `ShMem` using the respective `ShMemProvider`.
It then sends the information needed to map the newly-allocated page in connected processes to the old page, using an end of page (`EOP`) message. It then sends the information needed to map the newly-allocated page in connected processes to the old page, using an end of page (`EOP`) message.
Once the receiver maps the new page, flags it as safe for unmapping from the sending process (to avoid race conditions if we have more than a single EOP in a short time), and then continues to read from the new `ShMem`. Once the receiver maps the new page, flags it as safe for unmapping from the sending process (to avoid race conditions if we have more than a single EOP in a short time), and then continues to read from the new `ShMem`.
@ -49,7 +49,7 @@ The schema for client's maps to the broker is as follows:
The broker loops over all incoming maps, and checks for new messages. The broker loops over all incoming maps, and checks for new messages.
On `std` builds, the broker will sleep a few milliseconds after a loop, since we do not need the messages to arrive instantly. On `std` builds, the broker will sleep a few milliseconds after a loop, since we do not need the messages to arrive instantly.
After the broker received a new message from clientN, (`clientN_out->current_id != last_message->message_id`) the broker copies the message content to its own broadcast map. After the broker received a new message from clientN, (`clientN_out->current_id != last_message->message_id`) the broker copies the message content to its own broadcast shared memory.
The clients periodically, for example after finishing `n` mutations, check for new incoming messages by checking if (`current_broadcast_map->current_id != last_message->message_id`). The clients periodically, for example after finishing `n` mutations, check for new incoming messages by checking if (`current_broadcast_map->current_id != last_message->message_id`).
While the broker uses the same EOP mechanism to map new `ShMem`s for its outgoing map, it never unmaps old pages. While the broker uses the same EOP mechanism to map new `ShMem`s for its outgoing map, it never unmaps old pages.
@ -61,7 +61,7 @@ So the outgoing messages flow like this over the outgoing broadcast `Shmem`:
```text ```text
[broker] [broker]
| |
[current_broadcast_map] [current_broadcast_shmem]
| |
|___________________________________ |___________________________________
|_________________ \ |_________________ \
@ -83,10 +83,10 @@ Finally, call `LlmpBroker::loop_forever()`.
### B2B: Connecting Fuzzers via TCP ### B2B: Connecting Fuzzers via TCP
For `broker2broker` communication, all broadcast messages are additionally forwarded via network sockets. For `broker2broker` communication, all broadcast messages are additionally forwarded via network sockets.
To facilitate this, we spawn an additional client thread in the broker, that reads the broadcast map, just like any other client would. To facilitate this, we spawn an additional client thread in the broker, that reads the broadcast shared memory, just like any other client would.
For broker2broker communication, this b2b client listens for TCP connections from other, remote brokers. For broker2broker communication, this b2b client listens for TCP connections from other, remote brokers.
It keeps a pool of open sockets to other, remote, b2b brokers around at any time. It keeps a pool of open sockets to other, remote, b2b brokers around at any time.
When receiving a new message on the local broker map, the b2b client will forward it to all connected remote brokers via TCP. When receiving a new message on the local broker shared memory, the b2b client will forward it to all connected remote brokers via TCP.
Additionally, the broker can receive messages from all connected (remote) brokers, and forward them to the local broker over a client `ShMem`. Additionally, the broker can receive messages from all connected (remote) brokers, and forward them to the local broker over a client `ShMem`.
As a sidenote, the tcp listener used for b2b communication is also used for an initial handshake when a new client tries to connect to a broker locally, simply exchanging the initial `ShMem` descriptions. As a sidenote, the tcp listener used for b2b communication is also used for an initial handshake when a new client tries to connect to a broker locally, simply exchanging the initial `ShMem` descriptions.

View File

@ -49,6 +49,12 @@ pub fn main() {
.long("timeout") .long("timeout")
.default_value("1200"), .default_value("1200"),
) )
.arg(
Arg::new("debug_child")
.help("If not set, the child's stdout and stderror will be redirected to /dev/null")
.short('d')
.long("debug-child"),
)
.arg( .arg(
Arg::new("arguments") Arg::new("arguments")
.help("Arguments passed to the target") .help("Arguments passed to the target")
@ -61,16 +67,18 @@ pub fn main() {
const MAP_SIZE: usize = 65536; const MAP_SIZE: usize = 65536;
//Coverage map shared between observer and executor // The default, OS-specific privider for shared memory
let mut shmem = StdShMemProvider::new().unwrap().new_map(MAP_SIZE).unwrap(); let mut shmem_provider = StdShMemProvider::new().unwrap();
//let the forkserver know the shmid // The coverage map shared between observer and executor
let mut shmem = shmem_provider.new_shmem(MAP_SIZE).unwrap();
// let the forkserver know the shmid
shmem.write_to_env("__AFL_SHM_ID").unwrap(); shmem.write_to_env("__AFL_SHM_ID").unwrap();
let shmem_map = shmem.map_mut(); let shmem_buf = shmem.as_mut_slice();
// Create an observation channel using the signals map // Create an observation channel using the signals map
let edges_observer = HitcountsMapObserver::new(ConstMapObserver::<_, MAP_SIZE>::new( let edges_observer = HitcountsMapObserver::new(ConstMapObserver::<_, MAP_SIZE>::new(
"shared_mem", "shared_mem",
shmem_map, shmem_buf,
)); ));
// Create an observation channel to keep track of the execution time // Create an observation channel to keep track of the execution time
@ -127,6 +135,9 @@ pub fn main() {
// A fuzzer with feedbacks and a corpus scheduler // A fuzzer with feedbacks and a corpus scheduler
let mut fuzzer = StdFuzzer::new(scheduler, feedback, objective); let mut fuzzer = StdFuzzer::new(scheduler, feedback, objective);
// If we should debug the child
let debug_child = res.value_of("debug_child").is_some();
// Create the executor for the forkserver // Create the executor for the forkserver
let args = match res.values_of("arguments") { let args = match res.values_of("arguments") {
Some(vec) => vec.map(|s| s.to_string()).collect::<Vec<String>>().to_vec(), Some(vec) => vec.map(|s| s.to_string()).collect::<Vec<String>>().to_vec(),
@ -134,11 +145,12 @@ pub fn main() {
}; };
let mut executor = TimeoutForkserverExecutor::new( let mut executor = TimeoutForkserverExecutor::new(
ForkserverExecutor::new( ForkserverExecutor::with_shmem_inputs(
res.value_of("executable").unwrap().to_string(), res.value_of("executable").unwrap().to_string(),
&args, &args,
true,
tuple_list!(edges_observer, time_observer), tuple_list!(edges_observer, time_observer),
debug_child,
&mut shmem_provider,
) )
.unwrap(), .unwrap(),
Duration::from_millis( Duration::from_millis(

View File

@ -206,13 +206,13 @@ fn fuzz(
// The shared memory for the concolic runtime to write its trace to // The shared memory for the concolic runtime to write its trace to
let mut concolic_shmem = StdShMemProvider::new() let mut concolic_shmem = StdShMemProvider::new()
.unwrap() .unwrap()
.new_map(DEFAULT_SIZE) .new_shmem(DEFAULT_SIZE)
.unwrap(); .unwrap();
concolic_shmem.write_to_env(DEFAULT_ENV_NAME).unwrap(); concolic_shmem.write_to_env(DEFAULT_ENV_NAME).unwrap();
// The concolic observer observers the concolic shared memory map. // The concolic observer observers the concolic shared memory map.
let concolic_observer = let concolic_observer =
ConcolicObserver::new("concolic".to_string(), concolic_shmem.map_mut()); ConcolicObserver::new("concolic".to_string(), concolic_shmem.as_slice_mut());
let concolic_observer_name = concolic_observer.name().to_string(); let concolic_observer_name = concolic_observer.name().to_string();

View File

@ -2,7 +2,7 @@
A library for low level message passing A library for low level message passing
To send new messages, the clients place a new message at the end of their To send new messages, the clients place a new message at the end of their
`client_out_map`. If the current map is filled up, they place an end of page (`EOP`) `client_out_mem`. If the current map is filled up, they place an end of page (`EOP`)
msg and alloc a new [`ShMem`]. msg and alloc a new [`ShMem`].
Once the broker mapped this same page, it flags it as safe for unmapping. Once the broker mapped this same page, it flags it as safe for unmapping.
@ -21,7 +21,7 @@ After the broker received a new message for clientN, (`clientN_out->current_id
!= last_message->message_id`) the broker will copy the message content to its != last_message->message_id`) the broker will copy the message content to its
own, centralized page. own, centralized page.
The clients periodically check (`current_broadcast_map->current_id != The clients periodically check (`current_broadcast_shmem->current_id !=
last_message->message_id`) for new incoming messages. If the page is filled up, last_message->message_id`) for new incoming messages. If the page is filled up,
the broker instead creates a new page and places an end of page (`EOP`) the broker instead creates a new page and places an end of page (`EOP`)
message in its queue. The `EOP` buf contains the new description to message in its queue. The `EOP` buf contains the new description to
@ -31,7 +31,7 @@ current map.
```text ```text
[broker] [broker]
| |
[current_broadcast_map] [current_broadcast_shmem]
| |
|___________________________________ |___________________________________
|_________________ \ |_________________ \
@ -41,8 +41,8 @@ current map.
[client0] [client1] ... [clientN] [client0] [client1] ... [clientN]
``` ```
In the future, if we would need zero copy, the `current_broadcast_map` could instead In the future, if we would need zero copy, the `current_broadcast_shmem` could instead
list the `client_out_map` ID an offset for each message. In that case, the clients list the `client_out_shmem` ID an offset for each message. In that case, the clients
also need to create a new [`ShMem`] each time their bufs are filled up. also need to create a new [`ShMem`] each time their bufs are filled up.
@ -227,7 +227,7 @@ pub enum TcpResponse {
/// After receiving a new connection, the broker immediately sends a Hello. /// After receiving a new connection, the broker immediately sends a Hello.
BrokerConnectHello { BrokerConnectHello {
/// The broker page a new local client can listen on /// The broker page a new local client can listen on
broker_map_description: ShMemDescription, broker_shmem_description: ShMemDescription,
/// This broker's hostname /// This broker's hostname
hostname: String, hostname: String,
}, },
@ -294,14 +294,14 @@ impl Listener {
#[inline] #[inline]
#[allow(clippy::cast_ptr_alignment)] #[allow(clippy::cast_ptr_alignment)]
unsafe fn shmem2page_mut<SHM: ShMem>(afl_shmem: &mut SHM) -> *mut LlmpPage { unsafe fn shmem2page_mut<SHM: ShMem>(afl_shmem: &mut SHM) -> *mut LlmpPage {
afl_shmem.map_mut().as_mut_ptr() as *mut LlmpPage afl_shmem.as_mut_slice().as_mut_ptr() as *mut LlmpPage
} }
/// Get sharedmem from a page /// Get sharedmem from a page
#[inline] #[inline]
#[allow(clippy::cast_ptr_alignment)] #[allow(clippy::cast_ptr_alignment)]
unsafe fn shmem2page<SHM: ShMem>(afl_shmem: &SHM) -> *const LlmpPage { unsafe fn shmem2page<SHM: ShMem>(afl_shmem: &SHM) -> *const LlmpPage {
afl_shmem.map().as_ptr() as *const LlmpPage afl_shmem.as_slice().as_ptr() as *const LlmpPage
} }
/// Return, if a msg is contained in the current page /// Return, if a msg is contained in the current page
@ -411,7 +411,7 @@ fn recv_tcp_msg(stream: &mut TcpStream) -> Result<Vec<u8>, Error> {
/// enough. For now, we want to have at least enough space to store 2 of the /// enough. For now, we want to have at least enough space to store 2 of the
/// largest messages we encountered (plus message one `new_page` message). /// largest messages we encountered (plus message one `new_page` message).
#[inline] #[inline]
fn new_map_size(max_alloc: usize) -> usize { fn next_shmem_size(max_alloc: usize) -> usize {
max( max(
max_alloc * 2 + EOP_MSG_SIZE + LLMP_PAGE_HEADER_LEN, max_alloc * 2 + EOP_MSG_SIZE + LLMP_PAGE_HEADER_LEN,
LLMP_CFG_INITIAL_MAP_SIZE - 1, LLMP_CFG_INITIAL_MAP_SIZE - 1,
@ -460,7 +460,7 @@ unsafe fn llmp_next_msg_ptr_checked<SHM: ShMem>(
alloc_size: usize, alloc_size: usize,
) -> Result<*mut LlmpMsg, Error> { ) -> Result<*mut LlmpMsg, Error> {
let page = map.page_mut(); let page = map.page_mut();
let map_size = map.shmem.map().len(); let map_size = map.shmem.as_slice().len();
let msg_begin_min = (page as *const u8).add(size_of::<LlmpPage>()); let msg_begin_min = (page as *const u8).add(size_of::<LlmpPage>());
// We still need space for this msg (alloc_size). // We still need space for this msg (alloc_size).
let msg_begin_max = (page as *const u8).add(map_size - alloc_size); let msg_begin_max = (page as *const u8).add(map_size - alloc_size);
@ -544,7 +544,7 @@ impl LlmpMsg {
#[inline] #[inline]
pub fn as_slice<SHM: ShMem>(&self, map: &mut LlmpSharedMap<SHM>) -> Result<&[u8], Error> { pub fn as_slice<SHM: ShMem>(&self, map: &mut LlmpSharedMap<SHM>) -> Result<&[u8], Error> {
unsafe { unsafe {
if self.in_map(map) { if self.in_shmem(map) {
Ok(self.as_slice_unsafe()) Ok(self.as_slice_unsafe())
} else { } else {
Err(Error::IllegalState("Current message not in page. The sharedmap get tampered with or we have a BUG.".into())) Err(Error::IllegalState("Current message not in page. The sharedmap get tampered with or we have a BUG.".into()))
@ -554,9 +554,9 @@ impl LlmpMsg {
/// Returns true, if the pointer is, indeed, in the page of this shared map. /// Returns true, if the pointer is, indeed, in the page of this shared map.
#[inline] #[inline]
pub fn in_map<SHM: ShMem>(&self, map: &mut LlmpSharedMap<SHM>) -> bool { pub fn in_shmem<SHM: ShMem>(&self, map: &mut LlmpSharedMap<SHM>) -> bool {
unsafe { unsafe {
let map_size = map.shmem.map().len(); let map_size = map.shmem.as_slice().len();
let buf_ptr = self.buf.as_ptr(); let buf_ptr = self.buf.as_ptr();
if buf_ptr > (map.page_mut() as *const u8).add(size_of::<LlmpPage>()) if buf_ptr > (map.page_mut() as *const u8).add(size_of::<LlmpPage>())
&& buf_ptr <= (map.page_mut() as *const u8).add(map_size - size_of::<LlmpMsg>()) && buf_ptr <= (map.page_mut() as *const u8).add(map_size - size_of::<LlmpMsg>())
@ -721,7 +721,7 @@ where
/// If null, a new page (just) started. /// If null, a new page (just) started.
pub last_msg_sent: *const LlmpMsg, pub last_msg_sent: *const LlmpMsg,
/// A vec of page wrappers, each containing an intialized AfShmem /// A vec of page wrappers, each containing an intialized AfShmem
pub out_maps: Vec<LlmpSharedMap<SP::Mem>>, pub out_shmems: Vec<LlmpSharedMap<SP::ShMem>>,
/// If true, pages will never be pruned. /// If true, pages will never be pruned.
/// The broker uses this feature. /// The broker uses this feature.
/// By keeping the message history around, /// By keeping the message history around,
@ -745,9 +745,9 @@ where
Ok(Self { Ok(Self {
id, id,
last_msg_sent: ptr::null_mut(), last_msg_sent: ptr::null_mut(),
out_maps: vec![LlmpSharedMap::new( out_shmems: vec![LlmpSharedMap::new(
0, 0,
shmem_provider.new_map(LLMP_CFG_INITIAL_MAP_SIZE)?, shmem_provider.new_shmem(LLMP_CFG_INITIAL_MAP_SIZE)?,
)], )],
// drop pages to the broker if it already read them // drop pages to the broker if it already read them
keep_pages_forever, keep_pages_forever,
@ -762,7 +762,11 @@ where
/// # Safety /// # Safety
/// Only safe if you really really restart the page on everything connected /// Only safe if you really really restart the page on everything connected
pub unsafe fn reset(&mut self) { pub unsafe fn reset(&mut self) {
_llmp_page_init(&mut self.out_maps.last_mut().unwrap().shmem, self.id, true); _llmp_page_init(
&mut self.out_shmems.last_mut().unwrap().shmem,
self.id,
true,
);
self.last_msg_sent = ptr::null_mut(); self.last_msg_sent = ptr::null_mut();
} }
@ -785,11 +789,11 @@ where
env::set_var(&format!("{}_CLIENT_ID", env_name), &format!("{}", id)); env::set_var(&format!("{}_CLIENT_ID", env_name), &format!("{}", id));
} }
/// Reattach to a vacant `out_map`, to with a previous sender stored the information in an env before. /// Reattach to a vacant `out_shmem`, to with a previous sender stored the information in an env before.
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn on_existing_from_env(mut shmem_provider: SP, env_name: &str) -> Result<Self, Error> { pub fn on_existing_from_env(mut shmem_provider: SP, env_name: &str) -> Result<Self, Error> {
let msg_sent_offset = msg_offset_from_env(env_name)?; let msg_sent_offset = msg_offset_from_env(env_name)?;
let mut ret = Self::on_existing_map( let mut ret = Self::on_existing_shmem(
shmem_provider.clone(), shmem_provider.clone(),
shmem_provider.existing_from_env(env_name)?, shmem_provider.existing_from_env(env_name)?,
msg_sent_offset, msg_sent_offset,
@ -802,10 +806,10 @@ where
/// A new client can reattach to it using [`LlmpSender::on_existing_from_env()`]. /// A new client can reattach to it using [`LlmpSender::on_existing_from_env()`].
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) -> Result<(), Error> { pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
let current_out_map = self.out_maps.last().unwrap(); let current_out_shmem = self.out_shmems.last().unwrap();
current_out_map.shmem.write_to_env(env_name)?; current_out_shmem.shmem.write_to_env(env_name)?;
Self::client_id_to_env(env_name, self.id); Self::client_id_to_env(env_name, self.id);
unsafe { current_out_map.msg_to_env(self.last_msg_sent, env_name) } unsafe { current_out_shmem.msg_to_env(self.last_msg_sent, env_name) }
} }
/// Waits for this sender to be save to unmap. /// Waits for this sender to be save to unmap.
@ -831,10 +835,10 @@ where
/// If we are allowed to unmap this client /// If we are allowed to unmap this client
pub fn safe_to_unmap(&self) -> bool { pub fn safe_to_unmap(&self) -> bool {
let current_out_map = self.out_maps.last().unwrap(); let current_out_shmem = self.out_shmems.last().unwrap();
unsafe { unsafe {
// println!("Reading safe_to_unmap from {:?}", current_out_map.page() as *const _); // println!("Reading safe_to_unmap from {:?}", current_out_shmem.page() as *const _);
(*current_out_map.page()) (*current_out_shmem.page())
.safe_to_unmap .safe_to_unmap
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
!= 0 != 0
@ -845,29 +849,29 @@ where
/// # Safety /// # Safety
/// If this method is called, the page may be unmapped before it is read by any receiver. /// If this method is called, the page may be unmapped before it is read by any receiver.
pub unsafe fn mark_safe_to_unmap(&mut self) { pub unsafe fn mark_safe_to_unmap(&mut self) {
(*self.out_maps.last_mut().unwrap().page_mut()) (*self.out_shmems.last_mut().unwrap().page_mut())
.safe_to_unmap .safe_to_unmap
.store(1, Ordering::Relaxed); .store(1, Ordering::Relaxed);
} }
/// Reattach to a vacant `out_map`. /// Reattach to a vacant `out_shmem`.
/// It is essential, that the receiver (or someone else) keeps a pointer to this map /// It is essential, that the receiver (or someone else) keeps a pointer to this map
/// else reattach will get a new, empty page, from the OS, or fail. /// else reattach will get a new, empty page, from the OS, or fail.
pub fn on_existing_map( pub fn on_existing_shmem(
shmem_provider: SP, shmem_provider: SP,
current_out_map: SP::Mem, current_out_shmem: SP::ShMem,
last_msg_sent_offset: Option<u64>, last_msg_sent_offset: Option<u64>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let mut out_map = LlmpSharedMap::existing(current_out_map); let mut out_shmem = LlmpSharedMap::existing(current_out_shmem);
let last_msg_sent = match last_msg_sent_offset { let last_msg_sent = match last_msg_sent_offset {
Some(offset) => out_map.msg_from_offset(offset)?, Some(offset) => out_shmem.msg_from_offset(offset)?,
None => ptr::null_mut(), None => ptr::null_mut(),
}; };
Ok(Self { Ok(Self {
id: 0, id: 0,
last_msg_sent, last_msg_sent,
out_maps: vec![out_map], out_shmems: vec![out_shmem],
// drop pages to the broker if it already read them // drop pages to the broker if it already read them
keep_pages_forever: false, keep_pages_forever: false,
has_unsent_message: false, has_unsent_message: false,
@ -881,7 +885,7 @@ where
unsafe fn prune_old_pages(&mut self) { unsafe fn prune_old_pages(&mut self) {
// Exclude the current page by splitting of the last element for this iter // Exclude the current page by splitting of the last element for this iter
let mut unmap_until_excl = 0; let mut unmap_until_excl = 0;
for map in self.out_maps.split_last_mut().unwrap().1 { for map in self.out_shmems.split_last_mut().unwrap().1 {
if (*map.page()).safe_to_unmap.load(Ordering::Relaxed) == 0 { if (*map.page()).safe_to_unmap.load(Ordering::Relaxed) == 0 {
// The broker didn't read this page yet, no more pages to unmap. // The broker didn't read this page yet, no more pages to unmap.
break; break;
@ -889,7 +893,7 @@ where
unmap_until_excl += 1; unmap_until_excl += 1;
} }
if unmap_until_excl == 0 && self.out_maps.len() > LLMP_CFG_MAX_PENDING_UNREAD_PAGES { if unmap_until_excl == 0 && self.out_shmems.len() > LLMP_CFG_MAX_PENDING_UNREAD_PAGES {
// We send one last information to the broker before quitting. // We send one last information to the broker before quitting.
self.send_buf(LLMP_SLOW_RECEIVER_PANIC, &[]).unwrap(); self.send_buf(LLMP_SLOW_RECEIVER_PANIC, &[]).unwrap();
panic!("The receiver/broker could not process our sent llmp messages in time. Either we're sending too many messages too fast, the broker got stuck, or it crashed. Giving up."); panic!("The receiver/broker could not process our sent llmp messages in time. Either we're sending too many messages too fast, the broker got stuck, or it crashed. Giving up.");
@ -897,7 +901,7 @@ where
// Remove all maps that the broker already mapped // Remove all maps that the broker already mapped
// simply removing them from the vec should then call drop and unmap them. // simply removing them from the vec should then call drop and unmap them.
self.out_maps.drain(0..unmap_until_excl); self.out_shmems.drain(0..unmap_until_excl);
} }
/// Intern: Special allocation function for `EOP` messages (and nothing else!) /// Intern: Special allocation function for `EOP` messages (and nothing else!)
@ -905,7 +909,7 @@ where
/// So if [`alloc_next`] fails, create new page if necessary, use this function, /// So if [`alloc_next`] fails, create new page if necessary, use this function,
/// place `EOP`, commit `EOP`, reset, alloc again on the new space. /// place `EOP`, commit `EOP`, reset, alloc again on the new space.
unsafe fn alloc_eop(&mut self) -> Result<*mut LlmpMsg, Error> { unsafe fn alloc_eop(&mut self) -> Result<*mut LlmpMsg, Error> {
let map = self.out_maps.last_mut().unwrap(); let map = self.out_shmems.last_mut().unwrap();
let page = map.page_mut(); let page = map.page_mut();
let last_msg = self.last_msg_sent; let last_msg = self.last_msg_sent;
assert!((*page).size_used + EOP_MSG_SIZE <= (*page).size_total, assert!((*page).size_used + EOP_MSG_SIZE <= (*page).size_total,
@ -940,7 +944,7 @@ where
/// Never call [`alloc_next`] without either sending or cancelling the last allocated message for this page! /// Never call [`alloc_next`] without either sending or cancelling the last allocated message for this page!
/// There can only ever be up to one message allocated per page at each given time. /// There can only ever be up to one message allocated per page at each given time.
unsafe fn alloc_next_if_space(&mut self, buf_len: usize) -> Option<*mut LlmpMsg> { unsafe fn alloc_next_if_space(&mut self, buf_len: usize) -> Option<*mut LlmpMsg> {
let map = self.out_maps.last_mut().unwrap(); let map = self.out_shmems.last_mut().unwrap();
let page = map.page_mut(); let page = map.page_mut();
let last_msg = self.last_msg_sent; let last_msg = self.last_msg_sent;
@ -1032,7 +1036,7 @@ where
if overwrite_client_id { if overwrite_client_id {
(*msg).sender = self.id; (*msg).sender = self.id;
} }
let page = self.out_maps.last_mut().unwrap().page_mut(); let page = self.out_shmems.last_mut().unwrap().page_mut();
if msg.is_null() || !llmp_msg_in_page(page, msg) { if msg.is_null() || !llmp_msg_in_page(page, msg) {
return Err(Error::Unknown(format!( return Err(Error::Unknown(format!(
"Llmp Message {:?} is null or not in current page", "Llmp Message {:?} is null or not in current page",
@ -1060,7 +1064,7 @@ where
let bt = Backtrace::new(); let bt = Backtrace::new();
#[cfg(not(debug_assertions))] #[cfg(not(debug_assertions))]
let bt = "<n/a (release)>"; let bt = "<n/a (release)>";
let shm = self.out_maps.last().unwrap(); let shm = self.out_shmems.last().unwrap();
println!( println!(
"LLMP_DEBUG: End of page reached for map {} with len {}, sending EOP, bt: {:?}", "LLMP_DEBUG: End of page reached for map {} with len {}, sending EOP, bt: {:?}",
shm.shmem.id(), shm.shmem.id(),
@ -1069,16 +1073,19 @@ where
); );
} }
let old_map = self.out_maps.last_mut().unwrap().page_mut(); let old_map = self.out_shmems.last_mut().unwrap().page_mut();
#[cfg(all(feature = "llmp_debug", feature = "std"))] #[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("New Map Size {}", new_map_size((*old_map).max_alloc_size)); println!(
"Next ShMem Size {}",
next_shmem_size((*old_map).max_alloc_size)
);
// Create a new shard page. // Create a new shard page.
let mut new_map_shmem = LlmpSharedMap::new( let mut new_map_shmem = LlmpSharedMap::new(
(*old_map).sender, (*old_map).sender,
self.shmem_provider self.shmem_provider
.new_map(new_map_size((*old_map).max_alloc_size))?, .new_shmem(next_shmem_size((*old_map).max_alloc_size))?,
); );
let mut new_map = new_map_shmem.page_mut(); let mut new_map = new_map_shmem.page_mut();
@ -1109,7 +1116,7 @@ where
self.send(out, true)?; self.send(out, true)?;
// Set the new page as current page. // Set the new page as current page.
self.out_maps.push(new_map_shmem); self.out_shmems.push(new_map_shmem);
// We never sent a msg on the new buf */ // We never sent a msg on the new buf */
self.last_msg_sent = ptr::null_mut(); self.last_msg_sent = ptr::null_mut();
@ -1152,7 +1159,7 @@ where
pub unsafe fn cancel_send(&mut self, msg: *mut LlmpMsg) { pub unsafe fn cancel_send(&mut self, msg: *mut LlmpMsg) {
/* DBG("Client %d cancels send of msg at %p with tag 0x%X and size %ld", client->id, msg, msg->tag, /* DBG("Client %d cancels send of msg at %p with tag 0x%X and size %ld", client->id, msg, msg->tag,
* msg->buf_len_padded); */ * msg->buf_len_padded); */
let page = self.out_maps.last_mut().unwrap().page_mut(); let page = self.out_shmems.last_mut().unwrap().page_mut();
(*msg).tag = LLMP_TAG_UNSET; (*msg).tag = LLMP_TAG_UNSET;
(*page).size_used -= (*msg).buf_len_padded as usize + size_of::<LlmpMsg>(); (*page).size_used -= (*msg).buf_len_padded as usize + size_of::<LlmpMsg>();
} }
@ -1191,7 +1198,7 @@ where
(*msg).buf_len = shrinked_len as u64; (*msg).buf_len = shrinked_len as u64;
(*msg).buf_len_padded = buf_len_padded as u64; (*msg).buf_len_padded = buf_len_padded as u64;
let page = self.out_maps.last_mut().unwrap().page_mut(); let page = self.out_shmems.last_mut().unwrap().page_mut();
// Doing this step by step will catch underflows in debug builds :) // Doing this step by step will catch underflows in debug builds :)
(*page).size_used -= old_len_padded as usize; (*page).size_used -= old_len_padded as usize;
@ -1252,7 +1259,7 @@ where
/// Describe this [`LlmpClient`] in a way that it can be restored later, using [`Self::on_existing_from_description`]. /// Describe this [`LlmpClient`] in a way that it can be restored later, using [`Self::on_existing_from_description`].
pub fn describe(&self) -> Result<LlmpDescription, Error> { pub fn describe(&self) -> Result<LlmpDescription, Error> {
let map = self.out_maps.last().unwrap(); let map = self.out_shmems.last().unwrap();
let last_message_offset = if self.last_msg_sent.is_null() { let last_message_offset = if self.last_msg_sent.is_null() {
None None
} else { } else {
@ -1270,9 +1277,9 @@ where
mut shmem_provider: SP, mut shmem_provider: SP,
description: &LlmpDescription, description: &LlmpDescription,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Self::on_existing_map( Self::on_existing_shmem(
shmem_provider.clone(), shmem_provider.clone(),
shmem_provider.map_from_decription(description.shmem)?, shmem_provider.shmem_from_description(description.shmem)?,
description.last_message_offset, description.last_message_offset,
) )
} }
@ -1291,7 +1298,7 @@ where
/// The shmem provider /// The shmem provider
pub shmem_provider: SP, pub shmem_provider: SP,
/// current page. After EOP, this gets replaced with the new one /// current page. After EOP, this gets replaced with the new one
pub current_recv_map: LlmpSharedMap<SP::Mem>, pub current_recv_shmem: LlmpSharedMap<SP::ShMem>,
/// Caches the highest msg id we've seen so far /// Caches the highest msg id we've seen so far
highest_msg_id: u64, highest_msg_id: u64,
} }
@ -1301,10 +1308,10 @@ impl<SP> LlmpReceiver<SP>
where where
SP: ShMemProvider, SP: ShMemProvider,
{ {
/// Reattach to a vacant `recv_map`, to with a previous sender stored the information in an env before. /// Reattach to a vacant `recv_shmem`, to with a previous sender stored the information in an env before.
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn on_existing_from_env(mut shmem_provider: SP, env_name: &str) -> Result<Self, Error> { pub fn on_existing_from_env(mut shmem_provider: SP, env_name: &str) -> Result<Self, Error> {
Self::on_existing_map( Self::on_existing_shmem(
shmem_provider.clone(), shmem_provider.clone(),
shmem_provider.existing_from_env(env_name)?, shmem_provider.existing_from_env(env_name)?,
msg_offset_from_env(env_name)?, msg_offset_from_env(env_name)?,
@ -1315,28 +1322,28 @@ where
/// A new client can reattach to it using [`LlmpReceiver::on_existing_from_env()`] /// A new client can reattach to it using [`LlmpReceiver::on_existing_from_env()`]
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) -> Result<(), Error> { pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
let current_out_map = &self.current_recv_map; let current_out_shmem = &self.current_recv_shmem;
current_out_map.shmem.write_to_env(env_name)?; current_out_shmem.shmem.write_to_env(env_name)?;
unsafe { current_out_map.msg_to_env(self.last_msg_recvd, env_name) } unsafe { current_out_shmem.msg_to_env(self.last_msg_recvd, env_name) }
} }
/// Create a Receiver, reattaching to an existing sender map. /// Create a Receiver, reattaching to an existing sender map.
/// It is essential, that the sender (or someone else) keeps a pointer to the `sender_map` /// It is essential, that the sender (or someone else) keeps a pointer to the `sender_shmem`
/// else reattach will get a new, empty page, from the OS, or fail. /// else reattach will get a new, empty page, from the OS, or fail.
pub fn on_existing_map( pub fn on_existing_shmem(
shmem_provider: SP, shmem_provider: SP,
current_sender_map: SP::Mem, current_sender_shmem: SP::ShMem,
last_msg_recvd_offset: Option<u64>, last_msg_recvd_offset: Option<u64>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let mut current_recv_map = LlmpSharedMap::existing(current_sender_map); let mut current_recv_shmem = LlmpSharedMap::existing(current_sender_shmem);
let last_msg_recvd = match last_msg_recvd_offset { let last_msg_recvd = match last_msg_recvd_offset {
Some(offset) => current_recv_map.msg_from_offset(offset)?, Some(offset) => current_recv_shmem.msg_from_offset(offset)?,
None => ptr::null_mut(), None => ptr::null_mut(),
}; };
Ok(Self { Ok(Self {
id: 0, id: 0,
current_recv_map, current_recv_shmem,
last_msg_recvd, last_msg_recvd,
shmem_provider, shmem_provider,
highest_msg_id: 0, highest_msg_id: 0,
@ -1348,7 +1355,7 @@ where
#[inline(never)] #[inline(never)]
unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> { unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> {
/* DBG("recv %p %p\n", page, last_msg); */ /* DBG("recv %p %p\n", page, last_msg); */
let mut page = self.current_recv_map.page_mut(); let mut page = self.current_recv_shmem.page_mut();
let last_msg = self.last_msg_recvd; let last_msg = self.last_msg_recvd;
let (current_msg_id, loaded) = let (current_msg_id, loaded) =
@ -1380,7 +1387,7 @@ where
} }
// We don't know how big the msg wants to be, assert at least the header has space. // We don't know how big the msg wants to be, assert at least the header has space.
Some(llmp_next_msg_ptr_checked( Some(llmp_next_msg_ptr_checked(
&mut self.current_recv_map, &mut self.current_recv_shmem,
last_msg, last_msg,
size_of::<LlmpMsg>(), size_of::<LlmpMsg>(),
)?) )?)
@ -1388,7 +1395,7 @@ where
// Let's see what we got. // Let's see what we got.
if let Some(msg) = ret { if let Some(msg) = ret {
if !(*msg).in_map(&mut self.current_recv_map) { if !(*msg).in_shmem(&mut self.current_recv_shmem) {
return Err(Error::IllegalState("Unexpected message in map (out of map bounds) - bugy client or tampered shared map detedted!".into())); return Err(Error::IllegalState("Unexpected message in map (out of map bounds) - bugy client or tampered shared map detedted!".into()));
} }
// Handle special, LLMP internal, messages. // Handle special, LLMP internal, messages.
@ -1430,20 +1437,20 @@ where
(*page).safe_to_unmap.store(1, Ordering::Relaxed); (*page).safe_to_unmap.store(1, Ordering::Relaxed);
// Map the new page. The old one should be unmapped by Drop // Map the new page. The old one should be unmapped by Drop
self.current_recv_map = self.current_recv_shmem =
LlmpSharedMap::existing(self.shmem_provider.map_from_id_and_size( LlmpSharedMap::existing(self.shmem_provider.shmem_from_id_and_size(
ShMemId::from_slice(&pageinfo_cpy.shm_str), ShMemId::from_slice(&pageinfo_cpy.shm_str),
pageinfo_cpy.map_size, pageinfo_cpy.map_size,
)?); )?);
page = self.current_recv_map.page_mut(); page = self.current_recv_shmem.page_mut();
// Mark the new page save to unmap also (it's mapped by us, the broker now) // Mark the new page save to unmap also (it's mapped by us, the broker now)
(*page).safe_to_unmap.store(1, Ordering::Relaxed); (*page).safe_to_unmap.store(1, Ordering::Relaxed);
#[cfg(all(feature = "llmp_debug", feature = "std"))] #[cfg(all(feature = "llmp_debug", feature = "std"))]
println!( println!(
"LLMP_DEBUG: Got a new recv map {} with len {:?}", "LLMP_DEBUG: Got a new recv map {} with len {:?}",
self.current_recv_map.shmem.id(), self.current_recv_shmem.shmem.id(),
self.current_recv_map.shmem.len() self.current_recv_shmem.shmem.len()
); );
// After we mapped the new page, return the next message, if available // After we mapped the new page, return the next message, if available
return self.recv(); return self.recv();
@ -1463,7 +1470,7 @@ where
/// Returns a raw ptr, on the recv map. Should be safe in general /// Returns a raw ptr, on the recv map. Should be safe in general
pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, Error> { pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, Error> {
let mut current_msg_id = 0; let mut current_msg_id = 0;
let page = self.current_recv_map.page_mut(); let page = self.current_recv_shmem.page_mut();
let last_msg = self.last_msg_recvd; let last_msg = self.last_msg_recvd;
if !last_msg.is_null() { if !last_msg.is_null() {
assert!( assert!(
@ -1505,7 +1512,7 @@ where
(*msg).sender, (*msg).sender,
(*msg).tag, (*msg).tag,
(*msg).flags, (*msg).flags,
(*msg).as_slice(&mut self.current_recv_map)?, (*msg).as_slice(&mut self.current_recv_shmem)?,
)), )),
None => None, None => None,
}) })
@ -1520,14 +1527,14 @@ where
Ok(( Ok((
(*msg).sender, (*msg).sender,
(*msg).tag, (*msg).tag,
(*msg).as_slice(&mut self.current_recv_map)?, (*msg).as_slice(&mut self.current_recv_shmem)?,
)) ))
} }
} }
/// Describe this client in a way, that it can be restored later with [`Self::on_existing_from_description`] /// Describe this client in a way, that it can be restored later with [`Self::on_existing_from_description`]
pub fn describe(&self) -> Result<LlmpDescription, Error> { pub fn describe(&self) -> Result<LlmpDescription, Error> {
let map = &self.current_recv_map; let map = &self.current_recv_shmem;
let last_message_offset = if self.last_msg_recvd.is_null() { let last_message_offset = if self.last_msg_recvd.is_null() {
None None
} else { } else {
@ -1544,9 +1551,9 @@ where
mut shmem_provider: SP, mut shmem_provider: SP,
description: &LlmpDescription, description: &LlmpDescription,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Self::on_existing_map( Self::on_existing_shmem(
shmem_provider.clone(), shmem_provider.clone(),
shmem_provider.map_from_decription(description.shmem)?, shmem_provider.shmem_from_description(description.shmem)?,
description.last_message_offset, description.last_message_offset,
) )
} }
@ -1571,22 +1578,22 @@ where
SHM: ShMem, SHM: ShMem,
{ {
/// Creates a new page, initializing the passed shared mem struct /// Creates a new page, initializing the passed shared mem struct
pub fn new(sender: ClientId, mut new_map: SHM) -> Self { pub fn new(sender: ClientId, mut new_shmem: SHM) -> Self {
#[cfg(all(feature = "llmp_debug", feature = "std"))] #[cfg(all(feature = "llmp_debug", feature = "std"))]
println!( println!(
"LLMP_DEBUG: Initializing map on {} with size {}", "LLMP_DEBUG: Initializing map on {} with size {}",
new_map.id(), new_shmem.id(),
new_map.len() new_shmem.len()
); );
unsafe { unsafe {
_llmp_page_init(&mut new_map, sender, false); _llmp_page_init(&mut new_shmem, sender, false);
} }
Self { shmem: new_map } Self { shmem: new_shmem }
} }
/// Maps and wraps an existing /// Maps and wraps an existing
pub fn existing(existing_map: SHM) -> Self { pub fn existing(existing_shmem: SHM) -> Self {
#[cfg(all(feature = "llmp_debug", feature = "std"))] #[cfg(all(feature = "llmp_debug", feature = "std"))]
//{ //{
//#[cfg(debug_assertions)] //#[cfg(debug_assertions)]
@ -1595,14 +1602,14 @@ where
//let bt = "<n/a (release)>"; //let bt = "<n/a (release)>";
println!( println!(
"LLMP_DEBUG: Using existing map {} with size {}", "LLMP_DEBUG: Using existing map {} with size {}",
existing_map.id(), existing_shmem.id(),
existing_map.len(), existing_shmem.len(),
//bt //bt
); );
//} //}
let ret = Self { let ret = Self {
shmem: existing_map, shmem: existing_shmem,
}; };
unsafe { unsafe {
assert!( assert!(
@ -1692,7 +1699,7 @@ where
let offset = offset as usize; let offset = offset as usize;
unsafe { unsafe {
let page = self.page_mut(); let page = self.page_mut();
let page_size = self.shmem.map().len() - size_of::<LlmpPage>(); let page_size = self.shmem.as_slice().len() - size_of::<LlmpPage>();
if offset > page_size { if offset > page_size {
Err(Error::IllegalArgument(format!( Err(Error::IllegalArgument(format!(
"Msg offset out of bounds (size: {}, requested offset: {})", "Msg offset out of bounds (size: {}, requested offset: {})",
@ -1753,9 +1760,9 @@ where
llmp_out: LlmpSender { llmp_out: LlmpSender {
id: 0, id: 0,
last_msg_sent: ptr::null_mut(), last_msg_sent: ptr::null_mut(),
out_maps: vec![LlmpSharedMap::new( out_shmems: vec![LlmpSharedMap::new(
0, 0,
shmem_provider.new_map(new_map_size(0))?, shmem_provider.new_shmem(next_shmem_size(0))?,
)], )],
// Broker never cleans up the pages so that new // Broker never cleans up the pages so that new
// clients may join at any time // clients may join at any time
@ -1787,15 +1794,15 @@ where
} }
/// Registers a new client for the given sharedmap str and size. /// Registers a new client for the given sharedmap str and size.
/// Returns the id of the new client in [`broker.client_map`] /// Returns the id of the new client in [`broker.client_shmem`]
pub fn register_client(&mut self, mut client_page: LlmpSharedMap<SP::Mem>) { pub fn register_client(&mut self, mut client_page: LlmpSharedMap<SP::ShMem>) {
// Tell the client it may unmap this page now. // Tell the client it may unmap this page now.
client_page.mark_safe_to_unmap(); client_page.mark_safe_to_unmap();
let id = self.llmp_clients.len() as u32; let id = self.llmp_clients.len() as u32;
self.llmp_clients.push(LlmpReceiver { self.llmp_clients.push(LlmpReceiver {
id, id,
current_recv_map: client_page, current_recv_shmem: client_page,
last_msg_recvd: ptr::null_mut(), last_msg_recvd: ptr::null_mut(),
shmem_provider: self.shmem_provider.clone(), shmem_provider: self.shmem_provider.clone(),
highest_msg_id: 0, highest_msg_id: 0,
@ -1815,7 +1822,7 @@ where
match (&recv_tcp_msg(&mut stream)?).try_into()? { match (&recv_tcp_msg(&mut stream)?).try_into()? {
TcpResponse::BrokerConnectHello { TcpResponse::BrokerConnectHello {
broker_map_description: _, broker_shmem_description: _,
hostname, hostname,
} => println!("B2B: Connected to {}", hostname), } => println!("B2B: Connected to {}", hostname),
_ => { _ => {
@ -1851,14 +1858,22 @@ where
let map_description = Self::b2b_thread_on( let map_description = Self::b2b_thread_on(
stream, stream,
self.llmp_clients.len() as ClientId, self.llmp_clients.len() as ClientId,
&self.llmp_out.out_maps.first().unwrap().shmem.description(), &self
.llmp_out
.out_shmems
.first()
.unwrap()
.shmem
.description(),
)?; )?;
let new_map = let new_shmem = LlmpSharedMap::existing(
LlmpSharedMap::existing(self.shmem_provider.map_from_decription(map_description)?); self.shmem_provider
.shmem_from_description(map_description)?,
);
{ {
self.register_client(new_map); self.register_client(new_shmem);
} }
Ok(()) Ok(())
@ -1998,9 +2013,9 @@ where
fn b2b_thread_on( fn b2b_thread_on(
mut stream: TcpStream, mut stream: TcpStream,
b2b_client_id: ClientId, b2b_client_id: ClientId,
broker_map_description: &ShMemDescription, broker_shmem_description: &ShMemDescription,
) -> Result<ShMemDescription, Error> { ) -> Result<ShMemDescription, Error> {
let broker_map_description = *broker_map_description; let broker_shmem_description = *broker_shmem_description;
// A channel to get the new "client's" sharedmap id from // A channel to get the new "client's" sharedmap id from
let (send, recv) = channel(); let (send, recv) = channel();
@ -2026,7 +2041,7 @@ where
} }
}; };
send.send(new_sender.out_maps.first().unwrap().shmem.description()) send.send(new_sender.out_shmems.first().unwrap().shmem.description())
.expect("B2B: Error sending map description to channel!"); .expect("B2B: Error sending map description to channel!");
// the receiver receives from the local broker, and forwards it to the tcp stream. // the receiver receives from the local broker, and forwards it to the tcp stream.
@ -2034,7 +2049,7 @@ where
shmem_provider_bg, shmem_provider_bg,
&LlmpDescription { &LlmpDescription {
last_message_offset: None, last_message_offset: None,
shmem: broker_map_description, shmem: broker_shmem_description,
}, },
) )
.expect("Failed to map local page in broker 2 broker thread!"); .expect("Failed to map local page in broker 2 broker thread!");
@ -2120,7 +2135,7 @@ where
request: &TcpRequest, request: &TcpRequest,
current_client_id: &mut u32, current_client_id: &mut u32,
sender: &mut LlmpSender<SP>, sender: &mut LlmpSender<SP>,
broker_map_description: &ShMemDescription, broker_shmem_description: &ShMemDescription,
) { ) {
match request { match request {
TcpRequest::LocalClientHello { shmem_description } => { TcpRequest::LocalClientHello { shmem_description } => {
@ -2156,7 +2171,7 @@ where
} }
if let Ok(shmem_description) = if let Ok(shmem_description) =
Self::b2b_thread_on(stream, *current_client_id, broker_map_description) Self::b2b_thread_on(stream, *current_client_id, broker_shmem_description)
{ {
if Self::announce_new_client(sender, &shmem_description).is_err() { if Self::announce_new_client(sender, &shmem_description).is_err() {
println!("B2B: Error announcing client {:?}", shmem_description); println!("B2B: Error announcing client {:?}", shmem_description);
@ -2175,26 +2190,26 @@ where
// However, the original map is (as of now) never freed, new clients will start // However, the original map is (as of now) never freed, new clients will start
// to read from the initial map id. // to read from the initial map id.
let client_out_map_mem = &self.llmp_out.out_maps.first().unwrap().shmem; let client_out_shmem_mem = &self.llmp_out.out_shmems.first().unwrap().shmem;
let broker_map_description = client_out_map_mem.description(); let broker_shmem_description = client_out_shmem_mem.description();
let hostname = hostname::get() let hostname = hostname::get()
.unwrap_or_else(|_| "<unknown>".into()) .unwrap_or_else(|_| "<unknown>".into())
.to_string_lossy() .to_string_lossy()
.into(); .into();
let broker_hello = TcpResponse::BrokerConnectHello { let broker_hello = TcpResponse::BrokerConnectHello {
broker_map_description, broker_shmem_description: broker_shmem_description,
hostname, hostname,
}; };
let llmp_tcp_id = self.llmp_clients.len() as ClientId; let llmp_tcp_id = self.llmp_clients.len() as ClientId;
// Tcp out map sends messages from background thread tcp server to foreground client // Tcp out map sends messages from background thread tcp server to foreground client
let tcp_out_map = LlmpSharedMap::new( let tcp_out_shmem = LlmpSharedMap::new(
llmp_tcp_id, llmp_tcp_id,
self.shmem_provider.new_map(LLMP_CFG_INITIAL_MAP_SIZE)?, self.shmem_provider.new_shmem(LLMP_CFG_INITIAL_MAP_SIZE)?,
); );
let tcp_out_map_description = tcp_out_map.shmem.description(); let tcp_out_shmem_description = tcp_out_shmem.shmem.description();
self.register_client(tcp_out_map); self.register_client(tcp_out_shmem);
let ret = thread::spawn(move || { let ret = thread::spawn(move || {
// Create a new ShMemProvider for this background thread. // Create a new ShMemProvider for this background thread.
@ -2205,9 +2220,9 @@ where
let mut tcp_incoming_sender = LlmpSender { let mut tcp_incoming_sender = LlmpSender {
id: llmp_tcp_id, id: llmp_tcp_id,
last_msg_sent: ptr::null_mut(), last_msg_sent: ptr::null_mut(),
out_maps: vec![LlmpSharedMap::existing( out_shmems: vec![LlmpSharedMap::existing(
shmem_provider_bg shmem_provider_bg
.map_from_decription(tcp_out_map_description) .shmem_from_description(tcp_out_shmem_description)
.unwrap(), .unwrap(),
)], )],
// drop pages to the broker, if it already read them. // drop pages to the broker, if it already read them.
@ -2255,7 +2270,7 @@ where
&req, &req,
&mut current_client_id, &mut current_client_id,
&mut tcp_incoming_sender, &mut tcp_incoming_sender,
&broker_map_description, &broker_shmem_description,
); );
} }
ListenerStream::Empty() => { ListenerStream::Empty() => {
@ -2313,18 +2328,18 @@ where
} else { } else {
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
match self.shmem_provider.map_from_id_and_size( match self.shmem_provider.shmem_from_id_and_size(
ShMemId::from_slice(&(*pageinfo).shm_str), ShMemId::from_slice(&(*pageinfo).shm_str),
(*pageinfo).map_size, (*pageinfo).map_size,
) { ) {
Ok(new_map) => { Ok(new_shmem) => {
let mut new_page = LlmpSharedMap::existing(new_map); let mut new_page = LlmpSharedMap::existing(new_shmem);
let id = next_id; let id = next_id;
next_id += 1; next_id += 1;
new_page.mark_safe_to_unmap(); new_page.mark_safe_to_unmap();
self.llmp_clients.push(LlmpReceiver { self.llmp_clients.push(LlmpReceiver {
id, id,
current_recv_map: new_page, current_recv_shmem: new_page,
last_msg_recvd: ptr::null_mut(), last_msg_recvd: ptr::null_mut(),
shmem_provider: self.shmem_provider.clone(), shmem_provider: self.shmem_provider.clone(),
highest_msg_id: 0, highest_msg_id: 0,
@ -2347,7 +2362,7 @@ where
// The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary. // The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary.
let mut should_forward_msg = true; let mut should_forward_msg = true;
let map = &mut self.llmp_clients[client_id as usize].current_recv_map; let map = &mut self.llmp_clients[client_id as usize].current_recv_shmem;
let msg_buf = (*msg).as_slice(map)?; let msg_buf = (*msg).as_slice(map)?;
if let LlmpMsgHookResult::Handled = if let LlmpMsgHookResult::Handled =
(on_new_msg)(client_id, (*msg).tag, (*msg).flags, msg_buf)? (on_new_msg)(client_id, (*msg).tag, (*msg).flags, msg_buf)?
@ -2391,25 +2406,25 @@ where
SP: ShMemProvider, SP: ShMemProvider,
{ {
/// Reattach to a vacant client map. /// Reattach to a vacant client map.
/// It is essential, that the broker (or someone else) kept a pointer to the `out_map` /// It is essential, that the broker (or someone else) kept a pointer to the `out_shmem`
/// else reattach will get a new, empty page, from the OS, or fail /// else reattach will get a new, empty page, from the OS, or fail
#[allow(clippy::needless_pass_by_value)] #[allow(clippy::needless_pass_by_value)]
pub fn on_existing_map( pub fn on_existing_shmem(
shmem_provider: SP, shmem_provider: SP,
_current_out_map: SP::Mem, _current_out_shmem: SP::ShMem,
_last_msg_sent_offset: Option<u64>, _last_msg_sent_offset: Option<u64>,
current_broker_map: SP::Mem, current_broker_shmem: SP::ShMem,
last_msg_recvd_offset: Option<u64>, last_msg_recvd_offset: Option<u64>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Ok(Self { Ok(Self {
receiver: LlmpReceiver::on_existing_map( receiver: LlmpReceiver::on_existing_shmem(
shmem_provider.clone(), shmem_provider.clone(),
current_broker_map.clone(), current_broker_shmem.clone(),
last_msg_recvd_offset, last_msg_recvd_offset,
)?, )?,
sender: LlmpSender::on_existing_map( sender: LlmpSender::on_existing_shmem(
shmem_provider, shmem_provider,
current_broker_map, current_broker_shmem,
last_msg_recvd_offset, last_msg_recvd_offset,
)?, )?,
}) })
@ -2431,7 +2446,7 @@ where
} }
/// Write the current state to env. /// Write the current state to env.
/// A new client can attach to exactly the same state by calling [`LlmpClient::on_existing_map()`]. /// A new client can attach to exactly the same state by calling [`LlmpClient::on_existing_shmem()`].
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) -> Result<(), Error> { pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
self.sender.to_env(&format!("{}_SENDER", env_name))?; self.sender.to_env(&format!("{}_SENDER", env_name))?;
@ -2487,14 +2502,14 @@ where
/// Creates a new [`LlmpClient`] /// Creates a new [`LlmpClient`]
pub fn new( pub fn new(
mut shmem_provider: SP, mut shmem_provider: SP,
initial_broker_map: LlmpSharedMap<SP::Mem>, initial_broker_shmem: LlmpSharedMap<SP::ShMem>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Ok(Self { Ok(Self {
sender: LlmpSender { sender: LlmpSender {
id: 0, id: 0,
last_msg_sent: ptr::null_mut(), last_msg_sent: ptr::null_mut(),
out_maps: vec![LlmpSharedMap::new(0, { out_shmems: vec![LlmpSharedMap::new(0, {
shmem_provider.new_map(LLMP_CFG_INITIAL_MAP_SIZE)? shmem_provider.new_shmem(LLMP_CFG_INITIAL_MAP_SIZE)?
})], })],
// drop pages to the broker if it already read them // drop pages to the broker if it already read them
keep_pages_forever: false, keep_pages_forever: false,
@ -2504,7 +2519,7 @@ where
receiver: LlmpReceiver { receiver: LlmpReceiver {
id: 0, id: 0,
current_recv_map: initial_broker_map, current_recv_shmem: initial_broker_shmem,
last_msg_recvd: ptr::null_mut(), last_msg_recvd: ptr::null_mut(),
shmem_provider, shmem_provider,
highest_msg_id: 0, highest_msg_id: 0,
@ -2626,24 +2641,25 @@ where
}; };
println!("Connected to port {}", port); println!("Connected to port {}", port);
let broker_map_description = if let TcpResponse::BrokerConnectHello { let broker_shmem_description = if let TcpResponse::BrokerConnectHello {
broker_map_description, broker_shmem_description,
hostname: _, hostname: _,
} = (&recv_tcp_msg(&mut stream)?).try_into()? } = (&recv_tcp_msg(&mut stream)?).try_into()?
{ {
broker_map_description broker_shmem_description
} else { } else {
return Err(Error::IllegalState( return Err(Error::IllegalState(
"Received unexpected Broker Hello".to_string(), "Received unexpected Broker Hello".to_string(),
)); ));
}; };
let map = let map = LlmpSharedMap::existing(
LlmpSharedMap::existing(shmem_provider.map_from_decription(broker_map_description)?); shmem_provider.shmem_from_description(broker_shmem_description)?,
);
let mut ret = Self::new(shmem_provider, map)?; let mut ret = Self::new(shmem_provider, map)?;
let client_hello_req = TcpRequest::LocalClientHello { let client_hello_req = TcpRequest::LocalClientHello {
shmem_description: ret.sender.out_maps.first().unwrap().shmem.description(), shmem_description: ret.sender.out_shmems.first().unwrap().shmem.description(),
}; };
send_tcp_msg(&mut stream, &client_hello_req)?; send_tcp_msg(&mut stream, &client_hello_req)?;
@ -2663,7 +2679,7 @@ where
ret.sender.id = client_id; ret.sender.id = client_id;
// Also set the sender on our initial llmp map correctly. // Also set the sender on our initial llmp map correctly.
unsafe { unsafe {
(*ret.sender.out_maps.first_mut().unwrap().page_mut()).sender = client_id; (*ret.sender.out_shmems.first_mut().unwrap().page_mut()).sender = client_id;
} }
Ok(ret) Ok(ret)

View File

@ -89,12 +89,12 @@ where
self.inner.len() self.inner.len()
} }
fn map(&self) -> &[u8] { fn as_slice(&self) -> &[u8] {
self.inner.map() self.inner.as_slice()
} }
fn map_mut(&mut self) -> &mut [u8] { fn as_mut_slice(&mut self) -> &mut [u8] {
self.inner.map_mut() self.inner.as_mut_slice()
} }
} }
@ -154,7 +154,7 @@ impl<SP> ShMemProvider for ServedShMemProvider<SP>
where where
SP: ShMemProvider, SP: ShMemProvider,
{ {
type Mem = ServedShMem<SP::Mem>; type ShMem = ServedShMem<SP::ShMem>;
/// Connect to the server and return a new [`ServedShMemProvider`] /// Connect to the server and return a new [`ServedShMemProvider`]
/// Will try to spawn a [`ShMemService`]. This will only work for the first try. /// Will try to spawn a [`ShMemService`]. This will only work for the first try.
@ -172,21 +172,19 @@ where
res.id = id; res.id = id;
Ok(res) Ok(res)
} }
fn new_map(&mut self, map_size: usize) -> Result<Self::Mem, Error> { fn new_shmem(&mut self, map_size: usize) -> Result<Self::ShMem, Error> {
let (server_fd, client_fd) = self.send_receive(ServedShMemRequest::NewMap(map_size))?; let (server_fd, client_fd) = self.send_receive(ServedShMemRequest::NewMap(map_size))?;
Ok(ServedShMem { Ok(ServedShMem {
inner: ManuallyDrop::new( inner: ManuallyDrop::new(self.inner.shmem_from_id_and_size(
self.inner.map_from_id_and_size(
ShMemId::from_string(&format!("{}", client_fd)), ShMemId::from_string(&format!("{}", client_fd)),
map_size, map_size,
)?, )?),
),
server_fd, server_fd,
}) })
} }
fn map_from_id_and_size(&mut self, id: ShMemId, size: usize) -> Result<Self::Mem, Error> { fn shmem_from_id_and_size(&mut self, id: ShMemId, size: usize) -> Result<Self::ShMem, Error> {
let parts = id.as_str().split(':').collect::<Vec<&str>>(); let parts = id.as_str().split(':').collect::<Vec<&str>>();
let server_id_str = parts.get(0).unwrap(); let server_id_str = parts.get(0).unwrap();
let (server_fd, client_fd) = self.send_receive(ServedShMemRequest::ExistingMap( let (server_fd, client_fd) = self.send_receive(ServedShMemRequest::ExistingMap(
@ -194,8 +192,10 @@ where
))?; ))?;
Ok(ServedShMem { Ok(ServedShMem {
inner: ManuallyDrop::new( inner: ManuallyDrop::new(
self.inner self.inner.shmem_from_id_and_size(
.map_from_id_and_size(ShMemId::from_string(&format!("{}", client_fd)), size)?, ShMemId::from_string(&format!("{}", client_fd)),
size,
)?,
), ),
server_fd, server_fd,
}) })
@ -224,7 +224,7 @@ where
Ok(()) Ok(())
} }
fn release_map(&mut self, map: &mut Self::Mem) { fn release_shmem(&mut self, map: &mut Self::ShMem) {
let (refcount, _) = self let (refcount, _) = self
.send_receive(ServedShMemRequest::Deregister(map.server_fd)) .send_receive(ServedShMemRequest::Deregister(map.server_fd))
.expect("Could not communicate with ServedShMem server!"); .expect("Could not communicate with ServedShMem server!");
@ -284,7 +284,7 @@ enum ServedShMemResponse<SP>
where where
SP: ShMemProvider, SP: ShMemProvider,
{ {
Mapping(Rc<RefCell<SP::Mem>>), Mapping(Rc<RefCell<SP::ShMem>>),
Id(i32), Id(i32),
RefCount(u32), RefCount(u32),
} }
@ -446,10 +446,10 @@ where
SP: ShMemProvider, SP: ShMemProvider,
{ {
provider: SP, provider: SP,
clients: HashMap<RawFd, SharedShMemClient<SP::Mem>>, clients: HashMap<RawFd, SharedShMemClient<SP::ShMem>>,
/// Maps from a pre-fork (parent) client id to its cloned maps. /// Maps from a pre-fork (parent) client id to its cloned maps.
forking_clients: HashMap<RawFd, HashMap<i32, Vec<Rc<RefCell<SP::Mem>>>>>, forking_clients: HashMap<RawFd, HashMap<i32, Vec<Rc<RefCell<SP::ShMem>>>>>,
all_maps: HashMap<i32, Weak<RefCell<SP::Mem>>>, all_shmems: HashMap<i32, Weak<RefCell<SP::ShMem>>>,
} }
impl<SP> ServedShMemServiceWorker<SP> impl<SP> ServedShMemServiceWorker<SP>
@ -461,13 +461,13 @@ where
Ok(Self { Ok(Self {
provider: SP::new()?, provider: SP::new()?,
clients: HashMap::new(), clients: HashMap::new(),
all_maps: HashMap::new(), all_shmems: HashMap::new(),
forking_clients: HashMap::new(), forking_clients: HashMap::new(),
}) })
} }
fn upgrade_map_with_id(&mut self, description_id: i32) -> Rc<RefCell<SP::Mem>> { fn upgrade_shmem_with_id(&mut self, description_id: i32) -> Rc<RefCell<SP::ShMem>> {
self.all_maps self.all_shmems
.get_mut(&description_id) .get_mut(&description_id)
.unwrap() .unwrap()
.clone() .clone()
@ -496,11 +496,11 @@ where
/* /*
// remove temporarily // remove temporarily
let client = self.clients.remove(&client_id); let client = self.clients.remove(&client_id);
let mut forking_maps = HashMap::new(); let mut forking_shmems = HashMap::new();
for (id, map) in client.as_ref().unwrap().maps.iter() { for (id, map) in client.as_ref().unwrap().maps.iter() {
forking_maps.insert(*id, map.clone()); forking_shmems.insert(*id, map.clone());
} }
self.forking_clients.insert(client_id, forking_maps); self.forking_clients.insert(client_id, forking_shmems);
self.clients.insert(client_id, client.unwrap()); self.clients.insert(client_id, client.unwrap());
*/ */
@ -512,10 +512,10 @@ where
Ok(ServedShMemResponse::Id(client_id)) Ok(ServedShMemResponse::Id(client_id))
} }
ServedShMemRequest::NewMap(map_size) => { ServedShMemRequest::NewMap(map_size) => {
let new_map = self.provider.new_map(map_size)?; let new_shmem = self.provider.new_shmem(map_size)?;
let description = new_map.description(); let description = new_shmem.description();
let new_rc = Rc::new(RefCell::new(new_map)); let new_rc = Rc::new(RefCell::new(new_shmem));
self.all_maps self.all_shmems
.insert(description.id.into(), Rc::downgrade(&new_rc)); .insert(description.id.into(), Rc::downgrade(&new_rc));
Ok(ServedShMemResponse::Mapping(new_rc)) Ok(ServedShMemResponse::Mapping(new_rc))
} }
@ -536,12 +536,12 @@ where
{ {
map.clone() map.clone()
} else { } else {
self.upgrade_map_with_id(description_id) self.upgrade_shmem_with_id(description_id)
}, },
)) ))
} else { } else {
Ok(ServedShMemResponse::Mapping( Ok(ServedShMemResponse::Mapping(
self.upgrade_map_with_id(description_id), self.upgrade_shmem_with_id(description_id),
)) ))
} }
} }

View File

@ -24,39 +24,28 @@ pub use unix_shmem::{MmapShMem, MmapShMemProvider};
#[cfg(all(feature = "std", unix))] #[cfg(all(feature = "std", unix))]
pub use unix_shmem::{UnixShMem, UnixShMemProvider}; pub use unix_shmem::{UnixShMem, UnixShMemProvider};
#[cfg(all(windows, feature = "std"))]
pub use win32_shmem::{Win32ShMem, Win32ShMemProvider};
#[cfg(all(feature = "std", unix))] #[cfg(all(feature = "std", unix))]
pub use crate::bolts::os::unix_shmem_server::{ServedShMemProvider, ShMemService}; pub use crate::bolts::os::unix_shmem_server::{ServedShMemProvider, ShMemService};
#[cfg(all(windows, feature = "std"))]
pub use win32_shmem::{Win32ShMem, Win32ShMemProvider};
/// The standard sharedmem provider /// The standard sharedmem provider
#[cfg(all(windows, feature = "std"))] #[cfg(all(windows, feature = "std"))]
pub type StdShMemProvider = Win32ShMemProvider; pub type StdShMemProvider = Win32ShMemProvider;
/// The standard sharedmem type
#[cfg(all(windows, feature = "std"))]
pub type StdShMem = Win32ShMem;
/// The standard sharedmem provider /// The standard sharedmem provider
#[cfg(all(target_os = "android", feature = "std"))] #[cfg(all(target_os = "android", feature = "std"))]
pub type StdShMemProvider = pub type StdShMemProvider =
RcShMemProvider<ServedShMemProvider<unix_shmem::ashmem::AshmemShMemProvider>>; RcShMemProvider<ServedShMemProvider<unix_shmem::ashmem::AshmemShMemProvider>>;
/// The standard sharedmem type
#[cfg(all(target_os = "android", feature = "std"))]
pub type StdShMem = RcShMem<ServedShMemProvider<unix_shmem::ashmem::AshmemShMemProvider>>;
/// The standard sharedmem service /// The standard sharedmem service
#[cfg(all(target_os = "android", feature = "std"))] #[cfg(all(target_os = "android", feature = "std"))]
pub type StdShMemService = ShMemService<unix_shmem::ashmem::AshmemShMemProvider>; pub type StdShMemService = ShMemService<unix_shmem::ashmem::AshmemShMemProvider>;
/// The standard sharedmem provider /// The standard sharedmem provider
#[cfg(all(feature = "std", target_vendor = "apple"))] #[cfg(all(feature = "std", target_vendor = "apple"))]
pub type StdShMemProvider = RcShMemProvider<ServedShMemProvider<MmapShMemProvider>>; pub type StdShMemProvider = RcShMemProvider<ServedShMemProvider<MmapShMemProvider>>;
/// The standard sharedmem type
#[cfg(all(feature = "std", target_vendor = "apple"))]
pub type StdShMem = RcShMem<ServedShMemProvider<MmapShMemProvider>>;
#[cfg(all(feature = "std", target_vendor = "apple"))] #[cfg(all(feature = "std", target_vendor = "apple"))]
/// The standard sharedmem service /// The standard sharedmem service
pub type StdShMemService = ShMemService<MmapShMemProvider>; pub type StdShMemService = ShMemService<MmapShMemProvider>;
/// The default [`ShMemProvider`] for this os. /// The default [`ShMemProvider`] for this os.
#[cfg(all( #[cfg(all(
feature = "std", feature = "std",
@ -64,14 +53,6 @@ pub type StdShMemService = ShMemService<MmapShMemProvider>;
not(any(target_os = "android", target_vendor = "apple")) not(any(target_os = "android", target_vendor = "apple"))
))] ))]
pub type StdShMemProvider = UnixShMemProvider; pub type StdShMemProvider = UnixShMemProvider;
/// The default [`ShMemProvider`] for this os.
#[cfg(all(
feature = "std",
unix,
not(any(target_os = "android", target_vendor = "apple"))
))]
pub type StdShMem = UnixShMem;
/// The standard sharedmem service /// The standard sharedmem service
#[cfg(any( #[cfg(any(
not(any(target_os = "android", target_vendor = "apple")), not(any(target_os = "android", target_vendor = "apple")),
@ -185,10 +166,10 @@ pub trait ShMem: Sized + Debug + Clone {
} }
/// The actual shared map, in memory /// The actual shared map, in memory
fn map(&self) -> &[u8]; fn as_slice(&self) -> &[u8];
/// The actual shared map, mutable /// The actual shared map, mutable
fn map_mut(&mut self) -> &mut [u8]; fn as_mut_slice(&mut self) -> &mut [u8];
/// Write this map's config to env /// Write this map's config to env
#[cfg(feature = "std")] #[cfg(feature = "std")]
@ -206,33 +187,36 @@ pub trait ShMem: Sized + Debug + Clone {
/// All you need for scaling on a new target is to implement this interface, as well as the respective [`ShMem`]. /// All you need for scaling on a new target is to implement this interface, as well as the respective [`ShMem`].
pub trait ShMemProvider: Clone + Default + Debug { pub trait ShMemProvider: Clone + Default + Debug {
/// The actual shared map handed out by this [`ShMemProvider`]. /// The actual shared map handed out by this [`ShMemProvider`].
type Mem: ShMem; type ShMem: ShMem;
/// Create a new instance of the provider /// Create a new instance of the provider
fn new() -> Result<Self, Error>; fn new() -> Result<Self, Error>;
/// Create a new shared memory mapping /// Create a new shared memory mapping
fn new_map(&mut self, map_size: usize) -> Result<Self::Mem, Error>; fn new_shmem(&mut self, map_size: usize) -> Result<Self::ShMem, Error>;
/// Get a mapping given its id and size /// Get a mapping given its id and size
fn map_from_id_and_size(&mut self, id: ShMemId, size: usize) -> Result<Self::Mem, Error>; fn shmem_from_id_and_size(&mut self, id: ShMemId, size: usize) -> Result<Self::ShMem, Error>;
/// Get a mapping given a description /// Get a mapping given a description
fn map_from_decription(&mut self, description: ShMemDescription) -> Result<Self::Mem, Error> { fn shmem_from_description(
self.map_from_id_and_size(description.id, description.size) &mut self,
description: ShMemDescription,
) -> Result<Self::ShMem, Error> {
self.shmem_from_id_and_size(description.id, description.size)
} }
/// Create a new sharedmap reference from an existing `id` and `len` /// Create a new sharedmap reference from an existing `id` and `len`
fn clone_ref(&mut self, mapping: &Self::Mem) -> Result<Self::Mem, Error> { fn clone_ref(&mut self, mapping: &Self::ShMem) -> Result<Self::ShMem, Error> {
self.map_from_id_and_size(mapping.id(), mapping.len()) self.shmem_from_id_and_size(mapping.id(), mapping.len())
} }
/// Reads an existing map config from env vars, then maps it /// Reads an existing map config from env vars, then maps it
#[cfg(feature = "std")] #[cfg(feature = "std")]
fn existing_from_env(&mut self, env_name: &str) -> Result<Self::Mem, Error> { fn existing_from_env(&mut self, env_name: &str) -> Result<Self::ShMem, Error> {
let map_shm_str = env::var(env_name)?; let map_shm_str = env::var(env_name)?;
let map_size = str::parse::<usize>(&env::var(format!("{}_SIZE", env_name))?)?; let map_size = str::parse::<usize>(&env::var(format!("{}_SIZE", env_name))?)?;
self.map_from_decription(ShMemDescription::from_string_and_size( self.shmem_from_description(ShMemDescription::from_string_and_size(
&map_shm_str, &map_shm_str,
map_size, map_size,
)) ))
@ -255,7 +239,7 @@ pub trait ShMemProvider: Clone + Default + Debug {
} }
/// Release the resources associated with the given [`ShMem`] /// Release the resources associated with the given [`ShMem`]
fn release_map(&mut self, _map: &mut Self::Mem) { fn release_shmem(&mut self, _shmem: &mut Self::ShMem) {
// do nothing // do nothing
} }
} }
@ -265,7 +249,7 @@ pub trait ShMemProvider: Clone + Default + Debug {
/// Useful if the `ShMemProvider` needs to keep local state. /// Useful if the `ShMemProvider` needs to keep local state.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RcShMem<T: ShMemProvider> { pub struct RcShMem<T: ShMemProvider> {
internal: ManuallyDrop<T::Mem>, internal: ManuallyDrop<T::ShMem>,
provider: Rc<RefCell<T>>, provider: Rc<RefCell<T>>,
} }
@ -281,18 +265,18 @@ where
self.internal.len() self.internal.len()
} }
fn map(&self) -> &[u8] { fn as_slice(&self) -> &[u8] {
self.internal.map() self.internal.as_slice()
} }
fn map_mut(&mut self) -> &mut [u8] { fn as_mut_slice(&mut self) -> &mut [u8] {
self.internal.map_mut() self.internal.as_mut_slice()
} }
} }
impl<T: ShMemProvider> Drop for RcShMem<T> { impl<T: ShMemProvider> Drop for RcShMem<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.provider.borrow_mut().release_map(&mut self.internal); self.provider.borrow_mut().release_shmem(&mut self.internal);
} }
} }
@ -325,7 +309,7 @@ impl<SP> ShMemProvider for RcShMemProvider<SP>
where where
SP: ShMemProvider + Debug, SP: ShMemProvider + Debug,
{ {
type Mem = RcShMem<SP>; type ShMem = RcShMem<SP>;
fn new() -> Result<Self, Error> { fn new() -> Result<Self, Error> {
Ok(Self { Ok(Self {
@ -335,26 +319,30 @@ where
}) })
} }
fn new_map(&mut self, map_size: usize) -> Result<Self::Mem, Error> { fn new_shmem(&mut self, map_size: usize) -> Result<Self::ShMem, Error> {
Ok(Self::Mem { Ok(Self::ShMem {
internal: ManuallyDrop::new(self.internal.borrow_mut().new_map(map_size)?), internal: ManuallyDrop::new(self.internal.borrow_mut().new_shmem(map_size)?),
provider: self.internal.clone(), provider: self.internal.clone(),
}) })
} }
fn map_from_id_and_size(&mut self, id: ShMemId, size: usize) -> Result<Self::Mem, Error> { fn shmem_from_id_and_size(&mut self, id: ShMemId, size: usize) -> Result<Self::ShMem, Error> {
Ok(Self::Mem { Ok(Self::ShMem {
internal: ManuallyDrop::new(self.internal.borrow_mut().map_from_id_and_size(id, size)?), internal: ManuallyDrop::new(
self.internal
.borrow_mut()
.shmem_from_id_and_size(id, size)?,
),
provider: self.internal.clone(), provider: self.internal.clone(),
}) })
} }
fn release_map(&mut self, map: &mut Self::Mem) { fn release_shmem(&mut self, map: &mut Self::ShMem) {
self.internal.borrow_mut().release_map(&mut map.internal); self.internal.borrow_mut().release_shmem(&mut map.internal);
} }
fn clone_ref(&mut self, mapping: &Self::Mem) -> Result<Self::Mem, Error> { fn clone_ref(&mut self, mapping: &Self::ShMem) -> Result<Self::ShMem, Error> {
Ok(Self::Mem { Ok(Self::ShMem {
internal: ManuallyDrop::new(self.internal.borrow_mut().clone_ref(&mapping.internal)?), internal: ManuallyDrop::new(self.internal.borrow_mut().clone_ref(&mapping.internal)?),
provider: self.internal.clone(), provider: self.internal.clone(),
}) })
@ -622,7 +610,7 @@ pub mod unix_shmem {
} }
} }
fn map_from_id_and_size(id: ShMemId, map_size: usize) -> Result<Self, Error> { fn shmem_from_id_and_size(id: ShMemId, map_size: usize) -> Result<Self, Error> {
unsafe { unsafe {
let shm_fd: i32 = id.to_string().parse().unwrap(); let shm_fd: i32 = id.to_string().parse().unwrap();
@ -659,7 +647,7 @@ pub mod unix_shmem {
#[cfg(unix)] #[cfg(unix)]
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct MmapShMemProvider { pub struct MmapShMemProvider {
current_map_id: usize, current_shmem_id: usize,
} }
unsafe impl Send for MmapShMemProvider {} unsafe impl Send for MmapShMemProvider {}
@ -674,22 +662,24 @@ pub mod unix_shmem {
/// Implement [`ShMemProvider`] for [`MmapShMemProvider`]. /// Implement [`ShMemProvider`] for [`MmapShMemProvider`].
#[cfg(unix)] #[cfg(unix)]
impl ShMemProvider for MmapShMemProvider { impl ShMemProvider for MmapShMemProvider {
type Mem = MmapShMem; type ShMem = MmapShMem;
fn new() -> Result<Self, Error> { fn new() -> Result<Self, Error> {
Ok(Self { current_map_id: 0 }) Ok(Self {
current_shmem_id: 0,
})
} }
fn new_map(&mut self, map_size: usize) -> Result<Self::Mem, Error> { fn new_shmem(&mut self, map_size: usize) -> Result<Self::ShMem, Error> {
self.current_map_id += 1; self.current_shmem_id += 1;
MmapShMem::new(map_size, self.current_map_id) MmapShMem::new(map_size, self.current_shmem_id)
} }
fn map_from_id_and_size( fn shmem_from_id_and_size(
&mut self, &mut self,
id: ShMemId, id: ShMemId,
size: usize, size: usize,
) -> Result<Self::Mem, Error> { ) -> Result<Self::ShMem, Error> {
MmapShMem::map_from_id_and_size(id, size) MmapShMem::shmem_from_id_and_size(id, size)
} }
} }
@ -702,11 +692,11 @@ pub mod unix_shmem {
self.map_size self.map_size
} }
fn map(&self) -> &[u8] { fn as_slice(&self) -> &[u8] {
unsafe { slice::from_raw_parts(self.map, self.map_size) } unsafe { slice::from_raw_parts(self.map, self.map_size) }
} }
fn map_mut(&mut self) -> &mut [u8] { fn as_mut_slice(&mut self) -> &mut [u8] {
unsafe { slice::from_raw_parts_mut(self.map, self.map_size) } unsafe { slice::from_raw_parts_mut(self.map, self.map_size) }
} }
} }
@ -775,7 +765,7 @@ pub mod unix_shmem {
} }
/// Get a [`UnixShMem`] of the existing shared memory mapping identified by id /// Get a [`UnixShMem`] of the existing shared memory mapping identified by id
pub fn map_from_id_and_size(id: ShMemId, map_size: usize) -> Result<Self, Error> { pub fn shmem_from_id_and_size(id: ShMemId, map_size: usize) -> Result<Self, Error> {
unsafe { unsafe {
let id_int: i32 = id.into(); let id_int: i32 = id.into();
let map = shmat(id_int, ptr::null(), 0) as *mut c_uchar; let map = shmat(id_int, ptr::null(), 0) as *mut c_uchar;
@ -801,11 +791,11 @@ pub mod unix_shmem {
self.map_size self.map_size
} }
fn map(&self) -> &[u8] { fn as_slice(&self) -> &[u8] {
unsafe { slice::from_raw_parts(self.map, self.map_size) } unsafe { slice::from_raw_parts(self.map, self.map_size) }
} }
fn map_mut(&mut self) -> &mut [u8] { fn as_mut_slice(&mut self) -> &mut [u8] {
unsafe { slice::from_raw_parts_mut(self.map, self.map_size) } unsafe { slice::from_raw_parts_mut(self.map, self.map_size) }
} }
} }
@ -838,21 +828,21 @@ pub mod unix_shmem {
/// Implement [`ShMemProvider`] for [`UnixShMemProvider`]. /// Implement [`ShMemProvider`] for [`UnixShMemProvider`].
#[cfg(unix)] #[cfg(unix)]
impl ShMemProvider for CommonUnixShMemProvider { impl ShMemProvider for CommonUnixShMemProvider {
type Mem = CommonUnixShMem; type ShMem = CommonUnixShMem;
fn new() -> Result<Self, Error> { fn new() -> Result<Self, Error> {
Ok(Self {}) Ok(Self {})
} }
fn new_map(&mut self, map_size: usize) -> Result<Self::Mem, Error> { fn new_shmem(&mut self, map_size: usize) -> Result<Self::ShMem, Error> {
CommonUnixShMem::new(map_size) CommonUnixShMem::new(map_size)
} }
fn map_from_id_and_size( fn shmem_from_id_and_size(
&mut self, &mut self,
id: ShMemId, id: ShMemId,
size: usize, size: usize,
) -> Result<Self::Mem, Error> { ) -> Result<Self::ShMem, Error> {
CommonUnixShMem::map_from_id_and_size(id, size) CommonUnixShMem::shmem_from_id_and_size(id, size)
} }
} }
} }
@ -959,7 +949,7 @@ pub mod unix_shmem {
} }
/// Get a [`crate::bolts::shmem::unix_shmem::UnixShMem`] of the existing [`ShMem`] mapping identified by id. /// Get a [`crate::bolts::shmem::unix_shmem::UnixShMem`] of the existing [`ShMem`] mapping identified by id.
pub fn map_from_id_and_size(id: ShMemId, map_size: usize) -> Result<Self, Error> { pub fn shmem_from_id_and_size(id: ShMemId, map_size: usize) -> Result<Self, Error> {
unsafe { unsafe {
let fd: i32 = id.to_string().parse().unwrap(); let fd: i32 = id.to_string().parse().unwrap();
#[allow(trivial_numeric_casts, clippy::cast_sign_loss)] #[allow(trivial_numeric_casts, clippy::cast_sign_loss)]
@ -1003,11 +993,11 @@ pub mod unix_shmem {
self.map_size self.map_size
} }
fn map(&self) -> &[u8] { fn as_slice(&self) -> &[u8] {
unsafe { slice::from_raw_parts(self.map, self.map_size) } unsafe { slice::from_raw_parts(self.map, self.map_size) }
} }
fn map_mut(&mut self) -> &mut [u8] { fn as_mut_slice(&mut self) -> &mut [u8] {
unsafe { slice::from_raw_parts_mut(self.map, self.map_size) } unsafe { slice::from_raw_parts_mut(self.map, self.map_size) }
} }
} }
@ -1052,23 +1042,23 @@ pub mod unix_shmem {
/// Implement [`ShMemProvider`] for [`AshmemShMemProvider`], for the Android `ShMem`. /// Implement [`ShMemProvider`] for [`AshmemShMemProvider`], for the Android `ShMem`.
#[cfg(unix)] #[cfg(unix)]
impl ShMemProvider for AshmemShMemProvider { impl ShMemProvider for AshmemShMemProvider {
type Mem = AshmemShMem; type ShMem = AshmemShMem;
fn new() -> Result<Self, Error> { fn new() -> Result<Self, Error> {
Ok(Self {}) Ok(Self {})
} }
fn new_map(&mut self, map_size: usize) -> Result<Self::Mem, Error> { fn new_shmem(&mut self, map_size: usize) -> Result<Self::ShMem, Error> {
let mapping = AshmemShMem::new(map_size)?; let mapping = AshmemShMem::new(map_size)?;
Ok(mapping) Ok(mapping)
} }
fn map_from_id_and_size( fn shmem_from_id_and_size(
&mut self, &mut self,
id: ShMemId, id: ShMemId,
size: usize, size: usize,
) -> Result<Self::Mem, Error> { ) -> Result<Self::ShMem, Error> {
AshmemShMem::map_from_id_and_size(id, size) AshmemShMem::shmem_from_id_and_size(id, size)
} }
} }
} }
@ -1122,7 +1112,7 @@ pub mod win32_shmem {
} }
impl Win32ShMem { impl Win32ShMem {
fn new_map(map_size: usize) -> Result<Self, Error> { fn new_shmem(map_size: usize) -> Result<Self, Error> {
unsafe { unsafe {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let mut map_str = format!("libafl_{}", uuid.to_simple()); let mut map_str = format!("libafl_{}", uuid.to_simple());
@ -1159,7 +1149,7 @@ pub mod win32_shmem {
} }
} }
fn map_from_id_and_size(id: ShMemId, map_size: usize) -> Result<Self, Error> { fn shmem_from_id_and_size(id: ShMemId, map_size: usize) -> Result<Self, Error> {
unsafe { unsafe {
let map_str_bytes = id.id; let map_str_bytes = id.id;
// Unlike MapViewOfFile this one needs u32 // Unlike MapViewOfFile this one needs u32
@ -1200,11 +1190,11 @@ pub mod win32_shmem {
self.map_size self.map_size
} }
fn map(&self) -> &[u8] { fn as_slice(&self) -> &[u8] {
unsafe { slice::from_raw_parts(self.map, self.map_size) } unsafe { slice::from_raw_parts(self.map, self.map_size) }
} }
fn map_mut(&mut self) -> &mut [u8] { fn as_mut_slice(&mut self) -> &mut [u8] {
unsafe { slice::from_raw_parts_mut(self.map, self.map_size) } unsafe { slice::from_raw_parts_mut(self.map, self.map_size) }
} }
} }
@ -1231,17 +1221,21 @@ pub mod win32_shmem {
/// Implement [`ShMemProvider`] for [`Win32ShMemProvider`] /// Implement [`ShMemProvider`] for [`Win32ShMemProvider`]
impl ShMemProvider for Win32ShMemProvider { impl ShMemProvider for Win32ShMemProvider {
type Mem = Win32ShMem; type ShMem = Win32ShMem;
fn new() -> Result<Self, Error> { fn new() -> Result<Self, Error> {
Ok(Self {}) Ok(Self {})
} }
fn new_map(&mut self, map_size: usize) -> Result<Self::Mem, Error> { fn new_shmem(&mut self, map_size: usize) -> Result<Self::ShMem, Error> {
Win32ShMem::new_map(map_size) Win32ShMem::new_shmem(map_size)
} }
fn map_from_id_and_size(&mut self, id: ShMemId, size: usize) -> Result<Self::Mem, Error> { fn shmem_from_id_and_size(
Win32ShMem::map_from_id_and_size(id, size) &mut self,
id: ShMemId,
size: usize,
) -> Result<Self::ShMem, Error> {
Win32ShMem::shmem_from_id_and_size(id, size)
} }
} }
} }
@ -1280,7 +1274,7 @@ impl<T: ShMem> ShMemCursor<T> {
/// Slice from the current location on this map to the end, mutable /// Slice from the current location on this map to the end, mutable
fn empty_slice_mut(&mut self) -> &mut [u8] { fn empty_slice_mut(&mut self) -> &mut [u8] {
&mut (self.inner.map_mut()[self.pos..]) &mut (self.inner.as_mut_slice()[self.pos..])
} }
} }
@ -1327,7 +1321,7 @@ impl<T: ShMem> std::io::Seek for ShMemCursor<T> {
let effective_new_pos = match pos { let effective_new_pos = match pos {
std::io::SeekFrom::Start(s) => s, std::io::SeekFrom::Start(s) => s,
std::io::SeekFrom::End(offset) => { std::io::SeekFrom::End(offset) => {
let map_len = self.inner.map().len(); let map_len = self.inner.as_slice().len();
assert!(i64::try_from(map_len).is_ok()); assert!(i64::try_from(map_len).is_ok());
let signed_pos = map_len as i64; let signed_pos = map_len as i64;
let effective = signed_pos.checked_add(offset).unwrap(); let effective = signed_pos.checked_add(offset).unwrap();
@ -1360,8 +1354,8 @@ mod tests {
#[serial] #[serial]
fn test_shmem_service() { fn test_shmem_service() {
let mut provider = StdShMemProvider::new().unwrap(); let mut provider = StdShMemProvider::new().unwrap();
let mut map = provider.new_map(1024).unwrap(); let mut map = provider.new_shmem(1024).unwrap();
map.map_mut()[0] = 1; map.as_mut_slice()[0] = 1;
assert!(map.map()[0] == 1); assert!(map.as_slice()[0] == 1);
} }
} }

View File

@ -58,7 +58,7 @@ pub struct StateRestorer<SP>
where where
SP: ShMemProvider, SP: ShMemProvider,
{ {
shmem: SP::Mem, shmem: SP::ShMem,
phantom: PhantomData<*const SP>, phantom: PhantomData<*const SP>,
} }
@ -85,7 +85,7 @@ where
} }
/// Create a new [`StateRestorer`]. /// Create a new [`StateRestorer`].
pub fn new(shmem: SP::Mem) -> Self { pub fn new(shmem: SP::ShMem) -> Self {
let mut ret = Self { let mut ret = Self {
shmem, shmem,
phantom: PhantomData, phantom: PhantomData,
@ -178,7 +178,7 @@ where
} }
fn content_mut(&mut self) -> &mut StateShMemContent { fn content_mut(&mut self) -> &mut StateShMemContent {
let ptr = self.shmem.map().as_ptr(); let ptr = self.shmem.as_slice().as_ptr();
#[allow(clippy::cast_ptr_alignment)] // Beginning of the page will always be aligned #[allow(clippy::cast_ptr_alignment)] // Beginning of the page will always be aligned
unsafe { unsafe {
&mut *(ptr as *mut StateShMemContent) &mut *(ptr as *mut StateShMemContent)
@ -188,7 +188,7 @@ where
/// The content is either the name of the tmpfile, or the serialized bytes directly, if they fit on a single page. /// The content is either the name of the tmpfile, or the serialized bytes directly, if they fit on a single page.
fn content(&self) -> &StateShMemContent { fn content(&self) -> &StateShMemContent {
#[allow(clippy::cast_ptr_alignment)] // Beginning of the page will always be aligned #[allow(clippy::cast_ptr_alignment)] // Beginning of the page will always be aligned
let ptr = self.shmem.map().as_ptr() as *const StateShMemContent; let ptr = self.shmem.as_slice().as_ptr() as *const StateShMemContent;
unsafe { &*(ptr) } unsafe { &*(ptr) }
} }
@ -251,7 +251,7 @@ mod tests {
const TESTMAP_SIZE: usize = 1024; const TESTMAP_SIZE: usize = 1024;
let mut shmem_provider = StdShMemProvider::new().unwrap(); let mut shmem_provider = StdShMemProvider::new().unwrap();
let shmem = shmem_provider.new_map(TESTMAP_SIZE).unwrap(); let shmem = shmem_provider.new_shmem(TESTMAP_SIZE).unwrap();
let mut state_restorer = StateRestorer::<StdShMemProvider>::new(shmem); let mut state_restorer = StateRestorer::<StdShMemProvider>::new(shmem);
let state = "hello world".to_string(); let state = "hello world".to_string();

View File

@ -835,7 +835,7 @@ where
// First, create a channel from the current fuzzer to the next to store state between restarts. // First, create a channel from the current fuzzer to the next to store state between restarts.
let staterestorer: StateRestorer<SP> = let staterestorer: StateRestorer<SP> =
StateRestorer::new(self.shmem_provider.new_map(256 * 1024 * 1024)?); StateRestorer::new(self.shmem_provider.new_shmem(256 * 1024 * 1024)?);
// Store the information to a map. // Store the information to a map.
staterestorer.write_to_env(_ENV_FUZZER_SENDER)?; staterestorer.write_to_env(_ENV_FUZZER_SENDER)?;
@ -976,7 +976,7 @@ mod tests {
let mut llmp_client = LlmpClient::new( let mut llmp_client = LlmpClient::new(
shmem_provider.clone(), shmem_provider.clone(),
LlmpSharedMap::new(0, shmem_provider.new_map(1024).unwrap()), LlmpSharedMap::new(0, shmem_provider.new_shmem(1024).unwrap()),
) )
.unwrap(); .unwrap();
@ -1007,7 +1007,7 @@ mod tests {
// First, create a channel from the current fuzzer to the next to store state between restarts. // First, create a channel from the current fuzzer to the next to store state between restarts.
let mut staterestorer = StateRestorer::<StdShMemProvider>::new( let mut staterestorer = StateRestorer::<StdShMemProvider>::new(
shmem_provider.new_map(256 * 1024 * 1024).unwrap(), shmem_provider.new_shmem(256 * 1024 * 1024).unwrap(),
); );
staterestorer.reset(); staterestorer.reset();

View File

@ -333,7 +333,7 @@ where
let mut staterestorer = if std::env::var(_ENV_FUZZER_SENDER).is_err() { let mut staterestorer = if std::env::var(_ENV_FUZZER_SENDER).is_err() {
// First, create a place to store state in, for restarts. // First, create a place to store state in, for restarts.
let staterestorer: StateRestorer<SP> = let staterestorer: StateRestorer<SP> =
StateRestorer::new(shmem_provider.new_map(256 * 1024 * 1024)?); StateRestorer::new(shmem_provider.new_shmem(256 * 1024 * 1024)?);
//let staterestorer = { LlmpSender::new(shmem_provider.clone(), 0, false)? }; //let staterestorer = { LlmpSender::new(shmem_provider.clone(), 0, false)? };
staterestorer.write_to_env(_ENV_FUZZER_SENDER)?; staterestorer.write_to_env(_ENV_FUZZER_SENDER)?;

View File

@ -18,7 +18,7 @@ use std::{
use crate::{ use crate::{
bolts::{ bolts::{
os::{dup2, pipes::Pipe}, os::{dup2, pipes::Pipe},
shmem::{ShMem, ShMemProvider, StdShMem, StdShMemProvider}, shmem::{ShMem, ShMemProvider, StdShMemProvider},
}, },
executors::{Executor, ExitKind, HasObservers}, executors::{Executor, ExitKind, HasObservers},
inputs::{HasTargetBytes, Input}, inputs::{HasTargetBytes, Input},
@ -354,6 +354,9 @@ impl Forkserver {
/// A struct that has a forkserver /// A struct that has a forkserver
pub trait HasForkserver { pub trait HasForkserver {
/// The [`ShMemProvider`] used for this forkserver's map
type SP: ShMemProvider;
/// The forkserver /// The forkserver
fn forkserver(&self) -> &Forkserver; fn forkserver(&self) -> &Forkserver;
@ -367,10 +370,10 @@ pub trait HasForkserver {
fn out_file_mut(&mut self) -> &mut OutFile; fn out_file_mut(&mut self) -> &mut OutFile;
/// The map of the fuzzer /// The map of the fuzzer
fn map(&self) -> &Option<StdShMem>; fn shmem(&self) -> &Option<<<Self as HasForkserver>::SP as ShMemProvider>::ShMem>;
/// The map of the fuzzer, mutable /// The map of the fuzzer, mutable
fn map_mut(&mut self) -> &mut Option<StdShMem>; fn shmem_mut(&mut self) -> &mut Option<<<Self as HasForkserver>::SP as ShMemProvider>::ShMem>;
} }
/// The timeout forkserver executor that wraps around the standard forkserver executor and sets a timeout before each run. /// The timeout forkserver executor that wraps around the standard forkserver executor and sets a timeout before each run.
@ -417,14 +420,14 @@ where
let last_run_timed_out = self.executor.forkserver().last_run_timed_out(); let last_run_timed_out = self.executor.forkserver().last_run_timed_out();
match &mut self.executor.map_mut() { match &mut self.executor.shmem_mut() {
Some(map) => { Some(shmem) => {
let target_bytes = input.target_bytes(); let target_bytes = input.target_bytes();
let size = target_bytes.as_slice().len(); let size = target_bytes.as_slice().len();
let size_in_bytes = size.to_ne_bytes(); let size_in_bytes = size.to_ne_bytes();
// The first four bytes tells the size of the shmem. // The first four bytes tells the size of the shmem.
map.map_mut()[..4].copy_from_slice(&size_in_bytes[..4]); shmem.as_mut_slice()[..4].copy_from_slice(&size_in_bytes[..4]);
map.map_mut()[SHMEM_FUZZ_HDR_SIZE..(SHMEM_FUZZ_HDR_SIZE + size)] shmem.as_mut_slice()[SHMEM_FUZZ_HDR_SIZE..(SHMEM_FUZZ_HDR_SIZE + size)]
.copy_from_slice(target_bytes.as_slice()); .copy_from_slice(target_bytes.as_slice());
} }
None => { None => {
@ -498,24 +501,26 @@ where
/// This [`Executor`] can run binaries compiled for AFL/AFL++ that make use of a forkserver. /// This [`Executor`] can run binaries compiled for AFL/AFL++ that make use of a forkserver.
/// Shared memory feature is also available, but you have to set things up in your code. /// Shared memory feature is also available, but you have to set things up in your code.
/// Please refer to AFL++'s docs. <https://github.com/AFLplusplus/AFLplusplus/blob/stable/instrumentation/README.persistent_mode.md> /// Please refer to AFL++'s docs. <https://github.com/AFLplusplus/AFLplusplus/blob/stable/instrumentation/README.persistent_mode.md>
pub struct ForkserverExecutor<I, OT, S> pub struct ForkserverExecutor<I, OT, S, SP>
where where
I: Input + HasTargetBytes, I: Input + HasTargetBytes,
OT: ObserversTuple<I, S>, OT: ObserversTuple<I, S>,
SP: ShMemProvider,
{ {
target: String, target: String,
args: Vec<String>, args: Vec<String>,
out_file: OutFile, out_file: OutFile,
forkserver: Forkserver, forkserver: Forkserver,
observers: OT, observers: OT,
map: Option<StdShMem>, map: Option<SP::ShMem>,
phantom: PhantomData<(I, S)>, phantom: PhantomData<(I, S)>,
} }
impl<I, OT, S> Debug for ForkserverExecutor<I, OT, S> impl<I, OT, S, SP> Debug for ForkserverExecutor<I, OT, S, SP>
where where
I: Input + HasTargetBytes, I: Input + HasTargetBytes,
OT: ObserversTuple<I, S>, OT: ObserversTuple<I, S>,
SP: ShMemProvider,
{ {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("ForkserverExecutor") f.debug_struct("ForkserverExecutor")
@ -529,28 +534,54 @@ where
} }
} }
impl<I, OT, S> ForkserverExecutor<I, OT, S> impl<I, OT, S> ForkserverExecutor<I, OT, S, StdShMemProvider>
where where
I: Input + HasTargetBytes, I: Input + HasTargetBytes,
OT: ObserversTuple<I, S>, OT: ObserversTuple<I, S>,
{ {
/// Creates a new [`ForkserverExecutor`] with the given target, arguments and observers. /// Creates a new `AFL`-style [`ForkserverExecutor`] with the given target, arguments and observers.
/// This Forserver won't attempt to provide inputs over shared mem but write them to an iput file
/// If `debug_child` is set, the child will print to `stdout`/`stderr`.
pub fn new( pub fn new(
target: String, target: String,
arguments: &[String], arguments: &[String],
use_shmem_testcase: bool,
observers: OT, observers: OT,
debug_child: bool,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
Self::with_debug(target, arguments, use_shmem_testcase, observers, false) Self::new_internal(target, arguments, observers, debug_child, None)
}
}
impl<I, OT, S, SP> ForkserverExecutor<I, OT, S, SP>
where
I: Input + HasTargetBytes,
OT: ObserversTuple<I, S>,
SP: ShMemProvider,
{
/// Creates a new [`ForkserverExecutor`] with the given target, arguments and observers.
pub fn with_shmem_inputs(
target: String,
arguments: &[String],
observers: OT,
debug_child: bool,
shmem_provider: &mut SP,
) -> Result<Self, Error> {
Self::new_internal(
target,
arguments,
observers,
debug_child,
Some(shmem_provider),
)
} }
/// Creates a new [`ForkserverExecutor`] with the given target, arguments and observers, with debug mode /// Creates a new [`ForkserverExecutor`] with the given target, arguments and observers, with debug mode
pub fn with_debug( fn new_internal(
target: String, target: String,
arguments: &[String], arguments: &[String],
use_shmem_testcase: bool,
observers: OT, observers: OT,
debug_output: bool, debug_child: bool,
shmem_provider: Option<&mut SP>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
let mut args = Vec::<String>::new(); let mut args = Vec::<String>::new();
let mut use_stdin = true; let mut use_stdin = true;
@ -567,17 +598,18 @@ where
let out_file = OutFile::new(&out_filename)?; let out_file = OutFile::new(&out_filename)?;
let mut map = None; let map = match shmem_provider {
if use_shmem_testcase { None => None,
Some(provider) => {
// setup shared memory // setup shared memory
let mut provider = StdShMemProvider::new()?; let mut shmem = provider.new_shmem(MAX_FILE + SHMEM_FUZZ_HDR_SIZE)?;
let mut shmem = provider.new_map(MAX_FILE + SHMEM_FUZZ_HDR_SIZE)?;
shmem.write_to_env("__AFL_SHM_FUZZ_ID")?; shmem.write_to_env("__AFL_SHM_FUZZ_ID")?;
let size_in_bytes = (MAX_FILE + SHMEM_FUZZ_HDR_SIZE).to_ne_bytes(); let size_in_bytes = (MAX_FILE + SHMEM_FUZZ_HDR_SIZE).to_ne_bytes();
shmem.map_mut()[..4].clone_from_slice(&size_in_bytes[..4]); shmem.as_mut_slice()[..4].clone_from_slice(&size_in_bytes[..4]);
map = Some(shmem); Some(shmem)
} }
};
let mut forkserver = Forkserver::new( let mut forkserver = Forkserver::new(
target.clone(), target.clone(),
@ -585,7 +617,7 @@ where
out_file.as_raw_fd(), out_file.as_raw_fd(),
use_stdin, use_stdin,
0, 0,
debug_output, debug_child,
)?; )?;
let (rlen, status) = forkserver.read_st()?; // Initial handshake, read 4-bytes hello message from the forkserver. let (rlen, status) = forkserver.read_st()?; // Initial handshake, read 4-bytes hello message from the forkserver.
@ -598,7 +630,7 @@ where
println!("All right - fork server is up."); println!("All right - fork server is up.");
// If forkserver is responding, we then check if there's any option enabled. // If forkserver is responding, we then check if there's any option enabled.
if status & FS_OPT_ENABLED == FS_OPT_ENABLED { if status & FS_OPT_ENABLED == FS_OPT_ENABLED {
if (status & FS_OPT_SHDMEM_FUZZ == FS_OPT_SHDMEM_FUZZ) & use_shmem_testcase { if (status & FS_OPT_SHDMEM_FUZZ == FS_OPT_SHDMEM_FUZZ) & map.is_some() {
println!("Using SHARED MEMORY FUZZING feature."); println!("Using SHARED MEMORY FUZZING feature.");
let send_status = FS_OPT_ENABLED | FS_OPT_SHDMEM_FUZZ; let send_status = FS_OPT_ENABLED | FS_OPT_SHDMEM_FUZZ;
@ -645,10 +677,11 @@ where
} }
} }
impl<EM, I, OT, S, Z> Executor<EM, I, S, Z> for ForkserverExecutor<I, OT, S> impl<EM, I, OT, S, SP, Z> Executor<EM, I, S, Z> for ForkserverExecutor<I, OT, S, SP>
where where
I: Input + HasTargetBytes, I: Input + HasTargetBytes,
OT: ObserversTuple<I, S>, OT: ObserversTuple<I, S>,
SP: ShMemProvider,
{ {
#[inline] #[inline]
fn run_target( fn run_target(
@ -667,8 +700,8 @@ where
let size = target_bytes.as_slice().len(); let size = target_bytes.as_slice().len();
let size_in_bytes = size.to_ne_bytes(); let size_in_bytes = size.to_ne_bytes();
// The first four bytes tells the size of the shmem. // The first four bytes tells the size of the shmem.
map.map_mut()[..4].copy_from_slice(&size_in_bytes[..4]); map.as_mut_slice()[..4].copy_from_slice(&size_in_bytes[..4]);
map.map_mut()[SHMEM_FUZZ_HDR_SIZE..(SHMEM_FUZZ_HDR_SIZE + size)] map.as_mut_slice()[SHMEM_FUZZ_HDR_SIZE..(SHMEM_FUZZ_HDR_SIZE + size)]
.copy_from_slice(target_bytes.as_slice()); .copy_from_slice(target_bytes.as_slice());
} }
None => { None => {
@ -719,10 +752,11 @@ where
} }
} }
impl<I, OT, S> HasObservers<I, OT, S> for ForkserverExecutor<I, OT, S> impl<I, OT, S, SP> HasObservers<I, OT, S> for ForkserverExecutor<I, OT, S, SP>
where where
I: Input + HasTargetBytes, I: Input + HasTargetBytes,
OT: ObserversTuple<I, S>, OT: ObserversTuple<I, S>,
SP: ShMemProvider,
{ {
#[inline] #[inline]
fn observers(&self) -> &OT { fn observers(&self) -> &OT {
@ -735,11 +769,14 @@ where
} }
} }
impl<I, OT, S> HasForkserver for ForkserverExecutor<I, OT, S> impl<I, OT, S, SP> HasForkserver for ForkserverExecutor<I, OT, S, SP>
where where
I: Input + HasTargetBytes, I: Input + HasTargetBytes,
OT: ObserversTuple<I, S>, OT: ObserversTuple<I, S>,
SP: ShMemProvider,
{ {
type SP = SP;
#[inline] #[inline]
fn forkserver(&self) -> &Forkserver { fn forkserver(&self) -> &Forkserver {
&self.forkserver &self.forkserver
@ -761,12 +798,12 @@ where
} }
#[inline] #[inline]
fn map(&self) -> &Option<StdShMem> { fn shmem(&self) -> &Option<SP::ShMem> {
&self.map &self.map
} }
#[inline] #[inline]
fn map_mut(&mut self) -> &mut Option<StdShMem> { fn shmem_mut(&mut self) -> &mut Option<SP::ShMem> {
&mut self.map &mut self.map
} }
} }
@ -808,20 +845,23 @@ mod tests {
let bin = "echo"; let bin = "echo";
let args = vec![String::from("@@")]; let args = vec![String::from("@@")];
let mut shmem = StdShMemProvider::new().unwrap().new_map(MAP_SIZE).unwrap(); let mut shmem_provider = StdShMemProvider::new().unwrap();
let mut shmem = shmem_provider.new_shmem(MAP_SIZE).unwrap();
shmem.write_to_env("__AFL_SHM_ID").unwrap(); shmem.write_to_env("__AFL_SHM_ID").unwrap();
let shmem_map = shmem.map_mut(); let shmem_buf = shmem.as_mut_slice();
let edges_observer = HitcountsMapObserver::new(ConstMapObserver::<_, MAP_SIZE>::new( let edges_observer = HitcountsMapObserver::new(ConstMapObserver::<_, MAP_SIZE>::new(
"shared_mem", "shared_mem",
shmem_map, shmem_buf,
)); ));
let executor = ForkserverExecutor::<NopInput, _, ()>::new( let executor = ForkserverExecutor::<NopInput, _, (), _>::with_shmem_inputs(
bin.to_string(), bin.to_string(),
&args, &args,
false,
tuple_list!(edges_observer), tuple_list!(edges_observer),
false,
&mut shmem_provider,
); );
// Since /usr/bin/echo is not a instrumented binary file, the test will just check if the forkserver has failed at the initial handshake // Since /usr/bin/echo is not a instrumented binary file, the test will just check if the forkserver has failed at the initial handshake

View File

@ -488,7 +488,7 @@ impl<T: ShMem> MessageFileWriter<ShMemCursor<T>> {
} }
} }
impl MessageFileWriter<ShMemCursor<<StdShMemProvider as ShMemProvider>::Mem>> { impl MessageFileWriter<ShMemCursor<<StdShMemProvider as ShMemProvider>::ShMem>> {
/// Creates a new `MessageFileWriter` by reading a [`ShMem`] from the given environment variable. /// Creates a new `MessageFileWriter` by reading a [`ShMem`] from the given environment variable.
pub fn from_stdshmem_env_with_name(env_name: impl AsRef<str>) -> io::Result<Self> { pub fn from_stdshmem_env_with_name(env_name: impl AsRef<str>) -> io::Result<Self> {
Self::from_shmem( Self::from_shmem(
@ -507,4 +507,4 @@ impl MessageFileWriter<ShMemCursor<<StdShMemProvider as ShMemProvider>::Mem>> {
/// A writer that will write messages to a shared memory buffer. /// A writer that will write messages to a shared memory buffer.
pub type StdShMemMessageFileWriter = pub type StdShMemMessageFileWriter =
MessageFileWriter<ShMemCursor<<StdShMemProvider as ShMemProvider>::Mem>>; MessageFileWriter<ShMemCursor<<StdShMemProvider as ShMemProvider>::ShMem>>;

View File

@ -194,7 +194,9 @@ where
let val = unsafe { let val = unsafe {
// SAFETY: the index is modulo by the length, therefore it is always in bounds // SAFETY: the index is modulo by the length, therefore it is always in bounds
let len = self.hitcounts_map.len(); let len = self.hitcounts_map.len();
self.hitcounts_map.map_mut().get_unchecked_mut(hash % len) self.hitcounts_map
.as_mut_slice()
.get_unchecked_mut(hash % len)
}; };
*val = val.saturating_add(1); *val = val.saturating_add(1);
} }

View File

@ -62,7 +62,7 @@ fn main() {
let mut shmemprovider = StdShMemProvider::default(); let mut shmemprovider = StdShMemProvider::default();
let concolic_shmem = shmemprovider let concolic_shmem = shmemprovider
.new_map(1024 * 1024 * 1024) .new_shmem(1024 * 1024 * 1024)
.expect("unable to create shared mapping"); .expect("unable to create shared mapping");
concolic_shmem concolic_shmem
.write_to_env(DEFAULT_ENV_NAME) .write_to_env(DEFAULT_ENV_NAME)
@ -70,7 +70,7 @@ fn main() {
let coverage_map = StdShMemProvider::new() let coverage_map = StdShMemProvider::new()
.unwrap() .unwrap()
.new_map(COVERAGE_MAP_SIZE) .new_shmem(COVERAGE_MAP_SIZE)
.unwrap(); .unwrap();
//let the forkserver know the shmid //let the forkserver know the shmid
coverage_map.write_to_env(HITMAP_ENV_NAME).unwrap(); coverage_map.write_to_env(HITMAP_ENV_NAME).unwrap();
@ -104,7 +104,7 @@ fn main() {
File::create(coverage_file_path).expect("unable to open coverage file"), File::create(coverage_file_path).expect("unable to open coverage file"),
); );
for (index, count) in coverage_map for (index, count) in coverage_map
.map() .as_slice()
.iter() .iter()
.enumerate() .enumerate()
.filter(|(_, &v)| v != 0) .filter(|(_, &v)| v != 0)
@ -117,7 +117,7 @@ fn main() {
let output_file_path = opt.output.unwrap_or_else(|| "trace".into()); let output_file_path = opt.output.unwrap_or_else(|| "trace".into());
let mut output_file = let mut output_file =
BufWriter::new(File::create(output_file_path).expect("unable to open output file")); BufWriter::new(File::create(output_file_path).expect("unable to open output file"));
let mut reader = MessageFileReader::from_length_prefixed_buffer(concolic_shmem.map()) let mut reader = MessageFileReader::from_length_prefixed_buffer(concolic_shmem.as_slice())
.expect("unable to create trace reader"); .expect("unable to create trace reader");
if opt.plain_text { if opt.plain_text {
while let Some(message) = reader.next_message() { while let Some(message) = reader.next_message() {

View File

@ -101,17 +101,15 @@ impl<'a, const MAP_SIZE: usize> ForkserverBytesCoverageSugar<'a, MAP_SIZE> {
out_dir.push("queue"); out_dir.push("queue");
let shmem_provider = StdShMemProvider::new().expect("Failed to init shared memory"); let shmem_provider = StdShMemProvider::new().expect("Failed to init shared memory");
let mut shmem_provider_client = shmem_provider.clone();
let monitor = MultiMonitor::new(|s| println!("{}", s)); let monitor = MultiMonitor::new(|s| println!("{}", s));
let mut run_client = |state: Option<StdState<_, _, _, _, _>>, mut mgr, _core_id| { let mut run_client = |state: Option<StdState<_, _, _, _, _>>, mut mgr, _core_id| {
// Coverage map shared between target and fuzzer // Coverage map shared between target and fuzzer
let mut shmem = StdShMemProvider::new() let mut shmem = shmem_provider_client.new_shmem(MAP_SIZE).unwrap();
.expect("Failed to init shared memory")
.new_map(MAP_SIZE)
.unwrap();
shmem.write_to_env("__AFL_SHM_ID").unwrap(); shmem.write_to_env("__AFL_SHM_ID").unwrap();
let shmem_map = shmem.map_mut(); let shmem_map = shmem.as_mut_slice();
// Create an observation channel using the coverage map // Create an observation channel using the coverage map
let edges_observer = unsafe { let edges_observer = unsafe {
@ -171,13 +169,22 @@ impl<'a, const MAP_SIZE: usize> ForkserverBytesCoverageSugar<'a, MAP_SIZE> {
// Create the executor for an in-process function with one observer for edge coverage and one for the execution time // Create the executor for an in-process function with one observer for edge coverage and one for the execution time
let mut executor = TimeoutForkserverExecutor::new( let mut executor = TimeoutForkserverExecutor::new(
ForkserverExecutor::with_debug( if self.shmem_testcase {
ForkserverExecutor::with_shmem_inputs(
self.program.clone(),
self.arguments,
tuple_list!(edges_observer, time_observer),
self.debug_output,
&mut shmem_provider_client,
)
} else {
ForkserverExecutor::new(
self.program.clone(), self.program.clone(),
self.arguments, self.arguments,
self.shmem_testcase,
tuple_list!(edges_observer, time_observer), tuple_list!(edges_observer, time_observer),
self.debug_output, self.debug_output,
) )
}
.expect("Failed to create the executor."), .expect("Failed to create the executor."),
timeout, timeout,
) )