Cpu atomics for LLMP (#438)

* atomic read for unmap

* send and recv

* switching to Atomics

* atomics

* bring back compiler_fence (maybe needed for signals?)

* only acquire mem if new msg is available

* unused compiler fence

* caching for msg ids to not have to read atomics as much

* fix build

* speed++

* only in a spinloop for the second try

* cleanup logs

* docu, error log
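Taken together, these bullets describe moving LLMP's page handoff from `ptr::write_volatile` plus `compiler_fence` to real atomics: the sender publishes a message by storing the new message id with `Release` ordering, and the receiver only pays for an `Acquire` once it actually observes a new id. A minimal sketch of that handoff pattern (simplified stand-in types, not LibAFL's actual `LlmpPage`) might look like this:

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{fence, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Stand-in for an LLMP page header: a message body lives next to an
/// atomic id, and bumping the id is what "publishes" the body.
struct Page {
    current_msg_id: AtomicU64,
    payload: UnsafeCell<u64>,
}
// Safety: the payload is only written before the Release store and only
// read after the matching Acquire fence, so the accesses never race.
unsafe impl Sync for Page {}

fn main() {
    let page = Arc::new(Page {
        current_msg_id: AtomicU64::new(0),
        payload: UnsafeCell::new(0),
    });

    let sender = {
        let page = Arc::clone(&page);
        thread::spawn(move || {
            // 1) Write the message body (a plain write into "shared memory").
            unsafe { *page.payload.get() = 0x1337 };
            // 2) Commit it: the Release store orders the body write before
            //    the new id becomes visible to any acquiring reader.
            page.current_msg_id.store(1, Ordering::Release);
        })
    };

    let receiver = {
        let page = Arc::clone(&page);
        thread::spawn(move || loop {
            // A Relaxed load is enough to learn *whether* there is a new id...
            if page.current_msg_id.load(Ordering::Relaxed) != 0 {
                // ...and only then do we pay for the Acquire, which makes the
                // body written before the Release store visible here.
                fence(Ordering::Acquire);
                assert_eq!(unsafe { *page.payload.get() }, 0x1337);
                break;
            }
            std::hint::spin_loop();
        })
    };

    sender.join().unwrap();
    receiver.join().unwrap();
}
```

A `compiler_fence` only constrains compiler reordering, not the CPU, so the explicit `Release`/`Acquire` orderings are what actually make the handoff sound across cores; that is also why the diff below can drop the `compiler_fence(Ordering::SeqCst)` calls in the broker loop.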
Dominik Maier 2022-01-03 00:47:31 +01:00 committed by GitHub
parent af3d321213
commit b9acac46d9

@ -63,9 +63,10 @@ use alloc::{string::String, vec::Vec};
use core::{
cmp::max,
fmt::Debug,
hint,
mem::size_of,
ptr, slice,
sync::atomic::{compiler_fence, Ordering},
sync::atomic::{fence, AtomicU16, AtomicU64, Ordering},
time::Duration,
};
use serde::{Deserialize, Serialize};
@ -281,7 +282,7 @@ impl Listener {
Listener::Tcp(inner) => match inner.accept() {
Ok(res) => ListenerStream::Tcp(res.0, res.1),
Err(err) => {
dbg!("Ignoring failed accept", err);
println!("Ignoring failed accept: {:?}", err);
ListenerStream::Empty()
}
},
@ -422,11 +423,11 @@ fn new_map_size(max_alloc: usize) -> usize {
/// `llmp_page->messages`
unsafe fn _llmp_page_init<SHM: ShMem>(shmem: &mut SHM, sender: u32, allow_reinit: bool) {
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!("_llmp_page_init: shmem {}", &shmem);
println!("_llmp_page_init: shmem {:?}", &shmem);
let map_size = shmem.len();
let page = shmem2page_mut(shmem);
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!("_llmp_page_init: page {}", *page);
println!("_llmp_page_init: page {:?}", &(*page));
if !allow_reinit {
assert!(
@ -439,15 +440,15 @@ unsafe fn _llmp_page_init<SHM: ShMem>(shmem: &mut SHM, sender: u32, allow_reinit
(*page).magic = PAGE_INITIALIZED_MAGIC;
(*page).sender = sender;
ptr::write_volatile(ptr::addr_of_mut!((*page).current_msg_id), 0);
(*page).current_msg_id.store(0, Ordering::Relaxed);
(*page).max_alloc_size = 0;
// Don't forget to subtract our own header size
(*page).size_total = map_size - LLMP_PAGE_HEADER_LEN;
(*page).size_used = 0;
(*(*page).messages.as_mut_ptr()).message_id = 0;
(*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET;
ptr::write_volatile(ptr::addr_of_mut!((*page).safe_to_unmap), 0);
ptr::write_volatile(ptr::addr_of_mut!((*page).sender_dead), 0);
(*page).safe_to_unmap.store(0, Ordering::Relaxed);
(*page).sender_dead.store(0, Ordering::Relaxed);
assert!((*page).size_total != 0);
}
@ -598,7 +599,7 @@ where
match tcp_bind(port) {
Ok(listener) => {
// We got the port. We are the broker! :)
dbg!("We're the broker");
println!("We're the broker");
let mut broker = LlmpBroker::new(shmem_provider)?;
let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
@ -670,7 +671,7 @@ where
}
/// Contents of the shared mem pages, used by llmp internally
#[derive(Copy, Clone, Debug)]
#[derive(Debug)]
#[repr(C)]
pub struct LlmpPage {
/// to check if this page got initialized properly
@ -680,11 +681,11 @@ pub struct LlmpPage {
/// Set to != 0 by the receiver, once it got mapped
/// It's not safe for the sender to unmap this page before
/// (The os may have tidied up the memory when the receiver starts to map)
pub safe_to_unmap: u16,
pub safe_to_unmap: AtomicU16,
/// Not used at the moment (would indicate that the sender is no longer there)
pub sender_dead: u16,
pub sender_dead: AtomicU16,
/// The current message ID
pub current_msg_id: u64,
pub current_msg_id: AtomicU64,
/// How much space is available on this page in bytes
pub size_total: usize,
/// How much space is used on this page in bytes
@ -816,6 +817,7 @@ where
if self.safe_to_unmap() {
return;
}
hint::spin_loop();
// We log that we're looping -> see when we're blocking.
#[cfg(feature = "std")]
{
@ -831,9 +833,11 @@ where
pub fn safe_to_unmap(&self) -> bool {
let current_out_map = self.out_maps.last().unwrap();
unsafe {
compiler_fence(Ordering::SeqCst);
// println!("Reading safe_to_unmap from {:?}", current_out_map.page() as *const _);
ptr::read_volatile(ptr::addr_of!((*current_out_map.page()).safe_to_unmap)) != 0
(*current_out_map.page())
.safe_to_unmap
.load(Ordering::Relaxed)
!= 0
}
}
@ -841,8 +845,9 @@ where
/// # Safety
/// If this method is called, the page may be unmapped before it is read by any receiver.
pub unsafe fn mark_safe_to_unmap(&mut self) {
// No need to do this volatile, as we should be the same thread in this scenario.
(*self.out_maps.last_mut().unwrap().page_mut()).safe_to_unmap = 1;
(*self.out_maps.last_mut().unwrap().page_mut())
.safe_to_unmap
.store(1, Ordering::Relaxed);
}
/// Reattach to a vacant `out_map`.
@ -877,7 +882,7 @@ where
// Exclude the current page by splitting off the last element for this iter
let mut unmap_until_excl = 0;
for map in self.out_maps.split_last_mut().unwrap().1 {
if (*map.page_mut()).safe_to_unmap == 0 {
if (*map.page()).safe_to_unmap.load(Ordering::Relaxed) == 0 {
// The broker didn't read this page yet, no more pages to unmap.
break;
}
@ -960,7 +965,7 @@ where
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!(
page,
*page,
&(*page),
(*page).size_used,
buf_len_padded,
EOP_MSG_SIZE,
@ -984,7 +989,7 @@ where
* with 0... */
(*ret).message_id = if last_msg.is_null() {
1
} else if (*page).current_msg_id == (*last_msg).message_id {
} else if (*page).current_msg_id.load(Ordering::Relaxed) == (*last_msg).message_id {
(*last_msg).message_id + 1
} else {
/* Oops, wrong usage! */
@ -1034,10 +1039,14 @@ where
msg
)));
}
(*msg).message_id = (*page).current_msg_id + 1;
compiler_fence(Ordering::SeqCst);
ptr::write_volatile(ptr::addr_of_mut!((*page).current_msg_id), (*msg).message_id);
compiler_fence(Ordering::SeqCst);
(*msg).message_id = (*page).current_msg_id.load(Ordering::Relaxed) + 1;
// Make sure all things have been written to the page, and commit the message to the page
(*page)
.current_msg_id
.store((*msg).message_id, Ordering::Release);
self.last_msg_sent = msg;
self.has_unsent_message = false;
Ok(())
@ -1076,9 +1085,9 @@ where
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("got new map at: {:?}", new_map);
ptr::write_volatile(
ptr::addr_of_mut!((*new_map).current_msg_id),
(*old_map).current_msg_id,
(*new_map).current_msg_id.store(
(*old_map).current_msg_id.load(Ordering::Relaxed),
Ordering::Relaxed,
);
#[cfg(all(feature = "llmp_debug", feature = "std"))]
@ -1283,6 +1292,8 @@ where
pub shmem_provider: SP,
/// current page. After EOP, this gets replaced with the new one
pub current_recv_map: LlmpSharedMap<SP::Mem>,
/// Caches the highest msg id we've seen so far
highest_msg_id: u64,
}
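The `highest_msg_id` field added here backs the "caching for msg ids" bullet above: once the receiver has already observed an id newer than the last message it consumed, the next `recv()` can skip the shared atomic entirely, and the `Acquire` fence is only issued when a fresh id really was loaded from the shared map. A rough, hypothetical sketch of that decision (simplified fields, not the real `LlmpReceiver`) could look like:

```rust
use std::sync::atomic::{fence, AtomicU64, Ordering};

/// Hypothetical, simplified receiver state: `highest_msg_id` caches the last
/// id observed in the shared page so that recv() can often skip the atomic.
struct Receiver<'a> {
    current_msg_id: &'a AtomicU64, // lives in the shared page
    highest_msg_id: u64,           // local cache, needs no synchronization
    last_read_id: u64,             // id of the message consumed last
}

impl<'a> Receiver<'a> {
    /// Returns the newest known message id, touching the shared atomic only
    /// when the cache cannot already prove that an unread message exists.
    fn newest_id(&mut self) -> u64 {
        if self.highest_msg_id > self.last_read_id {
            // Cache hit: this id was loaded (and acquire-fenced) earlier,
            // so the message it announces is already safe to read.
            self.highest_msg_id
        } else {
            // Cache miss: ask the shared map. Relaxed is enough to learn the
            // id; only acquire the page contents if something new showed up.
            let id = self.current_msg_id.load(Ordering::Relaxed);
            self.highest_msg_id = id;
            if id > self.last_read_id {
                fence(Ordering::Acquire);
            }
            id
        }
    }
}

fn main() {
    let shared = AtomicU64::new(0);
    let mut rx = Receiver {
        current_msg_id: &shared,
        highest_msg_id: 0,
        last_read_id: 0,
    };
    shared.store(3, Ordering::Release); // pretend a sender published message 3
    assert_eq!(rx.newest_id(), 3); // loads the atomic and issues the fence
    assert_eq!(rx.newest_id(), 3); // served from the cache, no atomic access
}
```

Note how the diff also resets `highest_msg_id` to 0 whenever the receiver switches to a new page, forcing the next `recv()` to consult the freshly mapped page instead of trusting a stale cache.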
/// Receiving end of an llmp channel
@ -1328,6 +1339,7 @@ where
current_recv_map,
last_msg_recvd,
shmem_provider,
highest_msg_id: 0,
})
}
@ -1336,10 +1348,19 @@ where
#[inline(never)]
unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> {
/* DBG("recv %p %p\n", page, last_msg); */
compiler_fence(Ordering::SeqCst);
let mut page = self.current_recv_map.page_mut();
let last_msg = self.last_msg_recvd;
let current_msg_id = ptr::read_volatile(ptr::addr_of!((*page).current_msg_id));
let (current_msg_id, loaded) =
if !last_msg.is_null() && self.highest_msg_id > (*last_msg).message_id {
// read the msg_id from cache
(self.highest_msg_id, false)
} else {
// read the msg_id from shared map
let current_msg_id = (*page).current_msg_id.load(Ordering::Relaxed);
self.highest_msg_id = current_msg_id;
(current_msg_id, true)
};
// Read the message from the page
let ret = if current_msg_id == 0 {
@ -1347,11 +1368,16 @@ where
None
} else if last_msg.is_null() {
/* We never read a message from this queue. Return first. */
fence(Ordering::Acquire);
Some((*page).messages.as_mut_ptr())
} else if (*last_msg).message_id == current_msg_id {
/* Oops! No new message! */
None
} else {
if loaded {
// we read a higher id from this page, fetch.
fence(Ordering::Acquire);
}
// We don't know how big the msg wants to be, assert at least the header has space.
Some(llmp_next_msg_ptr_checked(
&mut self.current_recv_map,
@ -1360,14 +1386,18 @@ where
)?)
};
// Let's see what we go here.
// Let's see what we got.
if let Some(msg) = ret {
if !(*msg).in_map(&mut self.current_recv_map) {
return Err(Error::IllegalState("Unexpected message in map (out of map bounds) - buggy client or tampered shared map detected!".into()));
}
// Handle special, LLMP internal, messages.
match (*msg).tag {
LLMP_TAG_UNSET => panic!("BUG: Read unallocated msg"),
LLMP_TAG_UNSET => panic!(
"BUG: Read unallocated msg (tag was {:x} - msg header: {:?}",
LLMP_TAG_UNSET,
&(*msg)
),
LLMP_TAG_EXITING => {
// The other side is done.
assert_eq!((*msg).buf_len, 0);
@ -1394,9 +1424,10 @@ where
// Set last msg we received to null (as the map may no longer exist)
self.last_msg_recvd = ptr::null();
self.highest_msg_id = 0;
// Mark the old page safe to unmap, in case we didn't do so earlier.
ptr::write_volatile(ptr::addr_of_mut!((*page).safe_to_unmap), 1);
(*page).safe_to_unmap.store(1, Ordering::Relaxed);
// Map the new page. The old one should be unmapped by Drop
self.current_recv_map =
@ -1406,7 +1437,7 @@ where
)?);
page = self.current_recv_map.page_mut();
// Mark the new page safe to unmap also (it's mapped by us, the broker now)
ptr::write_volatile(ptr::addr_of_mut!((*page).safe_to_unmap), 1);
(*page).safe_to_unmap.store(1, Ordering::Relaxed);
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!(
@ -1443,13 +1474,13 @@ where
current_msg_id = (*last_msg).message_id;
}
loop {
compiler_fence(Ordering::SeqCst);
if ptr::read_volatile(ptr::addr_of!((*page).current_msg_id)) != current_msg_id {
if (*page).current_msg_id.load(Ordering::Relaxed) != current_msg_id {
return match self.recv()? {
Some(msg) => Ok(msg),
None => panic!("BUG: blocking llmp message should never be NULL"),
};
}
hint::spin_loop();
}
}
@ -1562,7 +1593,7 @@ where
//let bt = Backtrace::new();
//#[cfg(not(debug_assertions))]
//let bt = "<n/a (release)>";
dbg!(
println!(
"LLMP_DEBUG: Using existing map {} with size {}",
existing_map.id(),
existing_map.len(),
@ -1580,7 +1611,7 @@ where
&ret.shmem
);
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!("PAGE: {}", *ret.page());
println!("PAGE: {:?}", &(*ret.page()));
}
ret
}
@ -1589,7 +1620,7 @@ where
/// This indicates that the page may safely be unmapped by the sender.
pub fn mark_safe_to_unmap(&mut self) {
unsafe {
ptr::write_volatile(ptr::addr_of_mut!((*self.page_mut()).safe_to_unmap), 1);
(*self.page_mut()).safe_to_unmap.store(1, Ordering::Relaxed);
}
}
@ -1767,6 +1798,7 @@ where
current_recv_map: client_page,
last_msg_recvd: ptr::null_mut(),
shmem_provider: self.shmem_provider.clone(),
highest_msg_id: 0,
});
}
@ -1858,7 +1890,6 @@ where
where
F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{
compiler_fence(Ordering::SeqCst);
for i in 0..self.llmp_clients.len() {
unsafe {
self.handle_new_msgs(i as u32, on_new_msg)?;
@ -1898,7 +1929,6 @@ where
}
while !self.is_shutting_down() {
compiler_fence(Ordering::SeqCst);
self.once(on_new_msg)
.expect("An error occurred when brokering. Exiting.");
@ -2010,7 +2040,7 @@ where
.expect("Failed to map local page in broker 2 broker thread!");
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!("B2B: Starting proxy loop :)");
println!("B2B: Starting proxy loop :)");
loop {
// first, forward all data we have.
@ -2019,13 +2049,16 @@ where
.expect("Error reading from local page!")
{
if client_id == b2b_client_id {
dbg!("Ignored message we probably sent earlier (same id)", tag);
println!(
"Ignored message we probably sent earlier (same id), TAG: {:x}",
tag
);
continue;
}
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!(
"Fowarding message via broker2broker connection",
println!(
"Fowarding message ({} bytes) via broker2broker connection",
payload.len()
);
// We got a new message! Forward...
@ -2053,8 +2086,8 @@ where
);
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!(
"Fowarding incoming message from broker2broker connection",
println!(
"Fowarding incoming message ({} bytes) from broker2broker connection",
msg.payload.len()
);
@ -2065,7 +2098,7 @@ where
.expect("B2B: Error forwarding message. Exiting.");
} else {
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!("Received no input, timeout or closed. Looping back up :)");
println!("Received no input, timeout or closed. Looping back up :)");
}
}
});
@ -2075,7 +2108,7 @@ where
});
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!("B2B: returning from loop. Success: {}", ret.is_ok());
println!("B2B: returning from loop. Success: {}", ret.is_ok());
ret
}
@ -2186,14 +2219,18 @@ where
loop {
match listener.accept() {
ListenerStream::Tcp(mut stream, addr) => {
dbg!("New connection", addr, stream.peer_addr().unwrap());
eprintln!(
"New connection: {:?}/{:?}",
addr,
stream.peer_addr().unwrap()
);
// Send initial information, without anyone asking.
// This makes it a tiny bit easier to map the broker map for new Clients.
match send_tcp_msg(&mut stream, &broker_hello) {
Ok(()) => {}
Err(e) => {
dbg!("Error sending initial hello: {:?}", e);
eprintln!("Error sending initial hello: {:?}", e);
continue;
}
}
@ -2201,14 +2238,14 @@ where
let buf = match recv_tcp_msg(&mut stream) {
Ok(buf) => buf,
Err(e) => {
dbg!("Error receving from tcp", e);
eprintln!("Error receving from tcp: {:?}", e);
continue;
}
};
let req = match (&buf).try_into() {
Ok(req) => req,
Err(e) => {
dbg!("Could not deserialize tcp message", e);
eprintln!("Could not deserialize tcp message: {:?}", e);
continue;
}
};
@ -2290,6 +2327,7 @@ where
current_recv_map: new_page,
last_msg_recvd: ptr::null_mut(),
shmem_provider: self.shmem_provider.clone(),
highest_msg_id: 0,
});
}
Err(e) => {
@ -2469,6 +2507,7 @@ where
current_recv_map: initial_broker_map,
last_msg_recvd: ptr::null_mut(),
shmem_provider,
highest_msg_id: 0,
},
})
}
@ -2576,7 +2615,7 @@ where
match TcpStream::connect((_LLMP_CONNECT_ADDR, port)) {
Ok(stream) => break stream,
Err(_) => {
dbg!("Connection Refused.. Retrying");
println!("Connection Refused.. Retrying");
}
}
}
@ -2670,7 +2709,7 @@ mod tests {
.unwrap();
let tag: Tag = 0x1337;
let arr: [u8; 1] = [1u8];
let arr: [u8; 1] = [1_u8];
// Send stuff
client.send_buf(tag, &arr).unwrap();