more rusty

This commit is contained in:
Dominik Maier 2020-12-08 19:10:54 +01:00
parent 6f25fabe07
commit f9782b48d4

View File

@ -75,9 +75,15 @@ const LLMP_TAG_END_OF_PAGE: u32 = 0xaf1e0f1;
/// A new client for this broekr got added. /// A new client for this broekr got added.
const LLMP_TAG_NEW_SHM_CLIENT: u32 = 0xc11e471; const LLMP_TAG_NEW_SHM_CLIENT: u32 = 0xc11e471;
pub type AflRet = c_uint; /// What byte count to align messages to
pub const AFL_RET_ALLOC: AflRet = 3; /// LlmpMsg sizes (including header) will always be rounded up to be a multiple of this value
pub const AFL_RET_SUCCESS: AflRet = 0; const LLMP_ALIGNNMENT: usize = 64;
/// Size of a new page message, header, payload, and alignment
const EOP_MSG_SIZE: usize = llmp_align(size_of::<LlmpMsg>() + size_of::<LlmpPayloadSharedMap>());
/// Message Hook
pub type LlmpMsgHookFn = unsafe fn(client_id: u32, msg: *mut LlmpMsg) -> LlmpMsgHookResult;
/// Sending end on a (unidirectional) sharedmap channel /// Sending end on a (unidirectional) sharedmap channel
#[derive(Clone)] #[derive(Clone)]
@ -88,7 +94,7 @@ pub struct LlmpSender {
/// If null, a new page (just) started. /// If null, a new page (just) started.
pub last_msg_sent: *mut LlmpMsg, pub last_msg_sent: *mut LlmpMsg,
/// A vec of page wrappers, each containing an intialized AfShmem /// A vec of page wrappers, each containing an intialized AfShmem
pub out_maps: Vec<LlmpPageWrapper>, pub out_maps: Vec<LlmpSharedMap>,
/// If true, pages will never be pruned. /// If true, pages will never be pruned.
/// The broker uses this feature. /// The broker uses this feature.
/// By keeping the message history around, /// By keeping the message history around,
@ -103,7 +109,7 @@ pub struct LlmpReceiver {
/// Pointer to the last meg this received /// Pointer to the last meg this received
pub last_msg_recvd: *mut LlmpMsg, pub last_msg_recvd: *mut LlmpMsg,
/// current page. After EOP, this gets replaced with the new one /// current page. After EOP, this gets replaced with the new one
pub current_recv_map: LlmpPageWrapper, pub current_recv_map: LlmpSharedMap,
} }
/// Client side of LLMP /// Client side of LLMP
@ -115,43 +121,11 @@ pub struct LlmpClient {
/// A page wrapper /// A page wrapper
#[derive(Clone)] #[derive(Clone)]
pub struct LlmpPageWrapper { pub struct LlmpSharedMap {
/// Shmem containg the actual (unsafe) page, /// Shmem containg the actual (unsafe) page,
/// shared between one LlmpSender and one LlmpReceiver /// shared between one LlmpSender and one LlmpReceiver
shmem: AflShmem, shmem: AflShmem,
} }
/// The page struct, placed on a shared mem instance.
impl LlmpPageWrapper {
/// Creates a new page with minimum prev_max_alloc_size or LLMP_INITIAL_MAP_SIZE
/// returning the initialized shared mem struct
pub unsafe fn new(sender: u32, min_size: usize) -> Result<Self, AflError> {
// Create a new shard page.
let mut shmem = AflShmem::new(new_map_size(min_size))?;
_llmp_page_init(&mut shmem, sender);
Ok(Self { shmem })
}
/// Initialize from a 0-terminated sharedmap id string and its size
pub unsafe fn from_str(shm_str: &CStr, map_size: usize) -> Result<Self, AflError> {
let shmem = AflShmem::from_str(shm_str, map_size)?;
// Not initializing the page here - the other side should have done it already!
Ok(Self { shmem })
}
/// Initialize from a shm_str with fixed len of 20
pub unsafe fn from_name_slice(shm_str: &[u8; 20], map_size: usize) -> Result<Self, AflError> {
let shmem = AflShmem::from_name_slice(shm_str, map_size)?;
// Not initializing the page here - the other side should have done it already!
Ok(Self { shmem })
}
/// Get the unsafe ptr to this page, situated on the shared map
pub unsafe fn page(&self) -> *mut LlmpPage {
shmem2page(&self.shmem)
}
}
/// Message sent over the "wire" /// Message sent over the "wire"
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
#[repr(C, packed)] #[repr(C, packed)]
@ -205,9 +179,6 @@ pub enum LlmpMsgHookResult {
ForwardToClients, ForwardToClients,
} }
/// Message Hook
pub type LlmpMsgHookFn = unsafe fn(client_id: u32, msg: *mut LlmpMsg) -> LlmpMsgHookResult;
/// Message payload when a client got added LLMP_TAG_CLIENT_ADDED_V1 */ /// Message payload when a client got added LLMP_TAG_CLIENT_ADDED_V1 */
/// This is an internal message! /// This is an internal message!
/// LLMP_TAG_END_OF_PAGE_V1 /// LLMP_TAG_END_OF_PAGE_V1
@ -218,26 +189,20 @@ struct LlmpPayloadSharedMap {
pub shm_str: [u8; 20], pub shm_str: [u8; 20],
} }
/// Get sharedmem from a page
#[inline] #[inline]
unsafe fn shmem2page(afl_shmem: &AflShmem) -> *mut LlmpPage { unsafe fn shmem2page(afl_shmem: &AflShmem) -> *mut LlmpPage {
afl_shmem.map as *mut LlmpPage afl_shmem.map as *mut LlmpPage
} }
/* If a msg is contained in the current page */ /// Return, if a msg is contained in the current page
unsafe fn llmp_msg_in_page(page: *mut LlmpPage, msg: *mut LlmpMsg) -> bool { unsafe fn llmp_msg_in_page(page: *mut LlmpPage, msg: *mut LlmpMsg) -> bool {
/* DBG("llmp_msg_in_page %p within %p-%p\n", msg, page, page + page->size_total); */ /* DBG("llmp_msg_in_page %p within %p-%p\n", msg, page, page + page->size_total); */
return (page as *mut u8) < msg as *mut u8 return (page as *mut u8) < msg as *mut u8
&& (page as *mut u8).offset((*page).size_total as isize) > msg as *mut u8; && (page as *mut u8).offset((*page).size_total as isize) > msg as *mut u8;
} }
/// What byte count to align messages to /// allign to LLMP_ALIGNNMENT=64 bytes
/// LlmpMsg sizes (including header) will always be rounded up to be a multiple of this value
const LLMP_ALIGNNMENT: usize = 64;
/// Size of a new page message, header, payload, and alignment
const EOP_MSG_SIZE: usize = llmp_align(size_of::<LlmpMsg>() + size_of::<LlmpPayloadSharedMap>());
/* allign to LLMP_ALIGNNMENT=64 bytes */
#[inline] #[inline]
const fn llmp_align(to_align: usize) -> usize { const fn llmp_align(to_align: usize) -> usize {
// check if we need to align first // check if we need to align first
@ -264,8 +229,8 @@ fn new_map_size(max_alloc: usize) -> usize {
) as u64) as usize ) as u64) as usize
} }
/* Initialize a new llmp_page. size should be relative to /// Initialize a new llmp_page. size should be relative to
* llmp_page->messages */ /// llmp_page->messages
unsafe fn _llmp_page_init(shmem: &mut AflShmem, sender: u32) { unsafe fn _llmp_page_init(shmem: &mut AflShmem, sender: u32) {
let page = shmem2page(&shmem); let page = shmem2page(&shmem);
(*page).sender = sender; (*page).sender = sender;
@ -280,7 +245,7 @@ unsafe fn _llmp_page_init(shmem: &mut AflShmem, sender: u32) {
ptr::write_volatile(&mut (*page).sender_dead, 0); ptr::write_volatile(&mut (*page).sender_dead, 0);
} }
/* Pointer to the message behind the last message */ /// Pointer to the message behind the last message
#[inline] #[inline]
unsafe fn _llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg { unsafe fn _llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg {
/* DBG("_llmp_next_msg_ptr %p %lu + %lu\n", last_msg, last_msg->buf_len_padded, sizeof(llmp_message)); */ /* DBG("_llmp_next_msg_ptr %p %lu + %lu\n", last_msg, last_msg->buf_len_padded, sizeof(llmp_message)); */
@ -289,12 +254,233 @@ unsafe fn _llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg {
.offset((*last_msg).buf_len_padded as isize) as *mut LlmpMsg; .offset((*last_msg).buf_len_padded as isize) as *mut LlmpMsg;
} }
/* Read next message. */ /// An actor on the sendin part of the shared map
unsafe fn llmp_recv(receiver: &mut LlmpReceiver) -> Result<Option<*mut LlmpMsg>, AflError> { impl LlmpSender {
/* DBG("llmp_recv %p %p\n", page, last_msg); */ /// For non zero-copy, we want to get rid of old pages with duplicate messages in the client
/// eventually. This function This funtion sees if we can unallocate older pages.
/// The broker would have informed us by setting the save_to_unmap-flag.
unsafe fn prune_old_pages(&mut self) {
// Exclude the current page by splitting of the last element for this iter
let mut unmap_until_excl = 0;
for map in self.out_maps.split_last().unwrap().1 {
if (*map.page()).save_to_unmap == 0 {
// The broker didn't read this page yet, no more pages to unmap.
break;
}
unmap_until_excl += 1;
}
// Remove all maps that the broker already mapped
// simply removing them from the vec should then call drop and unmap them.
self.out_maps.drain(0..unmap_until_excl);
}
/// Intern: Special allocation function for EOP messages (and nothing else!)
/// The normal alloc will fail if there is not enough space for buf_len_padded + EOP
/// So if alloc_next fails, create new page if necessary, use this function,
/// place EOP, commit EOP, reset, alloc again on the new space.
unsafe fn alloc_eop(&mut self) -> *mut LlmpMsg {
let page = self.out_maps.last().unwrap().page();
let last_msg = self.last_msg_sent;
if (*page).size_used + EOP_MSG_SIZE > (*page).size_total {
panic!(format!("PROGRAM ABORT : BUG: EOP does not fit in page! page {:?}, size_current {:?}, size_total {:?}", page,
(*page).size_used, (*page).size_total));
}
let mut ret: *mut LlmpMsg = if !last_msg.is_null() {
_llmp_next_msg_ptr(last_msg)
} else {
(*page).messages.as_mut_ptr()
};
if (*ret).tag == LLMP_TAG_UNINITIALIZED {
panic!("Did not call send() on last message!");
}
(*ret).buf_len_padded = size_of::<LlmpPayloadSharedMap>() as c_ulong;
(*ret).message_id = if !last_msg.is_null() {
(*last_msg).message_id + 1
} else {
1
};
(*ret).tag = LLMP_TAG_END_OF_PAGE;
(*page).size_used += EOP_MSG_SIZE;
ret
}
/// Intern: Will return a ptr to the next msg buf, or None if map is full.
/// Never call alloc_next without either sending or cancelling the last allocated message for this page!
/// There can only ever be up to one message allocated per page at each given time.
unsafe fn alloc_next_if_space(&mut self, buf_len: usize) -> Option<*mut LlmpMsg> {
let mut buf_len_padded = buf_len;
let mut complete_msg_size = llmp_align(size_of::<LlmpMsg>() + buf_len_padded);
let page = self.out_maps.last().unwrap().page();
let last_msg = self.last_msg_sent;
/* DBG("XXX complete_msg_size %lu (h: %lu)\n", complete_msg_size, sizeof(llmp_message)); */
/* In case we don't have enough space, make sure the next page will be large
* enough */
// For future allocs, keep track of the maximum (aligned) alloc size we used
(*page).max_alloc_size = max((*page).max_alloc_size, complete_msg_size);
let mut ret: *mut LlmpMsg;
/* DBG("last_msg %p %d (%d)\n", last_msg, last_msg ? (int)last_msg->tag : -1, (int)LLMP_TAG_END_OF_PAGE_V1); */
if last_msg.is_null() || (*last_msg).tag == LLMP_TAG_END_OF_PAGE {
/* We start fresh, on a new page */
ret = (*page).messages.as_mut_ptr();
/* The initial message may not be alligned, so we at least align the end of
it. Technically, c_ulong can be smaller than a pointer, then who knows what
happens */
let base_addr = ret as usize;
buf_len_padded =
llmp_align(base_addr + complete_msg_size) - base_addr - size_of::<LlmpMsg>();
complete_msg_size = buf_len_padded + size_of::<LlmpMsg>();
/* DBG("XXX complete_msg_size NEW %lu\n", complete_msg_size); */
/* Still space for the new message plus the additional "we're full" message?
*/
if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total {
/* We're full. */
return None;
}
/* We need to start with 1 for ids, as current message id is initialized
* with 0... */
(*ret).message_id = if !last_msg.is_null() {
(*last_msg).message_id + 1
} else {
1
}
} else if (*page).current_msg_id != (*last_msg).message_id {
/* Oops, wrong usage! */
panic!(format!("BUG: The current message never got commited using send! (page->current_msg_id {:?}, last_msg->message_id: {})", (*page).current_msg_id, (*last_msg).message_id));
} else {
buf_len_padded = complete_msg_size - size_of::<LlmpMsg>();
/* DBG("XXX ret %p id %u buf_len_padded %lu complete_msg_size %lu\n", ret, ret->message_id, buf_len_padded,
* complete_msg_size); */
/* Still space for the new message plus the additional "we're full" message? */
if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total {
/* We're full. */
return None;
}
ret = _llmp_next_msg_ptr(last_msg);
(*ret).message_id = (*last_msg).message_id + 1
}
/* The beginning of our message should be messages + size_used, else nobody
* sent the last msg! */
/* DBG("XXX ret %p - page->messages %p = %lu != %lu, will add %lu -> %p\n", ret, page->messages,
(c_ulong)((u8 *)ret - (u8 *)page->messages), page->size_used, complete_msg_size, ((u8 *)ret) + complete_msg_size);
*/
if last_msg.is_null() && (*page).size_used != 0
|| ((ret as usize) - (*page).messages.as_mut_ptr() as usize) != (*page).size_used
{
panic!(format!("Allocated new message without calling send() inbetween. ret: {:?}, page: {:?}, complete_msg_size: {:?}, size_used: {:?}, last_msg: {:?}", ret, page,
buf_len_padded, (*page).size_used, last_msg));
}
(*page).size_used = (*page).size_used + complete_msg_size;
(*ret).buf_len_padded = buf_len_padded as c_ulong;
(*ret).buf_len = buf_len as c_ulong;
/* DBG("Returning new message at %p with len %ld, TAG was %x", ret, ret->buf_len_padded, ret->tag); */
/* Maybe catch some bugs... */
(*_llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET;
(*ret).tag = LLMP_TAG_UNINITIALIZED;
Some(ret)
}
/// Commit the message last allocated by alloc_next to the queue.
/// After commiting, the msg shall no longer be altered!
/// It will be read by the consuming threads (broker->clients or client->broker)
unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> {
if self.last_msg_sent == msg {
panic!("Message sent twice!");
}
if (*msg).tag == LLMP_TAG_UNSET as c_uint {
panic!(format!(
"No tag set on message with id {}",
(*msg).message_id
));
}
let page = self.out_maps.last().unwrap().page();
if msg.is_null() || !llmp_msg_in_page(page, msg) {
return Err(AflError::Unknown(format!(
"Llmp Message {:?} is null or not in current page",
msg
)));
}
(*msg).message_id = (*page).current_msg_id + 1;
compiler_fence(Ordering::SeqCst); compiler_fence(Ordering::SeqCst);
let page = receiver.current_recv_map.page(); ptr::write_volatile(&mut (*page).current_msg_id, (*msg).message_id);
let last_msg = receiver.last_msg_recvd; compiler_fence(Ordering::SeqCst);
self.last_msg_sent = msg;
Ok(())
}
/// listener about it using a EOP message.
unsafe fn handle_out_eop(&mut self) -> Result<(), AflError> {
let old_map = self.out_maps.last_mut().unwrap().page();
// Create a new shard page.
let new_map_shmem = LlmpSharedMap::new((*old_map).sender, (*old_map).max_alloc_size)?;
let mut new_map = new_map_shmem.page();
ptr::write_volatile(&mut (*new_map).current_msg_id, (*old_map).current_msg_id);
(*new_map).max_alloc_size = (*old_map).max_alloc_size;
/* On the old map, place a last message linking to the new map for the clients
* to consume */
let mut out: *mut LlmpMsg = self.alloc_eop();
(*out).sender = (*old_map).sender;
let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap;
(*end_of_page_msg).map_size = new_map_shmem.shmem.map_size;
(*end_of_page_msg).shm_str = new_map_shmem.shmem.shm_str;
// We never sent a msg on the new buf */
self.last_msg_sent = 0 as *mut LlmpMsg;
/* Send the last msg on the old buf */
self.send(out)?;
if !self.keep_pages_forever {
self.prune_old_pages();
}
self.out_maps.push(new_map_shmem);
Ok(())
}
/// Allocates the next space on this sender page
pub unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> {
match self.alloc_next_if_space(buf_len) {
Some(msg) => return Ok(msg),
_ => (),
};
/* no more space left! We'll have to start a new page */
self.handle_out_eop()?;
match self.alloc_next_if_space(buf_len) {
Some(msg) => Ok(msg),
None => Err(AflError::Unknown(format!(
"Error allocating {} bytes in shmap",
buf_len
))),
}
}
/// Cancel send of the next message, this allows us to allocate a new message without sending this one.
pub unsafe fn cancel_send(&mut self, msg: *mut LlmpMsg) {
/* DBG("Client %d cancels send of msg at %p with tag 0x%X and size %ld", client->id, msg, msg->tag,
* msg->buf_len_padded); */
let page = self.out_maps.last().unwrap().page();
(*msg).tag = LLMP_TAG_UNSET;
(*page).size_used -= (*msg).buf_len_padded as usize + size_of::<LlmpMsg>();
}
}
/// Receiving end of an llmp channel
impl LlmpReceiver {
/// Read next message.
unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, AflError> {
/* DBG("recv %p %p\n", page, last_msg); */
compiler_fence(Ordering::SeqCst);
let page = self.current_recv_map.page();
let last_msg = self.last_msg_recvd;
let current_msg_id = ptr::read_volatile(&mut (*page).current_msg_id); let current_msg_id = ptr::read_volatile(&mut (*page).current_msg_id);
// Read the message from the page // Read the message from the page
@ -335,34 +521,31 @@ unsafe fn llmp_recv(receiver: &mut LlmpReceiver) -> Result<Option<*mut LlmpMsg>,
let pageinfo_cpy = (*pageinfo).clone(); let pageinfo_cpy = (*pageinfo).clone();
ptr::write_volatile(&mut (*page).save_to_unmap, 1); ptr::write_volatile(&mut (*page).save_to_unmap, 1);
receiver.current_recv_map = LlmpPageWrapper::from_name_slice( self.current_recv_map = LlmpSharedMap::from_name_slice(
&pageinfo_cpy.shm_str, &pageinfo_cpy.shm_str,
pageinfo_cpy.map_size, pageinfo_cpy.map_size,
)?; )?;
dbg!( dbg!("Got a new recv map", self.current_recv_map.shmem.shm_str);
"Got a new recv map",
receiver.current_recv_map.shmem.shm_str
);
// After we mapped the new page, return the next message, if available // After we mapped the new page, return the next message, if available
return llmp_recv(receiver); return self.recv();
} }
_ => (), _ => (),
} }
// Store the last msg for next time // Store the last msg for next time
receiver.last_msg_recvd = msg; self.last_msg_recvd = msg;
} }
_ => (), _ => (),
}; };
Ok(ret) Ok(ret)
} }
/* Blocks/spins until the next message gets posted to the page, /// Blocks/spins until the next message gets posted to the page,
then returns that message. */ /// then returns that message.
pub unsafe fn llmp_recv_blocking(receiver: &mut LlmpReceiver) -> Result<*mut LlmpMsg, AflError> { pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, AflError> {
let mut current_msg_id = 0; let mut current_msg_id = 0;
let page = receiver.current_recv_map.page(); let page = self.current_recv_map.page();
let last_msg = receiver.last_msg_recvd; let last_msg = self.last_msg_recvd;
if !last_msg.is_null() { if !last_msg.is_null() {
if (*last_msg).tag == LLMP_TAG_END_OF_PAGE && !llmp_msg_in_page(page, last_msg) { if (*last_msg).tag == LLMP_TAG_END_OF_PAGE && !llmp_msg_in_page(page, last_msg) {
panic!("BUG: full page passed to await_message_blocking or reset failed"); panic!("BUG: full page passed to await_message_blocking or reset failed");
@ -372,205 +555,48 @@ pub unsafe fn llmp_recv_blocking(receiver: &mut LlmpReceiver) -> Result<*mut Llm
loop { loop {
compiler_fence(Ordering::SeqCst); compiler_fence(Ordering::SeqCst);
if ptr::read_volatile(&mut (*page).current_msg_id) != current_msg_id { if ptr::read_volatile(&mut (*page).current_msg_id) != current_msg_id {
return match llmp_recv(receiver)? { return match self.recv()? {
Some(msg) => Ok(msg), Some(msg) => Ok(msg),
None => panic!("BUG: blocking llmp message should never be NULL"), None => panic!("BUG: blocking llmp message should never be NULL"),
}; };
} }
} }
} }
/* Special allocation function for EOP messages (and nothing else!)
The normal alloc will fail if there is not enough space for buf_len_padded + EOP
So if llmp_alloc_next fails, create new page if necessary, use this function,
place EOP, commit EOP, reset, alloc again on the new space.
*/
unsafe fn llmp_alloc_eop(page: *mut LlmpPage, last_msg: *const LlmpMsg) -> *mut LlmpMsg {
if (*page).size_used + EOP_MSG_SIZE > (*page).size_total {
panic!(format!("PROGRAM ABORT : BUG: EOP does not fit in page! page {:?}, size_current {:?}, size_total {:?}", page,
(*page).size_used, (*page).size_total));
}
let mut ret: *mut LlmpMsg = if !last_msg.is_null() {
_llmp_next_msg_ptr(last_msg)
} else {
(*page).messages.as_mut_ptr()
};
if (*ret).tag == LLMP_TAG_UNINITIALIZED {
panic!("Did not call send() on last message!");
}
(*ret).buf_len_padded = size_of::<LlmpPayloadSharedMap>() as c_ulong;
(*ret).message_id = if !last_msg.is_null() {
(*last_msg).message_id + 1
} else {
1
};
(*ret).tag = LLMP_TAG_END_OF_PAGE;
(*page).size_used += EOP_MSG_SIZE;
ret
} }
/// Will return a ptr to the next msg buf, or None if map is full. /// The page struct, placed on a shared mem instance.
/// Never call alloc_next without either sending or cancelling the last allocated message for this page! impl LlmpSharedMap {
/// There can only ever be up to one message allocated per page at each given time. /// Creates a new page with minimum prev_max_alloc_size or LLMP_INITIAL_MAP_SIZE
unsafe fn llmp_alloc_next_if_space(llmp: &mut LlmpSender, buf_len: usize) -> Option<*mut LlmpMsg> { /// returning the initialized shared mem struct
let mut buf_len_padded = buf_len; pub unsafe fn new(sender: u32, min_size: usize) -> Result<Self, AflError> {
let mut complete_msg_size = llmp_align(size_of::<LlmpMsg>() + buf_len_padded);
let page = llmp.out_maps.last().unwrap().page();
let last_msg = llmp.last_msg_sent;
/* DBG("XXX complete_msg_size %lu (h: %lu)\n", complete_msg_size, sizeof(llmp_message)); */
/* In case we don't have enough space, make sure the next page will be large
* enough */
// For future allocs, keep track of the maximum (aligned) alloc size we used
(*page).max_alloc_size = max((*page).max_alloc_size, complete_msg_size);
let mut ret: *mut LlmpMsg;
/* DBG("last_msg %p %d (%d)\n", last_msg, last_msg ? (int)last_msg->tag : -1, (int)LLMP_TAG_END_OF_PAGE_V1); */
if last_msg.is_null() || (*last_msg).tag == LLMP_TAG_END_OF_PAGE {
/* We start fresh, on a new page */
ret = (*page).messages.as_mut_ptr();
/* The initial message may not be alligned, so we at least align the end of
it. Technically, c_ulong can be smaller than a pointer, then who knows what
happens */
let base_addr = ret as usize;
buf_len_padded =
llmp_align(base_addr + complete_msg_size) - base_addr - size_of::<LlmpMsg>();
complete_msg_size = buf_len_padded + size_of::<LlmpMsg>();
/* DBG("XXX complete_msg_size NEW %lu\n", complete_msg_size); */
/* Still space for the new message plus the additional "we're full" message?
*/
if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total {
/* We're full. */
return None;
}
/* We need to start with 1 for ids, as current message id is initialized
* with 0... */
(*ret).message_id = if !last_msg.is_null() {
(*last_msg).message_id + 1
} else {
1
}
} else if (*page).current_msg_id != (*last_msg).message_id {
/* Oops, wrong usage! */
panic!(format!("BUG: The current message never got commited using llmp_send! (page->current_msg_id {:?}, last_msg->message_id: {})", (*page).current_msg_id, (*last_msg).message_id));
} else {
buf_len_padded = complete_msg_size - size_of::<LlmpMsg>();
/* DBG("XXX ret %p id %u buf_len_padded %lu complete_msg_size %lu\n", ret, ret->message_id, buf_len_padded,
* complete_msg_size); */
/* Still space for the new message plus the additional "we're full" message? */
if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total {
/* We're full. */
return None;
}
ret = _llmp_next_msg_ptr(last_msg);
(*ret).message_id = (*last_msg).message_id + 1
}
/* The beginning of our message should be messages + size_used, else nobody
* sent the last msg! */
/* DBG("XXX ret %p - page->messages %p = %lu != %lu, will add %lu -> %p\n", ret, page->messages,
(c_ulong)((u8 *)ret - (u8 *)page->messages), page->size_used, complete_msg_size, ((u8 *)ret) + complete_msg_size);
*/
if last_msg.is_null() && (*page).size_used != 0
|| ((ret as usize) - (*page).messages.as_mut_ptr() as usize) != (*page).size_used
{
panic!(format!("Allocated new message without calling send() inbetween. ret: {:?}, page: {:?}, complete_msg_size: {:?}, size_used: {:?}, last_msg: {:?}", ret, page,
buf_len_padded, (*page).size_used, last_msg));
}
(*page).size_used = (*page).size_used + complete_msg_size;
(*ret).buf_len_padded = buf_len_padded as c_ulong;
(*ret).buf_len = buf_len as c_ulong;
/* DBG("Returning new message at %p with len %ld, TAG was %x", ret, ret->buf_len_padded, ret->tag); */
/* Maybe catch some bugs... */
(*_llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET;
(*ret).tag = LLMP_TAG_UNINITIALIZED;
Some(ret)
}
/// Commit the message last allocated by llmp_alloc_next to the queue.
/// After commiting, the msg shall no longer be altered!
/// It will be read by the consuming threads (broker->clients or client->broker)
unsafe fn llmp_send(sender: &mut LlmpSender, msg: *mut LlmpMsg) -> Result<(), AflError> {
if sender.last_msg_sent == msg {
panic!("Message sent twice!");
}
if (*msg).tag == LLMP_TAG_UNSET as c_uint {
panic!(format!(
"No tag set on message with id {}",
(*msg).message_id
));
}
let page = sender.out_maps.last().unwrap().page();
if msg.is_null() || !llmp_msg_in_page(page, msg) {
return Err(AflError::Unknown(format!(
"Llmp Message {:?} is null or not in current page",
msg
)));
}
(*msg).message_id = (*page).current_msg_id + 1;
compiler_fence(Ordering::SeqCst);
ptr::write_volatile(&mut (*page).current_msg_id, (*msg).message_id);
compiler_fence(Ordering::SeqCst);
sender.last_msg_sent = msg;
Ok(())
}
/// listener about it using a EOP message.
unsafe fn llmp_handle_out_eop(sender: &mut LlmpSender) -> Result<(), AflError> {
let old_map = sender.out_maps.last_mut().unwrap().page();
// Create a new shard page. // Create a new shard page.
let new_map_shmem = LlmpPageWrapper::new((*old_map).sender, (*old_map).max_alloc_size)?; let mut shmem = AflShmem::new(new_map_size(min_size))?;
let mut new_map = new_map_shmem.page(); _llmp_page_init(&mut shmem, sender);
Ok(Self { shmem })
ptr::write_volatile(&mut (*new_map).current_msg_id, (*old_map).current_msg_id);
(*new_map).max_alloc_size = (*old_map).max_alloc_size;
/* On the old map, place a last message linking to the new map for the clients
* to consume */
let mut out: *mut LlmpMsg = llmp_alloc_eop(old_map, sender.last_msg_sent);
(*out).sender = (*old_map).sender;
let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap;
(*end_of_page_msg).map_size = new_map_shmem.shmem.map_size;
(*end_of_page_msg).shm_str = new_map_shmem.shmem.shm_str;
// We never sent a msg on the new buf */
sender.last_msg_sent = 0 as *mut LlmpMsg;
/* Send the last msg on the old buf */
llmp_send(sender, out)?;
if !sender.keep_pages_forever {
llmp_prune_old_pages(sender);
} }
sender.out_maps.push(new_map_shmem); /// Initialize from a 0-terminated sharedmap id string and its size
pub unsafe fn from_str(shm_str: &CStr, map_size: usize) -> Result<Self, AflError> {
Ok(()) let shmem = AflShmem::from_str(shm_str, map_size)?;
// Not initializing the page here - the other side should have done it already!
Ok(Self { shmem })
} }
/// Allocates the next space on this sender page /// Initialize from a shm_str with fixed len of 20
pub unsafe fn llmp_alloc_next( pub unsafe fn from_name_slice(shm_str: &[u8; 20], map_size: usize) -> Result<Self, AflError> {
sender: &mut LlmpSender, let shmem = AflShmem::from_name_slice(shm_str, map_size)?;
buf_len: usize, // Not initializing the page here - the other side should have done it already!
) -> Result<*mut LlmpMsg, AflError> { Ok(Self { shmem })
match llmp_alloc_next_if_space(sender, buf_len) { }
Some(msg) => return Ok(msg),
_ => (),
};
/* no more space left! We'll have to start a new page */ /// Get the unsafe ptr to this page, situated on the shared map
llmp_handle_out_eop(sender)?; pub unsafe fn page(&self) -> *mut LlmpPage {
shmem2page(&self.shmem)
match llmp_alloc_next_if_space(sender, buf_len) {
Some(msg) => Ok(msg),
None => Err(AflError::Unknown(format!(
"Error allocating {} bytes in shmap",
buf_len
))),
} }
} }
/// The broker forwards all messages to its own bus-like broadcast map.
/// It may intercept messages passing through.
impl LlmpBroker { impl LlmpBroker {
/// Create and initialize a new llmp_broker /// Create and initialize a new llmp_broker
pub unsafe fn new() -> Result<Self, AflError> { pub unsafe fn new() -> Result<Self, AflError> {
@ -578,7 +604,7 @@ impl LlmpBroker {
llmp_out: LlmpSender { llmp_out: LlmpSender {
id: 0, id: 0,
last_msg_sent: ptr::null_mut(), last_msg_sent: ptr::null_mut(),
out_maps: vec![LlmpPageWrapper::new(0, 0)?], out_maps: vec![LlmpSharedMap::new(0, 0)?],
// Broker never cleans up the pages so that new // Broker never cleans up the pages so that new
// clients may join at any time // clients may join at any time
keep_pages_forever: true, keep_pages_forever: true,
@ -590,13 +616,14 @@ impl LlmpBroker {
Ok(broker) Ok(broker)
} }
/// Allocate the next message on the outgoing map
unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> { unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> {
llmp_alloc_next(&mut self.llmp_out, buf_len) self.llmp_out.alloc_next(buf_len)
} }
/// Registers a new client for the given sharedmap str and size. /// Registers a new client for the given sharedmap str and size.
/// Returns the id of the new client in broker.client_map /// Returns the id of the new client in broker.client_map
pub unsafe fn register_client(&mut self, client_page: LlmpPageWrapper) { pub unsafe fn register_client(&mut self, client_page: LlmpSharedMap) {
let id = self.llmp_clients.len() as u32; let id = self.llmp_clients.len() as u32;
self.llmp_clients.push(LlmpReceiver { self.llmp_clients.push(LlmpReceiver {
id, id,
@ -622,7 +649,7 @@ impl LlmpBroker {
msg.copy_to_nonoverlapping(out, size_of::<LlmpMsg>() + (*msg).buf_len_padded as usize); msg.copy_to_nonoverlapping(out, size_of::<LlmpMsg>() + (*msg).buf_len_padded as usize);
(*out).buf_len_padded = actual_size; (*out).buf_len_padded = actual_size;
/* We need to replace the message ID with our own */ /* We need to replace the message ID with our own */
match llmp_send(&mut self.llmp_out, out) { match self.llmp_out.send(out) {
Err(e) => panic!(format!("Error sending msg: {:?}", e)), Err(e) => panic!(format!("Error sending msg: {:?}", e)),
_ => (), _ => (),
}; };
@ -637,8 +664,8 @@ impl LlmpBroker {
// TODO: We could memcpy a range of pending messages, instead of one by one. // TODO: We could memcpy a range of pending messages, instead of one by one.
loop { loop {
let msg = { let msg = {
let mut client = &mut self.llmp_clients[client_id as usize]; let client = &mut self.llmp_clients[client_id as usize];
match llmp_recv(&mut client)? { match client.recv()? {
None => { None => {
// We're done handling this client // We're done handling this client
return Ok(()); return Ok(());
@ -657,10 +684,8 @@ impl LlmpBroker {
} else { } else {
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap; let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap;
match LlmpPageWrapper::from_name_slice( match LlmpSharedMap::from_name_slice(&(*pageinfo).shm_str, (*pageinfo).map_size)
&(*pageinfo).shm_str, {
(*pageinfo).map_size,
) {
Ok(new_page) => { Ok(new_page) => {
let id = next_id; let id = next_id;
next_id += 1; next_id += 1;
@ -701,7 +726,7 @@ impl LlmpBroker {
/// Loops infinitely, forwarding and handling all incoming messages from clients. /// Loops infinitely, forwarding and handling all incoming messages from clients.
/// Never returns. Panics on error. /// Never returns. Panics on error.
pub unsafe fn broker_loop(&mut self) -> ! { pub unsafe fn loop_forever(&mut self) -> ! {
loop { loop {
compiler_fence(Ordering::SeqCst); compiler_fence(Ordering::SeqCst);
self.once() self.once()
@ -713,75 +738,47 @@ impl LlmpBroker {
} }
} }
/// For non zero-copy, we want to get rid of old pages with duplicate messages in the client /// `n` clients connect to a broker. They share an outgoing map with the broker,
/// eventually. This function This funtion sees if we can unallocate older pages. /// and get incoming messages from the shared broker bus
/// The broker would have informed us by setting the save_to_unmap-flag.
unsafe fn llmp_prune_old_pages(sender: &mut LlmpSender) {
// Exclude the current page by splitting of the last element for this iter
let mut unmap_until_excl = 0;
for map in sender.out_maps.split_last().unwrap().1 {
if (*map.page()).save_to_unmap == 0 {
// The broker didn't read this page yet, no more pages to unmap.
break;
}
unmap_until_excl += 1;
}
// Remove all maps that the broker already mapped
// simply removing them from the vec should then call drop and unmap them.
sender.out_maps.drain(0..unmap_until_excl);
}
impl LlmpClient { impl LlmpClient {
/// Creates a new LlmpClient /// Creates a new LlmpClient
pub unsafe fn new(initial_broker_page: LlmpPageWrapper) -> Result<Self, AflError> { pub unsafe fn new(initial_broker_map: LlmpSharedMap) -> Result<Self, AflError> {
Ok(Self { Ok(Self {
llmp_out: LlmpSender { llmp_out: LlmpSender {
id: 0, id: 0,
last_msg_sent: 0 as *mut LlmpMsg, last_msg_sent: 0 as *mut LlmpMsg,
out_maps: vec![LlmpPageWrapper::new(0, LLMP_INITIAL_MAP_SIZE)?], out_maps: vec![LlmpSharedMap::new(0, LLMP_INITIAL_MAP_SIZE)?],
// drop pages to the broker if it already read them // drop pages to the broker if it already read them
keep_pages_forever: false, keep_pages_forever: false,
}, },
llmp_in: LlmpReceiver { llmp_in: LlmpReceiver {
id: 0, id: 0,
current_recv_map: initial_broker_page, current_recv_map: initial_broker_map,
last_msg_recvd: 0 as *mut LlmpMsg, last_msg_recvd: 0 as *mut LlmpMsg,
}, },
}) })
} }
/// Commits a msg to the client's out map
pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> {
self.llmp_out.send(msg)
} }
/// A client receives a broadcast message. /// A client receives a broadcast message.
/// Returns null if no message is availiable /// Returns null if no message is availiable
pub unsafe fn llmp_client_recv(client: &mut LlmpClient) -> Result<Option<*mut LlmpMsg>, AflError> { pub unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, AflError> {
llmp_recv(&mut client.llmp_in) self.llmp_in.recv()
} }
/// A client blocks/spins until the next message gets posted to the page, /// A client blocks/spins until the next message gets posted to the page,
/// then returns that message. /// then returns that message.
pub unsafe fn llmp_client_recv_blocking(client: &mut LlmpClient) -> Result<*mut LlmpMsg, AflError> { pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, AflError> {
llmp_recv_blocking(&mut client.llmp_in) self.llmp_in.recv_blocking()
} }
/// The current page could have changed in recv (EOP) /// The current page could have changed in recv (EOP)
/// Alloc the next message, internally handling end of page by allocating a new one. /// Alloc the next message, internally handling end of page by allocating a new one.
pub unsafe fn llmp_client_alloc_next( pub unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> {
client: &mut LlmpClient, self.llmp_out.alloc_next(buf_len)
buf_len: usize,
) -> Result<*mut LlmpMsg, AflError> {
llmp_alloc_next(&mut client.llmp_out, buf_len)
} }
/// Cancel send of the next message, this allows us to allocate a new message without sending this one.
pub unsafe fn llmp_cancel_send(sender: &mut LlmpSender, msg: *mut LlmpMsg) {
/* DBG("Client %d cancels send of msg at %p with tag 0x%X and size %ld", client->id, msg, msg->tag,
* msg->buf_len_padded); */
let page = sender.out_maps.last().unwrap().page();
(*msg).tag = LLMP_TAG_UNSET;
(*page).size_used -= (*msg).buf_len_padded as usize + size_of::<LlmpMsg>();
}
/// Commits a msg to the client's out map
pub unsafe fn llmp_client_send(client: &mut LlmpClient, msg: *mut LlmpMsg) -> Result<(), AflError> {
llmp_send(&mut client.llmp_out, msg)
} }