This commit is contained in:
Dominik Maier 2021-04-10 17:03:33 +02:00
parent 24a033de31
commit 70be959b82

View File

@ -677,8 +677,11 @@ where
let buf_len_padded;
let mut complete_msg_size = llmp_align(size_of::<LlmpMsg>() + buf_len);
let map = self.out_maps.last_mut().unwrap();
println!("Allocating on map {:?} (size: {})", map, complete_msg_size);
let page = map.page_mut();
println!("Allocating {} (>={}) bytes on page {:?}", complete_msg_size, buf_len, page);
let last_msg = self.last_msg_sent;
println!("last msg: {:?}", last_msg);
/* DBG("XXX complete_msg_size %lu (h: %lu)\n", complete_msg_size, sizeof(llmp_message)); */
/* In case we don't have enough space, make sure the next page will be large
* enough */
@ -697,19 +700,21 @@ where
buf_len_padded =
llmp_align(base_addr + complete_msg_size) - base_addr - size_of::<LlmpMsg>();
complete_msg_size = buf_len_padded + size_of::<LlmpMsg>();
/* DBG("XXX complete_msg_size NEW %lu\n", complete_msg_size); */
dbg!("LLMP_DEBUG: complete_msg_size NEW {}", complete_msg_size);
/* Still space for the new message plus the additional "we're full" message?
*/
dbg!((*page).size_used, complete_msg_size, EOP_MSG_SIZE, (*page).size_total);
if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total {
/* We're full. */
return None;
}
/* We need to start with 1 for ids, as current message id is initialized
* with 0... */
(*ret).message_id = if !last_msg.is_null() {
(*last_msg).message_id + 1
} else {
(*ret).message_id = if last_msg.is_null() {
1
} else {
(*last_msg).message_id + 1
}
} else if (*page).current_msg_id != (*last_msg).message_id {
/* Oops, wrong usage! */
@ -755,7 +760,9 @@ where
(*ret).buf_len = buf_len as u64;
/* DBG("Returning new message at %p with len %ld, TAG was %x", ret, ret->buf_len_padded, ret->tag); */
/* Maybe catch some bugs... */
(*_llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET;
(dbg!(*_llmp_next_msg_ptr(ret))).tag = LLMP_TAG_UNSET;
dbg!(ret);
dbg!(*ret);
(*ret).tag = LLMP_TAG_UNINITIALIZED;
Some(ret)
}
@ -765,6 +772,8 @@ where
/// It will be read by the consuming threads (broker->clients or client->broker)
#[inline(never)] // Not inlined to make cpu-level reodering (hopefully?) improbable
unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
dbg!("Sending msg {:?}", msg);
if self.last_msg_sent == msg {
panic!("Message sent twice!");
}
@ -805,6 +814,8 @@ where
let old_map = self.out_maps.last_mut().unwrap().page_mut();
println!("New Map Size {}", new_map_size((*old_map).max_alloc_size));
// Create a new shard page.
let mut new_map_shmem = LlmpSharedMap::new(
(*old_map).sender,
@ -812,28 +823,42 @@ where
);
let mut new_map = new_map_shmem.page_mut();
println!("got new map at: {:?}", new_map);
ptr::write_volatile(&mut (*new_map).current_msg_id, (*old_map).current_msg_id);
println!("Setting max alloc size: {:?}", (*old_map).max_alloc_size);
(*new_map).max_alloc_size = (*old_map).max_alloc_size;
/* On the old map, place a last message linking to the new map for the clients
* to consume */
let mut out: *mut LlmpMsg = self.alloc_eop()?;
println!("out");
(*out).sender = (*old_map).sender;
println!("sender");
let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
println!("size");
(*end_of_page_msg).map_size = new_map_shmem.shmem.map().len();
println!("str");
(*end_of_page_msg).shm_str = *new_map_shmem.shmem.shm_slice();
println!("send");
/* Send the last msg on the old buf */
self.send(out)?;
println!("sent");
// Set the new page as current page.
self.out_maps.push(new_map_shmem);
// We never sent a msg on the new buf */
self.last_msg_sent = ptr::null_mut();
/* Send the last msg on the old buf */
self.send(out)?;
// If we want to get red if old pages, (client to broker), do that now
if !self.keep_pages_forever {
println!("pruning");
self.prune_old_pages();
}
self.out_maps.push(new_map_shmem);
println!("Done");
Ok(())
}
@ -849,6 +874,8 @@ where
self.handle_out_eop()?;
}
println!("Handled out eop");
match unsafe { self.alloc_next_if_space(buf_len) } {
Some(msg) => Ok(msg),
None => Err(Error::Unknown(format!(
@ -1173,18 +1200,18 @@ where
/// Maps and wraps an existing
pub fn existing(existing_map: SH) -> Self {
#[cfg(all(feature = "llmp_debug", feature = "std"))]
{
#[cfg(debug_assertions)]
let bt = Backtrace::new();
#[cfg(not(debug_assertions))]
let bt = "<n/a (release)>";
println!(
"LLMP_DEBUG: Using existing map {} with size {}, bt: {:?}",
existing_map.shm_str(),
existing_map.map().len(),
bt
);
}
//{
//#[cfg(debug_assertions)]
//let bt = Backtrace::new();
//#[cfg(not(debug_assertions))]
//let bt = "<n/a (release)>";
dbg!(
"LLMP_DEBUG: Using existing map {} with size {}",
existing_map.shm_str(),
existing_map.map().len(),
//bt
);
//}
let ret = Self {
shmem: existing_map,