From f9782b48d4da9c4f3ce8d8e152d654fc1ad3149a Mon Sep 17 00:00:00 2001 From: Dominik Maier Date: Tue, 8 Dec 2020 19:10:54 +0100 Subject: [PATCH] more rusty --- afl/src/events/llmp_translated.rs | 747 +++++++++++++++--------------- 1 file changed, 372 insertions(+), 375 deletions(-) diff --git a/afl/src/events/llmp_translated.rs b/afl/src/events/llmp_translated.rs index 0878dca724..71681bf2ad 100644 --- a/afl/src/events/llmp_translated.rs +++ b/afl/src/events/llmp_translated.rs @@ -75,9 +75,15 @@ const LLMP_TAG_END_OF_PAGE: u32 = 0xaf1e0f1; /// A new client for this broekr got added. const LLMP_TAG_NEW_SHM_CLIENT: u32 = 0xc11e471; -pub type AflRet = c_uint; -pub const AFL_RET_ALLOC: AflRet = 3; -pub const AFL_RET_SUCCESS: AflRet = 0; +/// What byte count to align messages to +/// LlmpMsg sizes (including header) will always be rounded up to be a multiple of this value +const LLMP_ALIGNNMENT: usize = 64; + +/// Size of a new page message, header, payload, and alignment +const EOP_MSG_SIZE: usize = llmp_align(size_of::() + size_of::()); + +/// Message Hook +pub type LlmpMsgHookFn = unsafe fn(client_id: u32, msg: *mut LlmpMsg) -> LlmpMsgHookResult; /// Sending end on a (unidirectional) sharedmap channel #[derive(Clone)] @@ -88,7 +94,7 @@ pub struct LlmpSender { /// If null, a new page (just) started. pub last_msg_sent: *mut LlmpMsg, /// A vec of page wrappers, each containing an intialized AfShmem - pub out_maps: Vec, + pub out_maps: Vec, /// If true, pages will never be pruned. /// The broker uses this feature. /// By keeping the message history around, @@ -103,7 +109,7 @@ pub struct LlmpReceiver { /// Pointer to the last meg this received pub last_msg_recvd: *mut LlmpMsg, /// current page. After EOP, this gets replaced with the new one - pub current_recv_map: LlmpPageWrapper, + pub current_recv_map: LlmpSharedMap, } /// Client side of LLMP @@ -115,43 +121,11 @@ pub struct LlmpClient { /// A page wrapper #[derive(Clone)] -pub struct LlmpPageWrapper { +pub struct LlmpSharedMap { /// Shmem containg the actual (unsafe) page, /// shared between one LlmpSender and one LlmpReceiver shmem: AflShmem, } - -/// The page struct, placed on a shared mem instance. -impl LlmpPageWrapper { - /// Creates a new page with minimum prev_max_alloc_size or LLMP_INITIAL_MAP_SIZE - /// returning the initialized shared mem struct - pub unsafe fn new(sender: u32, min_size: usize) -> Result { - // Create a new shard page. - let mut shmem = AflShmem::new(new_map_size(min_size))?; - _llmp_page_init(&mut shmem, sender); - Ok(Self { shmem }) - } - - /// Initialize from a 0-terminated sharedmap id string and its size - pub unsafe fn from_str(shm_str: &CStr, map_size: usize) -> Result { - let shmem = AflShmem::from_str(shm_str, map_size)?; - // Not initializing the page here - the other side should have done it already! - Ok(Self { shmem }) - } - - /// Initialize from a shm_str with fixed len of 20 - pub unsafe fn from_name_slice(shm_str: &[u8; 20], map_size: usize) -> Result { - let shmem = AflShmem::from_name_slice(shm_str, map_size)?; - // Not initializing the page here - the other side should have done it already! - Ok(Self { shmem }) - } - - /// Get the unsafe ptr to this page, situated on the shared map - pub unsafe fn page(&self) -> *mut LlmpPage { - shmem2page(&self.shmem) - } -} - /// Message sent over the "wire" #[derive(Copy, Clone)] #[repr(C, packed)] @@ -205,9 +179,6 @@ pub enum LlmpMsgHookResult { ForwardToClients, } -/// Message Hook -pub type LlmpMsgHookFn = unsafe fn(client_id: u32, msg: *mut LlmpMsg) -> LlmpMsgHookResult; - /// Message payload when a client got added LLMP_TAG_CLIENT_ADDED_V1 */ /// This is an internal message! /// LLMP_TAG_END_OF_PAGE_V1 @@ -218,26 +189,20 @@ struct LlmpPayloadSharedMap { pub shm_str: [u8; 20], } +/// Get sharedmem from a page #[inline] unsafe fn shmem2page(afl_shmem: &AflShmem) -> *mut LlmpPage { afl_shmem.map as *mut LlmpPage } -/* If a msg is contained in the current page */ +/// Return, if a msg is contained in the current page unsafe fn llmp_msg_in_page(page: *mut LlmpPage, msg: *mut LlmpMsg) -> bool { /* DBG("llmp_msg_in_page %p within %p-%p\n", msg, page, page + page->size_total); */ return (page as *mut u8) < msg as *mut u8 && (page as *mut u8).offset((*page).size_total as isize) > msg as *mut u8; } -/// What byte count to align messages to -/// LlmpMsg sizes (including header) will always be rounded up to be a multiple of this value -const LLMP_ALIGNNMENT: usize = 64; - -/// Size of a new page message, header, payload, and alignment -const EOP_MSG_SIZE: usize = llmp_align(size_of::() + size_of::()); - -/* allign to LLMP_ALIGNNMENT=64 bytes */ +/// allign to LLMP_ALIGNNMENT=64 bytes #[inline] const fn llmp_align(to_align: usize) -> usize { // check if we need to align first @@ -264,8 +229,8 @@ fn new_map_size(max_alloc: usize) -> usize { ) as u64) as usize } -/* Initialize a new llmp_page. size should be relative to - * llmp_page->messages */ +/// Initialize a new llmp_page. size should be relative to +/// llmp_page->messages unsafe fn _llmp_page_init(shmem: &mut AflShmem, sender: u32) { let page = shmem2page(&shmem); (*page).sender = sender; @@ -280,7 +245,7 @@ unsafe fn _llmp_page_init(shmem: &mut AflShmem, sender: u32) { ptr::write_volatile(&mut (*page).sender_dead, 0); } -/* Pointer to the message behind the last message */ +/// Pointer to the message behind the last message #[inline] unsafe fn _llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg { /* DBG("_llmp_next_msg_ptr %p %lu + %lu\n", last_msg, last_msg->buf_len_padded, sizeof(llmp_message)); */ @@ -289,288 +254,349 @@ unsafe fn _llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg { .offset((*last_msg).buf_len_padded as isize) as *mut LlmpMsg; } -/* Read next message. */ -unsafe fn llmp_recv(receiver: &mut LlmpReceiver) -> Result, AflError> { - /* DBG("llmp_recv %p %p\n", page, last_msg); */ - compiler_fence(Ordering::SeqCst); - let page = receiver.current_recv_map.page(); - let last_msg = receiver.last_msg_recvd; - let current_msg_id = ptr::read_volatile(&mut (*page).current_msg_id); - - // Read the message from the page - let ret = if current_msg_id == 0 { - /* No messages yet */ - None - } else if last_msg.is_null() { - /* We never read a message from this queue. Return first. */ - Some((*page).messages.as_mut_ptr()) - } else if (*last_msg).message_id == current_msg_id { - /* Oops! No new message! */ - None - } else { - Some(_llmp_next_msg_ptr(last_msg)) - }; - - // Let's see what we go here. - match ret { - Some(msg) => { - // Handle special, LLMP internal, messages. - match (*msg).tag { - LLMP_TAG_UNSET => panic!("BUG: Read unallocated msg"), - LLMP_TAG_END_OF_PAGE => { - dbg!("Got end of page, allocing next"); - // Handle end of page - if (*msg).buf_len < size_of::() as u64 { - panic!(format!( - "Illegal message length for EOP (is {}, expected {})", - (*msg).buf_len_padded, - size_of::() - )); - } - let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap; - - /* We can reuse the map mem space, no need to free and calloc. - However, the pageinfo points to the map we're about to unmap. - Clone the contents first to be safe (probably fine in rust eitner way). */ - let pageinfo_cpy = (*pageinfo).clone(); - - ptr::write_volatile(&mut (*page).save_to_unmap, 1); - receiver.current_recv_map = LlmpPageWrapper::from_name_slice( - &pageinfo_cpy.shm_str, - pageinfo_cpy.map_size, - )?; - dbg!( - "Got a new recv map", - receiver.current_recv_map.shmem.shm_str - ); - // After we mapped the new page, return the next message, if available - return llmp_recv(receiver); - } - _ => (), +/// An actor on the sendin part of the shared map +impl LlmpSender { + /// For non zero-copy, we want to get rid of old pages with duplicate messages in the client + /// eventually. This function This funtion sees if we can unallocate older pages. + /// The broker would have informed us by setting the save_to_unmap-flag. + unsafe fn prune_old_pages(&mut self) { + // Exclude the current page by splitting of the last element for this iter + let mut unmap_until_excl = 0; + for map in self.out_maps.split_last().unwrap().1 { + if (*map.page()).save_to_unmap == 0 { + // The broker didn't read this page yet, no more pages to unmap. + break; } - - // Store the last msg for next time - receiver.last_msg_recvd = msg; + unmap_until_excl += 1; } - _ => (), - }; - Ok(ret) -} + // Remove all maps that the broker already mapped + // simply removing them from the vec should then call drop and unmap them. + self.out_maps.drain(0..unmap_until_excl); + } -/* Blocks/spins until the next message gets posted to the page, -then returns that message. */ -pub unsafe fn llmp_recv_blocking(receiver: &mut LlmpReceiver) -> Result<*mut LlmpMsg, AflError> { - let mut current_msg_id = 0; - let page = receiver.current_recv_map.page(); - let last_msg = receiver.last_msg_recvd; - if !last_msg.is_null() { - if (*last_msg).tag == LLMP_TAG_END_OF_PAGE && !llmp_msg_in_page(page, last_msg) { - panic!("BUG: full page passed to await_message_blocking or reset failed"); + /// Intern: Special allocation function for EOP messages (and nothing else!) + /// The normal alloc will fail if there is not enough space for buf_len_padded + EOP + /// So if alloc_next fails, create new page if necessary, use this function, + /// place EOP, commit EOP, reset, alloc again on the new space. + unsafe fn alloc_eop(&mut self) -> *mut LlmpMsg { + let page = self.out_maps.last().unwrap().page(); + let last_msg = self.last_msg_sent; + if (*page).size_used + EOP_MSG_SIZE > (*page).size_total { + panic!(format!("PROGRAM ABORT : BUG: EOP does not fit in page! page {:?}, size_current {:?}, size_total {:?}", page, + (*page).size_used, (*page).size_total)); } - current_msg_id = (*last_msg).message_id - } - loop { - compiler_fence(Ordering::SeqCst); - if ptr::read_volatile(&mut (*page).current_msg_id) != current_msg_id { - return match llmp_recv(receiver)? { - Some(msg) => Ok(msg), - None => panic!("BUG: blocking llmp message should never be NULL"), - }; + let mut ret: *mut LlmpMsg = if !last_msg.is_null() { + _llmp_next_msg_ptr(last_msg) + } else { + (*page).messages.as_mut_ptr() + }; + if (*ret).tag == LLMP_TAG_UNINITIALIZED { + panic!("Did not call send() on last message!"); } - } -} - -/* Special allocation function for EOP messages (and nothing else!) - The normal alloc will fail if there is not enough space for buf_len_padded + EOP - So if llmp_alloc_next fails, create new page if necessary, use this function, - place EOP, commit EOP, reset, alloc again on the new space. -*/ -unsafe fn llmp_alloc_eop(page: *mut LlmpPage, last_msg: *const LlmpMsg) -> *mut LlmpMsg { - if (*page).size_used + EOP_MSG_SIZE > (*page).size_total { - panic!(format!("PROGRAM ABORT : BUG: EOP does not fit in page! page {:?}, size_current {:?}, size_total {:?}", page, - (*page).size_used, (*page).size_total)); - } - let mut ret: *mut LlmpMsg = if !last_msg.is_null() { - _llmp_next_msg_ptr(last_msg) - } else { - (*page).messages.as_mut_ptr() - }; - if (*ret).tag == LLMP_TAG_UNINITIALIZED { - panic!("Did not call send() on last message!"); - } - (*ret).buf_len_padded = size_of::() as c_ulong; - (*ret).message_id = if !last_msg.is_null() { - (*last_msg).message_id + 1 - } else { - 1 - }; - (*ret).tag = LLMP_TAG_END_OF_PAGE; - (*page).size_used += EOP_MSG_SIZE; - ret -} - -/// Will return a ptr to the next msg buf, or None if map is full. -/// Never call alloc_next without either sending or cancelling the last allocated message for this page! -/// There can only ever be up to one message allocated per page at each given time. -unsafe fn llmp_alloc_next_if_space(llmp: &mut LlmpSender, buf_len: usize) -> Option<*mut LlmpMsg> { - let mut buf_len_padded = buf_len; - let mut complete_msg_size = llmp_align(size_of::() + buf_len_padded); - let page = llmp.out_maps.last().unwrap().page(); - let last_msg = llmp.last_msg_sent; - /* DBG("XXX complete_msg_size %lu (h: %lu)\n", complete_msg_size, sizeof(llmp_message)); */ - /* In case we don't have enough space, make sure the next page will be large - * enough */ - // For future allocs, keep track of the maximum (aligned) alloc size we used - (*page).max_alloc_size = max((*page).max_alloc_size, complete_msg_size); - - let mut ret: *mut LlmpMsg; - /* DBG("last_msg %p %d (%d)\n", last_msg, last_msg ? (int)last_msg->tag : -1, (int)LLMP_TAG_END_OF_PAGE_V1); */ - if last_msg.is_null() || (*last_msg).tag == LLMP_TAG_END_OF_PAGE { - /* We start fresh, on a new page */ - ret = (*page).messages.as_mut_ptr(); - /* The initial message may not be alligned, so we at least align the end of - it. Technically, c_ulong can be smaller than a pointer, then who knows what - happens */ - let base_addr = ret as usize; - buf_len_padded = - llmp_align(base_addr + complete_msg_size) - base_addr - size_of::(); - complete_msg_size = buf_len_padded + size_of::(); - /* DBG("XXX complete_msg_size NEW %lu\n", complete_msg_size); */ - /* Still space for the new message plus the additional "we're full" message? - */ - if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total { - /* We're full. */ - return None; - } - /* We need to start with 1 for ids, as current message id is initialized - * with 0... */ + (*ret).buf_len_padded = size_of::() as c_ulong; (*ret).message_id = if !last_msg.is_null() { (*last_msg).message_id + 1 } else { 1 + }; + (*ret).tag = LLMP_TAG_END_OF_PAGE; + (*page).size_used += EOP_MSG_SIZE; + ret + } + + /// Intern: Will return a ptr to the next msg buf, or None if map is full. + /// Never call alloc_next without either sending or cancelling the last allocated message for this page! + /// There can only ever be up to one message allocated per page at each given time. + unsafe fn alloc_next_if_space(&mut self, buf_len: usize) -> Option<*mut LlmpMsg> { + let mut buf_len_padded = buf_len; + let mut complete_msg_size = llmp_align(size_of::() + buf_len_padded); + let page = self.out_maps.last().unwrap().page(); + let last_msg = self.last_msg_sent; + /* DBG("XXX complete_msg_size %lu (h: %lu)\n", complete_msg_size, sizeof(llmp_message)); */ + /* In case we don't have enough space, make sure the next page will be large + * enough */ + // For future allocs, keep track of the maximum (aligned) alloc size we used + (*page).max_alloc_size = max((*page).max_alloc_size, complete_msg_size); + + let mut ret: *mut LlmpMsg; + /* DBG("last_msg %p %d (%d)\n", last_msg, last_msg ? (int)last_msg->tag : -1, (int)LLMP_TAG_END_OF_PAGE_V1); */ + if last_msg.is_null() || (*last_msg).tag == LLMP_TAG_END_OF_PAGE { + /* We start fresh, on a new page */ + ret = (*page).messages.as_mut_ptr(); + /* The initial message may not be alligned, so we at least align the end of + it. Technically, c_ulong can be smaller than a pointer, then who knows what + happens */ + let base_addr = ret as usize; + buf_len_padded = + llmp_align(base_addr + complete_msg_size) - base_addr - size_of::(); + complete_msg_size = buf_len_padded + size_of::(); + /* DBG("XXX complete_msg_size NEW %lu\n", complete_msg_size); */ + /* Still space for the new message plus the additional "we're full" message? + */ + if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total { + /* We're full. */ + return None; + } + /* We need to start with 1 for ids, as current message id is initialized + * with 0... */ + (*ret).message_id = if !last_msg.is_null() { + (*last_msg).message_id + 1 + } else { + 1 + } + } else if (*page).current_msg_id != (*last_msg).message_id { + /* Oops, wrong usage! */ + panic!(format!("BUG: The current message never got commited using send! (page->current_msg_id {:?}, last_msg->message_id: {})", (*page).current_msg_id, (*last_msg).message_id)); + } else { + buf_len_padded = complete_msg_size - size_of::(); + /* DBG("XXX ret %p id %u buf_len_padded %lu complete_msg_size %lu\n", ret, ret->message_id, buf_len_padded, + * complete_msg_size); */ + + /* Still space for the new message plus the additional "we're full" message? */ + if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total { + /* We're full. */ + return None; + } + ret = _llmp_next_msg_ptr(last_msg); + (*ret).message_id = (*last_msg).message_id + 1 } - } else if (*page).current_msg_id != (*last_msg).message_id { - /* Oops, wrong usage! */ - panic!(format!("BUG: The current message never got commited using llmp_send! (page->current_msg_id {:?}, last_msg->message_id: {})", (*page).current_msg_id, (*last_msg).message_id)); - } else { - buf_len_padded = complete_msg_size - size_of::(); - /* DBG("XXX ret %p id %u buf_len_padded %lu complete_msg_size %lu\n", ret, ret->message_id, buf_len_padded, - * complete_msg_size); */ - /* Still space for the new message plus the additional "we're full" message? */ - if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total { - /* We're full. */ - return None; + /* The beginning of our message should be messages + size_used, else nobody + * sent the last msg! */ + /* DBG("XXX ret %p - page->messages %p = %lu != %lu, will add %lu -> %p\n", ret, page->messages, + (c_ulong)((u8 *)ret - (u8 *)page->messages), page->size_used, complete_msg_size, ((u8 *)ret) + complete_msg_size); + */ + if last_msg.is_null() && (*page).size_used != 0 + || ((ret as usize) - (*page).messages.as_mut_ptr() as usize) != (*page).size_used + { + panic!(format!("Allocated new message without calling send() inbetween. ret: {:?}, page: {:?}, complete_msg_size: {:?}, size_used: {:?}, last_msg: {:?}", ret, page, + buf_len_padded, (*page).size_used, last_msg)); } - ret = _llmp_next_msg_ptr(last_msg); - (*ret).message_id = (*last_msg).message_id + 1 + (*page).size_used = (*page).size_used + complete_msg_size; + (*ret).buf_len_padded = buf_len_padded as c_ulong; + (*ret).buf_len = buf_len as c_ulong; + /* DBG("Returning new message at %p with len %ld, TAG was %x", ret, ret->buf_len_padded, ret->tag); */ + /* Maybe catch some bugs... */ + (*_llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET; + (*ret).tag = LLMP_TAG_UNINITIALIZED; + Some(ret) } - /* The beginning of our message should be messages + size_used, else nobody - * sent the last msg! */ - /* DBG("XXX ret %p - page->messages %p = %lu != %lu, will add %lu -> %p\n", ret, page->messages, - (c_ulong)((u8 *)ret - (u8 *)page->messages), page->size_used, complete_msg_size, ((u8 *)ret) + complete_msg_size); - */ - if last_msg.is_null() && (*page).size_used != 0 - || ((ret as usize) - (*page).messages.as_mut_ptr() as usize) != (*page).size_used - { - panic!(format!("Allocated new message without calling send() inbetween. ret: {:?}, page: {:?}, complete_msg_size: {:?}, size_used: {:?}, last_msg: {:?}", ret, page, - buf_len_padded, (*page).size_used, last_msg)); - } - (*page).size_used = (*page).size_used + complete_msg_size; - (*ret).buf_len_padded = buf_len_padded as c_ulong; - (*ret).buf_len = buf_len as c_ulong; - /* DBG("Returning new message at %p with len %ld, TAG was %x", ret, ret->buf_len_padded, ret->tag); */ - /* Maybe catch some bugs... */ - (*_llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET; - (*ret).tag = LLMP_TAG_UNINITIALIZED; - Some(ret) -} - -/// Commit the message last allocated by llmp_alloc_next to the queue. -/// After commiting, the msg shall no longer be altered! -/// It will be read by the consuming threads (broker->clients or client->broker) -unsafe fn llmp_send(sender: &mut LlmpSender, msg: *mut LlmpMsg) -> Result<(), AflError> { - if sender.last_msg_sent == msg { - panic!("Message sent twice!"); - } - if (*msg).tag == LLMP_TAG_UNSET as c_uint { - panic!(format!( - "No tag set on message with id {}", - (*msg).message_id - )); - } - let page = sender.out_maps.last().unwrap().page(); - if msg.is_null() || !llmp_msg_in_page(page, msg) { - return Err(AflError::Unknown(format!( - "Llmp Message {:?} is null or not in current page", - msg - ))); - } - (*msg).message_id = (*page).current_msg_id + 1; - compiler_fence(Ordering::SeqCst); - ptr::write_volatile(&mut (*page).current_msg_id, (*msg).message_id); - compiler_fence(Ordering::SeqCst); - sender.last_msg_sent = msg; - Ok(()) -} - -/// listener about it using a EOP message. -unsafe fn llmp_handle_out_eop(sender: &mut LlmpSender) -> Result<(), AflError> { - let old_map = sender.out_maps.last_mut().unwrap().page(); - - // Create a new shard page. - let new_map_shmem = LlmpPageWrapper::new((*old_map).sender, (*old_map).max_alloc_size)?; - let mut new_map = new_map_shmem.page(); - - ptr::write_volatile(&mut (*new_map).current_msg_id, (*old_map).current_msg_id); - (*new_map).max_alloc_size = (*old_map).max_alloc_size; - /* On the old map, place a last message linking to the new map for the clients - * to consume */ - let mut out: *mut LlmpMsg = llmp_alloc_eop(old_map, sender.last_msg_sent); - (*out).sender = (*old_map).sender; - - let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap; - (*end_of_page_msg).map_size = new_map_shmem.shmem.map_size; - (*end_of_page_msg).shm_str = new_map_shmem.shmem.shm_str; - - // We never sent a msg on the new buf */ - sender.last_msg_sent = 0 as *mut LlmpMsg; - - /* Send the last msg on the old buf */ - llmp_send(sender, out)?; - - if !sender.keep_pages_forever { - llmp_prune_old_pages(sender); + /// Commit the message last allocated by alloc_next to the queue. + /// After commiting, the msg shall no longer be altered! + /// It will be read by the consuming threads (broker->clients or client->broker) + unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> { + if self.last_msg_sent == msg { + panic!("Message sent twice!"); + } + if (*msg).tag == LLMP_TAG_UNSET as c_uint { + panic!(format!( + "No tag set on message with id {}", + (*msg).message_id + )); + } + let page = self.out_maps.last().unwrap().page(); + if msg.is_null() || !llmp_msg_in_page(page, msg) { + return Err(AflError::Unknown(format!( + "Llmp Message {:?} is null or not in current page", + msg + ))); + } + (*msg).message_id = (*page).current_msg_id + 1; + compiler_fence(Ordering::SeqCst); + ptr::write_volatile(&mut (*page).current_msg_id, (*msg).message_id); + compiler_fence(Ordering::SeqCst); + self.last_msg_sent = msg; + Ok(()) } - sender.out_maps.push(new_map_shmem); + /// listener about it using a EOP message. + unsafe fn handle_out_eop(&mut self) -> Result<(), AflError> { + let old_map = self.out_maps.last_mut().unwrap().page(); - Ok(()) -} + // Create a new shard page. + let new_map_shmem = LlmpSharedMap::new((*old_map).sender, (*old_map).max_alloc_size)?; + let mut new_map = new_map_shmem.page(); -/// Allocates the next space on this sender page -pub unsafe fn llmp_alloc_next( - sender: &mut LlmpSender, - buf_len: usize, -) -> Result<*mut LlmpMsg, AflError> { - match llmp_alloc_next_if_space(sender, buf_len) { - Some(msg) => return Ok(msg), - _ => (), - }; + ptr::write_volatile(&mut (*new_map).current_msg_id, (*old_map).current_msg_id); + (*new_map).max_alloc_size = (*old_map).max_alloc_size; + /* On the old map, place a last message linking to the new map for the clients + * to consume */ + let mut out: *mut LlmpMsg = self.alloc_eop(); + (*out).sender = (*old_map).sender; - /* no more space left! We'll have to start a new page */ - llmp_handle_out_eop(sender)?; + let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap; + (*end_of_page_msg).map_size = new_map_shmem.shmem.map_size; + (*end_of_page_msg).shm_str = new_map_shmem.shmem.shm_str; - match llmp_alloc_next_if_space(sender, buf_len) { - Some(msg) => Ok(msg), - None => Err(AflError::Unknown(format!( - "Error allocating {} bytes in shmap", - buf_len - ))), + // We never sent a msg on the new buf */ + self.last_msg_sent = 0 as *mut LlmpMsg; + + /* Send the last msg on the old buf */ + self.send(out)?; + + if !self.keep_pages_forever { + self.prune_old_pages(); + } + + self.out_maps.push(new_map_shmem); + + Ok(()) + } + + /// Allocates the next space on this sender page + pub unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> { + match self.alloc_next_if_space(buf_len) { + Some(msg) => return Ok(msg), + _ => (), + }; + + /* no more space left! We'll have to start a new page */ + self.handle_out_eop()?; + + match self.alloc_next_if_space(buf_len) { + Some(msg) => Ok(msg), + None => Err(AflError::Unknown(format!( + "Error allocating {} bytes in shmap", + buf_len + ))), + } + } + + /// Cancel send of the next message, this allows us to allocate a new message without sending this one. + pub unsafe fn cancel_send(&mut self, msg: *mut LlmpMsg) { + /* DBG("Client %d cancels send of msg at %p with tag 0x%X and size %ld", client->id, msg, msg->tag, + * msg->buf_len_padded); */ + let page = self.out_maps.last().unwrap().page(); + (*msg).tag = LLMP_TAG_UNSET; + (*page).size_used -= (*msg).buf_len_padded as usize + size_of::(); } } +/// Receiving end of an llmp channel +impl LlmpReceiver { + /// Read next message. + unsafe fn recv(&mut self) -> Result, AflError> { + /* DBG("recv %p %p\n", page, last_msg); */ + compiler_fence(Ordering::SeqCst); + let page = self.current_recv_map.page(); + let last_msg = self.last_msg_recvd; + let current_msg_id = ptr::read_volatile(&mut (*page).current_msg_id); + + // Read the message from the page + let ret = if current_msg_id == 0 { + /* No messages yet */ + None + } else if last_msg.is_null() { + /* We never read a message from this queue. Return first. */ + Some((*page).messages.as_mut_ptr()) + } else if (*last_msg).message_id == current_msg_id { + /* Oops! No new message! */ + None + } else { + Some(_llmp_next_msg_ptr(last_msg)) + }; + + // Let's see what we go here. + match ret { + Some(msg) => { + // Handle special, LLMP internal, messages. + match (*msg).tag { + LLMP_TAG_UNSET => panic!("BUG: Read unallocated msg"), + LLMP_TAG_END_OF_PAGE => { + dbg!("Got end of page, allocing next"); + // Handle end of page + if (*msg).buf_len < size_of::() as u64 { + panic!(format!( + "Illegal message length for EOP (is {}, expected {})", + (*msg).buf_len_padded, + size_of::() + )); + } + let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap; + + /* We can reuse the map mem space, no need to free and calloc. + However, the pageinfo points to the map we're about to unmap. + Clone the contents first to be safe (probably fine in rust eitner way). */ + let pageinfo_cpy = (*pageinfo).clone(); + + ptr::write_volatile(&mut (*page).save_to_unmap, 1); + self.current_recv_map = LlmpSharedMap::from_name_slice( + &pageinfo_cpy.shm_str, + pageinfo_cpy.map_size, + )?; + dbg!("Got a new recv map", self.current_recv_map.shmem.shm_str); + // After we mapped the new page, return the next message, if available + return self.recv(); + } + _ => (), + } + + // Store the last msg for next time + self.last_msg_recvd = msg; + } + _ => (), + }; + Ok(ret) + } + + /// Blocks/spins until the next message gets posted to the page, + /// then returns that message. + pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, AflError> { + let mut current_msg_id = 0; + let page = self.current_recv_map.page(); + let last_msg = self.last_msg_recvd; + if !last_msg.is_null() { + if (*last_msg).tag == LLMP_TAG_END_OF_PAGE && !llmp_msg_in_page(page, last_msg) { + panic!("BUG: full page passed to await_message_blocking or reset failed"); + } + current_msg_id = (*last_msg).message_id + } + loop { + compiler_fence(Ordering::SeqCst); + if ptr::read_volatile(&mut (*page).current_msg_id) != current_msg_id { + return match self.recv()? { + Some(msg) => Ok(msg), + None => panic!("BUG: blocking llmp message should never be NULL"), + }; + } + } + } +} + +/// The page struct, placed on a shared mem instance. +impl LlmpSharedMap { + /// Creates a new page with minimum prev_max_alloc_size or LLMP_INITIAL_MAP_SIZE + /// returning the initialized shared mem struct + pub unsafe fn new(sender: u32, min_size: usize) -> Result { + // Create a new shard page. + let mut shmem = AflShmem::new(new_map_size(min_size))?; + _llmp_page_init(&mut shmem, sender); + Ok(Self { shmem }) + } + + /// Initialize from a 0-terminated sharedmap id string and its size + pub unsafe fn from_str(shm_str: &CStr, map_size: usize) -> Result { + let shmem = AflShmem::from_str(shm_str, map_size)?; + // Not initializing the page here - the other side should have done it already! + Ok(Self { shmem }) + } + + /// Initialize from a shm_str with fixed len of 20 + pub unsafe fn from_name_slice(shm_str: &[u8; 20], map_size: usize) -> Result { + let shmem = AflShmem::from_name_slice(shm_str, map_size)?; + // Not initializing the page here - the other side should have done it already! + Ok(Self { shmem }) + } + + /// Get the unsafe ptr to this page, situated on the shared map + pub unsafe fn page(&self) -> *mut LlmpPage { + shmem2page(&self.shmem) + } +} + +/// The broker forwards all messages to its own bus-like broadcast map. +/// It may intercept messages passing through. impl LlmpBroker { /// Create and initialize a new llmp_broker pub unsafe fn new() -> Result { @@ -578,7 +604,7 @@ impl LlmpBroker { llmp_out: LlmpSender { id: 0, last_msg_sent: ptr::null_mut(), - out_maps: vec![LlmpPageWrapper::new(0, 0)?], + out_maps: vec![LlmpSharedMap::new(0, 0)?], // Broker never cleans up the pages so that new // clients may join at any time keep_pages_forever: true, @@ -590,13 +616,14 @@ impl LlmpBroker { Ok(broker) } + /// Allocate the next message on the outgoing map unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> { - llmp_alloc_next(&mut self.llmp_out, buf_len) + self.llmp_out.alloc_next(buf_len) } /// Registers a new client for the given sharedmap str and size. /// Returns the id of the new client in broker.client_map - pub unsafe fn register_client(&mut self, client_page: LlmpPageWrapper) { + pub unsafe fn register_client(&mut self, client_page: LlmpSharedMap) { let id = self.llmp_clients.len() as u32; self.llmp_clients.push(LlmpReceiver { id, @@ -622,7 +649,7 @@ impl LlmpBroker { msg.copy_to_nonoverlapping(out, size_of::() + (*msg).buf_len_padded as usize); (*out).buf_len_padded = actual_size; /* We need to replace the message ID with our own */ - match llmp_send(&mut self.llmp_out, out) { + match self.llmp_out.send(out) { Err(e) => panic!(format!("Error sending msg: {:?}", e)), _ => (), }; @@ -637,8 +664,8 @@ impl LlmpBroker { // TODO: We could memcpy a range of pending messages, instead of one by one. loop { let msg = { - let mut client = &mut self.llmp_clients[client_id as usize]; - match llmp_recv(&mut client)? { + let client = &mut self.llmp_clients[client_id as usize]; + match client.recv()? { None => { // We're done handling this client return Ok(()); @@ -657,10 +684,8 @@ impl LlmpBroker { } else { let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap; - match LlmpPageWrapper::from_name_slice( - &(*pageinfo).shm_str, - (*pageinfo).map_size, - ) { + match LlmpSharedMap::from_name_slice(&(*pageinfo).shm_str, (*pageinfo).map_size) + { Ok(new_page) => { let id = next_id; next_id += 1; @@ -701,7 +726,7 @@ impl LlmpBroker { /// Loops infinitely, forwarding and handling all incoming messages from clients. /// Never returns. Panics on error. - pub unsafe fn broker_loop(&mut self) -> ! { + pub unsafe fn loop_forever(&mut self) -> ! { loop { compiler_fence(Ordering::SeqCst); self.once() @@ -713,75 +738,47 @@ impl LlmpBroker { } } -/// For non zero-copy, we want to get rid of old pages with duplicate messages in the client -/// eventually. This function This funtion sees if we can unallocate older pages. -/// The broker would have informed us by setting the save_to_unmap-flag. -unsafe fn llmp_prune_old_pages(sender: &mut LlmpSender) { - // Exclude the current page by splitting of the last element for this iter - let mut unmap_until_excl = 0; - for map in sender.out_maps.split_last().unwrap().1 { - if (*map.page()).save_to_unmap == 0 { - // The broker didn't read this page yet, no more pages to unmap. - break; - } - unmap_until_excl += 1; - } - // Remove all maps that the broker already mapped - // simply removing them from the vec should then call drop and unmap them. - sender.out_maps.drain(0..unmap_until_excl); -} - +/// `n` clients connect to a broker. They share an outgoing map with the broker, +/// and get incoming messages from the shared broker bus impl LlmpClient { /// Creates a new LlmpClient - pub unsafe fn new(initial_broker_page: LlmpPageWrapper) -> Result { + pub unsafe fn new(initial_broker_map: LlmpSharedMap) -> Result { Ok(Self { llmp_out: LlmpSender { id: 0, last_msg_sent: 0 as *mut LlmpMsg, - out_maps: vec![LlmpPageWrapper::new(0, LLMP_INITIAL_MAP_SIZE)?], + out_maps: vec![LlmpSharedMap::new(0, LLMP_INITIAL_MAP_SIZE)?], // drop pages to the broker if it already read them keep_pages_forever: false, }, llmp_in: LlmpReceiver { id: 0, - current_recv_map: initial_broker_page, + current_recv_map: initial_broker_map, last_msg_recvd: 0 as *mut LlmpMsg, }, }) } -} -/// A client receives a broadcast message. -/// Returns null if no message is availiable -pub unsafe fn llmp_client_recv(client: &mut LlmpClient) -> Result, AflError> { - llmp_recv(&mut client.llmp_in) -} + /// Commits a msg to the client's out map + pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> { + self.llmp_out.send(msg) + } -/// A client blocks/spins until the next message gets posted to the page, -/// then returns that message. -pub unsafe fn llmp_client_recv_blocking(client: &mut LlmpClient) -> Result<*mut LlmpMsg, AflError> { - llmp_recv_blocking(&mut client.llmp_in) -} + /// A client receives a broadcast message. + /// Returns null if no message is availiable + pub unsafe fn recv(&mut self) -> Result, AflError> { + self.llmp_in.recv() + } -/// The current page could have changed in recv (EOP) -/// Alloc the next message, internally handling end of page by allocating a new one. -pub unsafe fn llmp_client_alloc_next( - client: &mut LlmpClient, - buf_len: usize, -) -> Result<*mut LlmpMsg, AflError> { - llmp_alloc_next(&mut client.llmp_out, buf_len) -} + /// A client blocks/spins until the next message gets posted to the page, + /// then returns that message. + pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, AflError> { + self.llmp_in.recv_blocking() + } -/// Cancel send of the next message, this allows us to allocate a new message without sending this one. -pub unsafe fn llmp_cancel_send(sender: &mut LlmpSender, msg: *mut LlmpMsg) { - /* DBG("Client %d cancels send of msg at %p with tag 0x%X and size %ld", client->id, msg, msg->tag, - * msg->buf_len_padded); */ - let page = sender.out_maps.last().unwrap().page(); - (*msg).tag = LLMP_TAG_UNSET; - (*page).size_used -= (*msg).buf_len_padded as usize + size_of::(); -} - -/// Commits a msg to the client's out map -pub unsafe fn llmp_client_send(client: &mut LlmpClient, msg: *mut LlmpMsg) -> Result<(), AflError> { - llmp_send(&mut client.llmp_out, msg) + /// The current page could have changed in recv (EOP) + /// Alloc the next message, internally handling end of page by allocating a new one. + pub unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> { + self.llmp_out.alloc_next(buf_len) + } }