Llmp Fixes (#51)

* fixed llmp
This commit is contained in:
Dominik Maier 2021-04-11 02:21:09 +02:00 committed by GitHub
parent fb0a23e767
commit 9bc22d0331
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 30 deletions

View File

@ -237,7 +237,7 @@ fn msg_offset_from_env(env_name: &str) -> Result<Option<u64>, Error> {
fn new_map_size(max_alloc: usize) -> usize { fn new_map_size(max_alloc: usize) -> usize {
max( max(
max_alloc * 2 + EOP_MSG_SIZE + LLMP_PAGE_HEADER_LEN, max_alloc * 2 + EOP_MSG_SIZE + LLMP_PAGE_HEADER_LEN,
LLMP_CFG_INITIAL_MAP_SIZE, LLMP_CFG_INITIAL_MAP_SIZE - 1,
) )
.next_power_of_two() .next_power_of_two()
} }
@ -546,7 +546,7 @@ where
last_msg_sent: ptr::null_mut(), last_msg_sent: ptr::null_mut(),
out_maps: vec![LlmpSharedMap::new( out_maps: vec![LlmpSharedMap::new(
0, 0,
SH::new_map(new_map_size(LLMP_CFG_INITIAL_MAP_SIZE))?, SH::new_map(LLMP_CFG_INITIAL_MAP_SIZE)?,
)], )],
// drop pages to the broker if it already read them // drop pages to the broker if it already read them
keep_pages_forever, keep_pages_forever,
@ -659,7 +659,10 @@ where
if (*ret).tag == LLMP_TAG_UNINITIALIZED { if (*ret).tag == LLMP_TAG_UNINITIALIZED {
panic!("Did not call send() on last message!"); panic!("Did not call send() on last message!");
} }
(*ret).buf_len_padded = size_of::<LlmpPayloadSharedMapInfo>() as u64; (*ret).buf_len = size_of::<LlmpPayloadSharedMapInfo>() as u64;
// We don't need to pad the EOP message: it'll always be the last in this page.
(*ret).buf_len_padded = (*ret).buf_len;
(*ret).message_id = if !last_msg.is_null() { (*ret).message_id = if !last_msg.is_null() {
(*last_msg).message_id + 1 (*last_msg).message_id + 1
} else { } else {
@ -677,11 +680,13 @@ where
let buf_len_padded; let buf_len_padded;
let mut complete_msg_size = llmp_align(size_of::<LlmpMsg>() + buf_len); let mut complete_msg_size = llmp_align(size_of::<LlmpMsg>() + buf_len);
let map = self.out_maps.last_mut().unwrap(); let map = self.out_maps.last_mut().unwrap();
println!("Allocating on map {:?} (size: {})", map, complete_msg_size);
let page = map.page_mut(); let page = map.page_mut();
println!("Allocating {} (>={}) bytes on page {:?}", complete_msg_size, buf_len, page);
let last_msg = self.last_msg_sent; let last_msg = self.last_msg_sent;
println!("last msg: {:?}", last_msg); #[cfg(all(feature = "llmp_debug", feature = "std"))]
println!(
"Allocating {} (>={}) bytes on page {:?} / map {:?} (last msg: {:?})",
complete_msg_size, buf_len, page, map, last_msg
);
/* DBG("XXX complete_msg_size %lu (h: %lu)\n", complete_msg_size, sizeof(llmp_message)); */ /* DBG("XXX complete_msg_size %lu (h: %lu)\n", complete_msg_size, sizeof(llmp_message)); */
/* In case we don't have enough space, make sure the next page will be large /* In case we don't have enough space, make sure the next page will be large
* enough */ * enough */
@ -700,11 +705,16 @@ where
buf_len_padded = buf_len_padded =
llmp_align(base_addr + complete_msg_size) - base_addr - size_of::<LlmpMsg>(); llmp_align(base_addr + complete_msg_size) - base_addr - size_of::<LlmpMsg>();
complete_msg_size = buf_len_padded + size_of::<LlmpMsg>(); complete_msg_size = buf_len_padded + size_of::<LlmpMsg>();
dbg!("LLMP_DEBUG: complete_msg_size NEW {}", complete_msg_size);
/* Still space for the new message plus the additional "we're full" message? /* Still space for the new message plus the additional "we're full" message?
*/ */
dbg!((*page).size_used, complete_msg_size, EOP_MSG_SIZE, (*page).size_total); #[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!(
(*page).size_used,
complete_msg_size,
EOP_MSG_SIZE,
(*page).size_total
);
if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total { if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total {
/* We're full. */ /* We're full. */
return None; return None;
@ -760,9 +770,7 @@ where
(*ret).buf_len = buf_len as u64; (*ret).buf_len = buf_len as u64;
/* DBG("Returning new message at %p with len %ld, TAG was %x", ret, ret->buf_len_padded, ret->tag); */ /* DBG("Returning new message at %p with len %ld, TAG was %x", ret, ret->buf_len_padded, ret->tag); */
/* Maybe catch some bugs... */ /* Maybe catch some bugs... */
(dbg!(*_llmp_next_msg_ptr(ret))).tag = LLMP_TAG_UNSET; (*_llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET;
dbg!(ret);
dbg!(*ret);
(*ret).tag = LLMP_TAG_UNINITIALIZED; (*ret).tag = LLMP_TAG_UNINITIALIZED;
Some(ret) Some(ret)
} }
@ -772,7 +780,7 @@ where
/// It will be read by the consuming threads (broker->clients or client->broker) /// It will be read by the consuming threads (broker->clients or client->broker)
#[inline(never)] // Not inlined to make cpu-level reodering (hopefully?) improbable #[inline(never)] // Not inlined to make cpu-level reodering (hopefully?) improbable
unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> { unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
dbg!("Sending msg {:?}", msg); // dbg!("Sending msg {:?}", msg);
if self.last_msg_sent == msg { if self.last_msg_sent == msg {
panic!("Message sent twice!"); panic!("Message sent twice!");
@ -814,6 +822,7 @@ where
let old_map = self.out_maps.last_mut().unwrap().page_mut(); let old_map = self.out_maps.last_mut().unwrap().page_mut();
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("New Map Size {}", new_map_size((*old_map).max_alloc_size)); println!("New Map Size {}", new_map_size((*old_map).max_alloc_size));
// Create a new shard page. // Create a new shard page.
@ -823,30 +832,26 @@ where
); );
let mut new_map = new_map_shmem.page_mut(); let mut new_map = new_map_shmem.page_mut();
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("got new map at: {:?}", new_map); println!("got new map at: {:?}", new_map);
ptr::write_volatile(&mut (*new_map).current_msg_id, (*old_map).current_msg_id); ptr::write_volatile(&mut (*new_map).current_msg_id, (*old_map).current_msg_id);
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("Setting max alloc size: {:?}", (*old_map).max_alloc_size); println!("Setting max alloc size: {:?}", (*old_map).max_alloc_size);
(*new_map).max_alloc_size = (*old_map).max_alloc_size; (*new_map).max_alloc_size = (*old_map).max_alloc_size;
/* On the old map, place a last message linking to the new map for the clients /* On the old map, place a last message linking to the new map for the clients
* to consume */ * to consume */
let mut out: *mut LlmpMsg = self.alloc_eop()?; let mut out: *mut LlmpMsg = self.alloc_eop()?;
println!("out");
(*out).sender = (*old_map).sender; (*out).sender = (*old_map).sender;
println!("sender");
let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
println!("size");
(*end_of_page_msg).map_size = new_map_shmem.shmem.map().len(); (*end_of_page_msg).map_size = new_map_shmem.shmem.map().len();
println!("str");
(*end_of_page_msg).shm_str = *new_map_shmem.shmem.shm_slice(); (*end_of_page_msg).shm_str = *new_map_shmem.shmem.shm_slice();
println!("send");
/* Send the last msg on the old buf */ /* Send the last msg on the old buf */
self.send(out)?; self.send(out)?;
println!("sent");
// Set the new page as current page. // Set the new page as current page.
self.out_maps.push(new_map_shmem); self.out_maps.push(new_map_shmem);
@ -855,10 +860,10 @@ where
// If we want to get red if old pages, (client to broker), do that now // If we want to get red if old pages, (client to broker), do that now
if !self.keep_pages_forever { if !self.keep_pages_forever {
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("pruning"); println!("pruning");
self.prune_old_pages(); self.prune_old_pages();
} }
println!("Done");
Ok(()) Ok(())
} }
@ -874,6 +879,7 @@ where
self.handle_out_eop()?; self.handle_out_eop()?;
} }
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("Handled out eop"); println!("Handled out eop");
match unsafe { self.alloc_next_if_space(buf_len) } { match unsafe { self.alloc_next_if_space(buf_len) } {
@ -1046,7 +1052,8 @@ where
// Handle end of page // Handle end of page
if (*msg).buf_len < size_of::<LlmpPayloadSharedMapInfo>() as u64 { if (*msg).buf_len < size_of::<LlmpPayloadSharedMapInfo>() as u64 {
panic!( panic!(
"Illegal message length for EOP (is {}, expected {})", "Illegal message length for EOP (is {}/{}, expected {})",
(*msg).buf_len,
(*msg).buf_len_padded, (*msg).buf_len_padded,
size_of::<LlmpPayloadSharedMapInfo>() size_of::<LlmpPayloadSharedMapInfo>()
); );
@ -1057,6 +1064,9 @@ where
Copy the contents first to be safe (probably fine in rust either way). */ Copy the contents first to be safe (probably fine in rust either way). */
let pageinfo_cpy = *pageinfo; let pageinfo_cpy = *pageinfo;
// Set last msg we received to null (as the map may no longer exist)
self.last_msg_recvd = ptr::null();
// Mark the old page save to unmap, in case we didn't so earlier. // Mark the old page save to unmap, in case we didn't so earlier.
ptr::write_volatile(&mut (*page).save_to_unmap, 1); ptr::write_volatile(&mut (*page).save_to_unmap, 1);
@ -1397,7 +1407,8 @@ where
If we should need zero copy, we could instead post a link to the If we should need zero copy, we could instead post a link to the
original msg with the map_id and offset. */ original msg with the map_id and offset. */
let actual_size = (*out).buf_len_padded; let actual_size = (*out).buf_len_padded;
msg.copy_to_nonoverlapping(out, size_of::<LlmpMsg>() + (*msg).buf_len_padded as usize); let complete_size = actual_size as usize + size_of::<LlmpMsg>();
(msg as *const u8).copy_to_nonoverlapping(out as *mut u8, complete_size);
(*out).buf_len_padded = actual_size; (*out).buf_len_padded = actual_size;
/* We need to replace the message ID with our own */ /* We need to replace the message ID with our own */
if let Err(e) = self.llmp_out.send(out) { if let Err(e) = self.llmp_out.send(out) {
@ -1507,10 +1518,7 @@ where
let llmp_tcp_id = self.llmp_clients.len() as u32; let llmp_tcp_id = self.llmp_clients.len() as u32;
// Tcp out map sends messages from background thread tcp server to foreground client // Tcp out map sends messages from background thread tcp server to foreground client
let tcp_out_map = LlmpSharedMap::new( let tcp_out_map = LlmpSharedMap::new(llmp_tcp_id, SH::new_map(LLMP_CFG_INITIAL_MAP_SIZE)?);
llmp_tcp_id,
SH::new_map(new_map_size(LLMP_CFG_INITIAL_MAP_SIZE))?,
);
let tcp_out_map_str = tcp_out_map.shmem.shm_str(); let tcp_out_map_str = tcp_out_map.shmem.shm_str();
let tcp_out_map_size = tcp_out_map.shmem.map().len(); let tcp_out_map_size = tcp_out_map.shmem.map().len();
self.register_client(tcp_out_map); self.register_client(tcp_out_map);
@ -1822,7 +1830,7 @@ where
last_msg_sent: ptr::null_mut(), last_msg_sent: ptr::null_mut(),
out_maps: vec![LlmpSharedMap::new( out_maps: vec![LlmpSharedMap::new(
0, 0,
SH::new_map(new_map_size(LLMP_CFG_INITIAL_MAP_SIZE))?, SH::new_map(LLMP_CFG_INITIAL_MAP_SIZE)?,
)], )],
// drop pages to the broker if it already read them // drop pages to the broker if it already read them
keep_pages_forever: false, keep_pages_forever: false,

View File

@ -344,13 +344,13 @@ pub mod unix_shmem {
pub fn new(map_size: usize) -> Result<Self, Error> { pub fn new(map_size: usize) -> Result<Self, Error> {
let mut ret = unix_shmem_unitialized(); let mut ret = unix_shmem_unitialized();
let map = unsafe { unix_shmem_init(&mut ret, map_size) }; let map = unsafe { unix_shmem_init(&mut ret, map_size) };
if !map.is_null() { if map.is_null() {
Ok(ret)
} else {
Err(Error::Unknown(format!( Err(Error::Unknown(format!(
"Could not allocate map of size {}", "Could not allocate map of size {} - check OS limits, (i.e. shmall, shmmax)!",
map_size map_size
))) )))
} else {
Ok(ret)
} }
} }
} }