LLMP Changes (#130)

* llmp_changes

* fixed send

* no_std fixes
This commit is contained in:
Dominik Maier 2021-06-07 02:15:31 +02:00 committed by GitHub
parent 392ffd33f7
commit 35e655ca04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 97 additions and 84 deletions

View File

@ -62,7 +62,7 @@ For broker2broker communication, all messages are forwarded via network sockets.
use alloc::{string::String, vec::Vec}; use alloc::{string::String, vec::Vec};
use core::{ use core::{
cmp::max, cmp::max,
convert::TryFrom, convert::{TryFrom, TryInto},
fmt::Debug, fmt::Debug,
mem::size_of, mem::size_of,
ptr, slice, ptr, slice,
@ -73,7 +73,6 @@ use serde::{Deserialize, Serialize};
#[cfg(feature = "std")] #[cfg(feature = "std")]
use std::{ use std::{
convert::TryInto,
env, env,
io::{ErrorKind, Read, Write}, io::{ErrorKind, Read, Write},
net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}, net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs},
@ -709,6 +708,9 @@ where
/// By keeping the message history around, /// By keeping the message history around,
/// new clients may join at any time in the future. /// new clients may join at any time in the future.
pub keep_pages_forever: bool, pub keep_pages_forever: bool,
/// True, if we allocated a message, but didn't call [`Self::send()`] yet
has_unsent_message: bool,
/// The sharedmem provider to get new shared maps if we're full
shmem_provider: SP, shmem_provider: SP,
} }
@ -730,6 +732,7 @@ where
)], )],
// drop pages to the broker if it already read them // drop pages to the broker if it already read them
keep_pages_forever, keep_pages_forever,
has_unsent_message: false,
shmem_provider, shmem_provider,
}) })
} }
@ -804,6 +807,7 @@ where
out_maps: vec![out_map], out_maps: vec![out_map],
// drop pages to the broker if it already read them // drop pages to the broker if it already read them
keep_pages_forever: false, keep_pages_forever: false,
has_unsent_message: false,
shmem_provider, shmem_provider,
}) })
} }
@ -864,103 +868,76 @@ where
/// Never call [`alloc_next`] without either sending or cancelling the last allocated message for this page! /// Never call [`alloc_next`] without either sending or cancelling the last allocated message for this page!
/// There can only ever be up to one message allocated per page at each given time. /// There can only ever be up to one message allocated per page at each given time.
unsafe fn alloc_next_if_space(&mut self, buf_len: usize) -> Option<*mut LlmpMsg> { unsafe fn alloc_next_if_space(&mut self, buf_len: usize) -> Option<*mut LlmpMsg> {
let buf_len_padded;
let mut complete_msg_size = llmp_align(size_of::<LlmpMsg>() + buf_len);
let map = self.out_maps.last_mut().unwrap(); let map = self.out_maps.last_mut().unwrap();
let page = map.page_mut(); let page = map.page_mut();
let last_msg = self.last_msg_sent; let last_msg = self.last_msg_sent;
if self.has_unsent_message {
panic!("Called alloc without callind send inbetween");
}
#[cfg(all(feature = "llmp_debug", feature = "std"))] #[cfg(all(feature = "llmp_debug", feature = "std"))]
println!( println!(
"Allocating {} (>={}) bytes on page {:?} / map {:?} (last msg: {:?})", "Allocating {} bytes on page {:?} / map {:?} (last msg: {:?})",
complete_msg_size, buf_len, page, &map, last_msg buf_len, page, &map, last_msg
); );
/* DBG("XXX complete_msg_size %lu (h: %lu)\n", complete_msg_size, sizeof(llmp_message)); */
/* In case we don't have enough space, make sure the next page will be large
* enough */
// For future allocs, keep track of the maximum (aligned) alloc size we used
(*page).max_alloc_size = max((*page).max_alloc_size, complete_msg_size);
let mut ret: *mut LlmpMsg; let msg_start = (*page).messages.as_mut_ptr() as usize + (*page).size_used;
/* DBG("last_msg %p %d (%d)\n", last_msg, last_msg ? (int)last_msg->tag : -1, (int)LLMP_TAG_END_OF_PAGE_V1); */
if last_msg.is_null() || (*last_msg).tag == LLMP_TAG_END_OF_PAGE { // Make sure the end of our msg is aligned.
/* We start fresh, on a new page */ let buf_len_padded = llmp_align(msg_start + buf_len + size_of::<LlmpMsg>())
ret = (*page).messages.as_mut_ptr(); - msg_start
/* The initial message may not be alligned, so we at least align the end of - size_of::<LlmpMsg>();
it. Technically, c_ulong can be smaller than a pointer, then who knows what
happens */
let base_addr = ret as usize;
buf_len_padded =
llmp_align(base_addr + complete_msg_size) - base_addr - size_of::<LlmpMsg>();
complete_msg_size = buf_len_padded + size_of::<LlmpMsg>();
/* Still space for the new message plus the additional "we're full" message?
*/
#[cfg(all(feature = "llmp_debug", feature = "std"))] #[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!( dbg!(
page, page,
*page, *page,
(*page).size_used, (*page).size_used,
complete_msg_size, buf_len_padded,
EOP_MSG_SIZE, EOP_MSG_SIZE,
(*page).size_total (*page).size_total
); );
if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total {
// We need enough space for the current page size_used + payload + padding
if (*page).size_used + size_of::<LlmpMsg>() + buf_len_padded + EOP_MSG_SIZE
> (*page).size_total
{
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("LLMP: Page full.");
/* We're full. */ /* We're full. */
return None; return None;
} }
let ret = msg_start as *mut LlmpMsg;
/* We need to start with 1 for ids, as current message id is initialized /* We need to start with 1 for ids, as current message id is initialized
* with 0... */ * with 0... */
(*ret).message_id = if last_msg.is_null() { (*ret).message_id = if last_msg.is_null() {
1 1
} else {
(*last_msg).message_id + 1
}
} else if (*page).current_msg_id == (*last_msg).message_id { } else if (*page).current_msg_id == (*last_msg).message_id {
buf_len_padded = complete_msg_size - size_of::<LlmpMsg>(); (*last_msg).message_id + 1
/* DBG("XXX ret %p id %u buf_len_padded %lu complete_msg_size %lu\n", ret, ret->message_id, buf_len_padded,
* complete_msg_size); */
/* Still space for the new message plus the additional "we're full" message? */
if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total {
/* We're full. */
return None;
}
ret = match llmp_next_msg_ptr_checked(map, last_msg, complete_msg_size) {
Ok(msg) => msg,
Err(e) => {
#[cfg(feature = "std")]
dbg!("Unexpected error allocing new msg", e);
#[cfg(feature = "std")]
return None;
#[cfg(not(feature = "std"))]
panic!("Unexpected error allocing new msg {:?}", e);
}
};
(*ret).message_id = (*last_msg).message_id + 1
} else { } else {
/* Oops, wrong usage! */ /* Oops, wrong usage! */
panic!("BUG: The current message never got committed using send! (page->current_msg_id {:?}, last_msg->message_id: {})", ptr::addr_of!((*page).current_msg_id), (*last_msg).message_id); panic!("BUG: The current message never got committed using send! (page->current_msg_id {:?}, last_msg->message_id: {})", ptr::addr_of!((*page).current_msg_id), (*last_msg).message_id);
} };
/* The beginning of our message should be messages + size_used, else nobody
* sent the last msg! */
/* DBG("XXX ret %p - page->messages %p = %lu != %lu, will add %lu -> %p\n", ret, page->messages,
(c_ulong)((u8 *)ret - (u8 *)page->messages), page->size_used, complete_msg_size, ((u8 *)ret) + complete_msg_size);
*/
if last_msg.is_null() && (*page).size_used != 0
|| ((ret as usize) - (*page).messages.as_mut_ptr() as usize) != (*page).size_used
{
panic!("Allocated new message without calling send() inbetween. ret: {:?}, page: {:?}, complete_msg_size: {:?}, size_used: {:?}, last_msg: {:?}", ret, page,
buf_len_padded, ptr::addr_of!((*page).size_used), last_msg);
}
(*page).size_used += complete_msg_size;
(*ret).buf_len_padded = buf_len_padded as u64;
(*ret).buf_len = buf_len as u64; (*ret).buf_len = buf_len as u64;
/* DBG("Returning new message at %p with len %ld, TAG was %x", ret, ret->buf_len_padded, ret->tag); */ (*ret).buf_len_padded = buf_len_padded as u64;
/* Maybe catch some bugs... */ (*page).size_used += size_of::<LlmpMsg>() + buf_len_padded;
(*_llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET; (*_llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET;
(*ret).tag = LLMP_TAG_UNINITIALIZED; (*ret).tag = LLMP_TAG_UNINITIALIZED;
// For future allocs, keep track of the maximum (aligned) alloc size we used
(*page).max_alloc_size = max(
(*page).max_alloc_size,
size_of::<LlmpMsg>() + buf_len_padded,
);
self.has_unsent_message = true;
Some(ret) Some(ret)
} }
@ -994,6 +971,7 @@ where
ptr::write_volatile(ptr::addr_of_mut!((*page).current_msg_id), (*msg).message_id); ptr::write_volatile(ptr::addr_of_mut!((*page).current_msg_id), (*msg).message_id);
compiler_fence(Ordering::SeqCst); compiler_fence(Ordering::SeqCst);
self.last_msg_sent = msg; self.last_msg_sent = msg;
self.has_unsent_message = false;
Ok(()) Ok(())
} }
@ -1101,6 +1079,38 @@ where
(*page).size_used -= (*msg).buf_len_padded as usize + size_of::<LlmpMsg>(); (*page).size_used -= (*msg).buf_len_padded as usize + size_of::<LlmpMsg>();
} }
/// Shrinks the allocated [`LlmpMsg`] to a given size.
pub unsafe fn shrink_alloced(
&mut self,
msg: *mut LlmpMsg,
shrinked_len: usize,
) -> Result<(), Error> {
let old_len_padded = (*msg).buf_len_padded;
let msg_start = msg as usize;
// Make sure the end of our msg is aligned.
let buf_len_padded = llmp_align(msg_start + shrinked_len + size_of::<LlmpMsg>())
- msg_start
- size_of::<LlmpMsg>();
if buf_len_padded > old_len_padded.try_into().unwrap() {
return Err(Error::IllegalArgument(format!("Cannot shrink msg of size {} (paded: {}) to requested larger size of {} (padded: {})!", (*msg).buf_len, old_len_padded, shrinked_len, buf_len_padded)));
}
(*msg).buf_len = shrinked_len as u64;
(*msg).buf_len_padded = buf_len_padded as u64;
let page = self.out_maps.last_mut().unwrap().page_mut();
// Doing this step by step will catch underflows in debug builds :)
(*page).size_used -= old_len_padded as usize;
(*page).size_used += buf_len_padded as usize;
(*_llmp_next_msg_ptr(msg)).tag = LLMP_TAG_UNSET;
Ok(())
}
/// Allocates a message of the given size, tags it, and sends it off. /// Allocates a message of the given size, tags it, and sends it off.
pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> { pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> {
// Make sure we don't reuse already allocated tags // Make sure we don't reuse already allocated tags
@ -1633,6 +1643,7 @@ where
// Broker never cleans up the pages so that new // Broker never cleans up the pages so that new
// clients may join at any time // clients may join at any time
keep_pages_forever: true, keep_pages_forever: true,
has_unsent_message: false,
shmem_provider: shmem_provider.clone(), shmem_provider: shmem_provider.clone(),
}, },
llmp_clients: vec![], llmp_clients: vec![],
@ -2099,6 +2110,7 @@ where
)], )],
// drop pages to the broker, if it already read them. // drop pages to the broker, if it already read them.
keep_pages_forever: false, keep_pages_forever: false,
has_unsent_message: false,
shmem_provider: shmem_provider_clone.clone(), shmem_provider: shmem_provider_clone.clone(),
}; };
@ -2366,6 +2378,7 @@ where
})], })],
// drop pages to the broker if it already read them // drop pages to the broker if it already read them
keep_pages_forever: false, keep_pages_forever: false,
has_unsent_message: false,
shmem_provider: shmem_provider.clone(), shmem_provider: shmem_provider.clone(),
}, },

View File

@ -10,7 +10,7 @@ use core::ptr::{addr_of, read_volatile};
#[cfg(feature = "std")] #[cfg(feature = "std")]
use crate::bolts::{ use crate::bolts::{
llmp::{LlmpClient, LlmpReceiver}, llmp::{LlmpClient, LlmpConnection, LlmpReceiver},
shmem::StdShMemProvider, shmem::StdShMemProvider,
}; };
@ -19,7 +19,7 @@ use std::net::{SocketAddr, ToSocketAddrs};
use crate::{ use crate::{
bolts::{ bolts::{
llmp::{self, Flags, LlmpClientDescription, LlmpConnection, LlmpSender, Tag}, llmp::{self, Flags, LlmpClientDescription, LlmpSender, Tag},
shmem::ShMemProvider, shmem::ShMemProvider,
}, },
events::{BrokerEventResult, Event, EventFirer, EventManager, EventProcessor, EventRestarter}, events::{BrokerEventResult, Event, EventFirer, EventManager, EventProcessor, EventRestarter},