llmp cleanups

This commit is contained in:
Dominik Maier 2021-03-21 22:56:28 +01:00
parent 39d33ce7ff
commit ab9a2485f7
2 changed files with 46 additions and 30 deletions

View File

@ -966,7 +966,7 @@ where
unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> { unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> {
/* DBG("recv %p %p\n", page, last_msg); */ /* DBG("recv %p %p\n", page, last_msg); */
compiler_fence(Ordering::SeqCst); compiler_fence(Ordering::SeqCst);
let page = self.current_recv_map.page_mut(); let mut page = self.current_recv_map.page_mut();
let last_msg = self.last_msg_recvd; let last_msg = self.last_msg_recvd;
let current_msg_id = ptr::read_volatile(&(*page).current_msg_id); let current_msg_id = ptr::read_volatile(&(*page).current_msg_id);
@ -1004,7 +1004,7 @@ where
} }
LLMP_TAG_END_OF_PAGE => { LLMP_TAG_END_OF_PAGE => {
#[cfg(feature = "std")] #[cfg(feature = "std")]
dbg!("Got end of page, allocing next"); println!("Received end of page, allocating next");
// Handle end of page // Handle end of page
if (*msg).buf_len < size_of::<LlmpPayloadSharedMapInfo>() as u64 { if (*msg).buf_len < size_of::<LlmpPayloadSharedMapInfo>() as u64 {
panic!( panic!(
@ -1015,18 +1015,19 @@ where
} }
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
/* We can reuse the map mem space, no need to free and calloc. /* The pageinfo points to the map we're about to unmap.
However, the pageinfo points to the map we're about to unmap. Copy the contents first to be safe (probably fine in rust either way). */
Clone the contents first to be safe (probably fine in rust eitner way). */
let pageinfo_cpy = *pageinfo; let pageinfo_cpy = *pageinfo;
// Mark the old page save to unmap, in case we didn't so earlier. // Mark the old page save to unmap, in case we didn't so earlier.
ptr::write_volatile(&mut (*page).save_to_unmap, 1); ptr::write_volatile(&mut (*page).save_to_unmap, 1);
// Map the new page. The old one should be unmapped by Drop // Map the new page. The old one should be unmapped by Drop
self.current_recv_map = LlmpSharedMap::existing(SH::existing_from_shm_slice( self.current_recv_map = LlmpSharedMap::existing(SH::existing_from_shm_slice(
&pageinfo_cpy.shm_str, &pageinfo_cpy.shm_str,
pageinfo_cpy.map_size, pageinfo_cpy.map_size,
)?); )?);
page = self.current_recv_map.page_mut();
// Mark the new page save to unmap also (it's mapped by us, the broker now) // Mark the new page save to unmap also (it's mapped by us, the broker now)
ptr::write_volatile(&mut (*page).save_to_unmap, 1); ptr::write_volatile(&mut (*page).save_to_unmap, 1);
@ -1434,7 +1435,9 @@ where
// to read from the initial map id. // to read from the initial map id.
let client_out_map_mem = &self.llmp_out.out_maps.first().unwrap().shmem; let client_out_map_mem = &self.llmp_out.out_maps.first().unwrap().shmem;
let broadcast_str_initial = *client_out_map_mem.shm_slice(); let broadcast_map_description = postcard::to_allocvec(&client_out_map_mem.description())?;
let mut incoming_map_description_serialized = vec![0u8; broadcast_map_description.len()];
let llmp_tcp_id = self.llmp_clients.len() as u32; let llmp_tcp_id = self.llmp_clients.len() as u32;
@ -1462,33 +1465,37 @@ where
match listener.accept() { match listener.accept() {
ListenerStream::Tcp(mut stream, addr) => { ListenerStream::Tcp(mut stream, addr) => {
dbg!("New connection", addr, stream.peer_addr().unwrap()); dbg!("New connection", addr, stream.peer_addr().unwrap());
match stream.write(&broadcast_str_initial) { match stream.write(&broadcast_map_description) {
Ok(_) => {} // fire & forget Ok(_) => {} // fire & forget
Err(e) => { Err(e) => {
dbg!("Could not send to shmap to client", e); dbg!("Could not send to shmap to client", e);
continue; continue;
} }
}; };
let mut new_client_map_str: [u8; 20] = Default::default(); match stream.read_exact(&mut incoming_map_description_serialized) {
match stream.read_exact(&mut new_client_map_str) {
Ok(()) => (), Ok(()) => (),
Err(e) => { Err(e) => {
dbg!("Ignoring failed read from client", e); dbg!("Ignoring failed read from client", e);
continue; continue;
} }
}; };
unsafe { if let Ok(incoming_map_description) = postcard::from_bytes::<ShMemDescription>(
let msg = new_client_sender &incoming_map_description_serialized,
.alloc_next(size_of::<LlmpPayloadSharedMapInfo>()) ) {
.expect("Could not allocate a new message in shared map."); unsafe {
(*msg).tag = LLMP_TAG_NEW_SHM_CLIENT; let msg = new_client_sender
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; .alloc_next(size_of::<LlmpPayloadSharedMapInfo>())
(*pageinfo).shm_str = new_client_map_str; .expect("Could not allocate a new message in shared map.");
(*pageinfo).map_size = LLMP_CFG_INITIAL_MAP_SIZE; (*msg).tag = LLMP_TAG_NEW_SHM_CLIENT;
match new_client_sender.send(msg) { let pageinfo =
Ok(()) => (), (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
Err(e) => println!("Error forwarding client on map: {:?}", e), (*pageinfo).shm_str = incoming_map_description.str_bytes;
}; (*pageinfo).map_size = incoming_map_description.size;
match new_client_sender.send(msg) {
Ok(()) => (),
Err(e) => println!("Error forwarding client on map: {:?}", e),
};
}
} }
} }
#[cfg(unix)] #[cfg(unix)]
@ -1496,14 +1503,14 @@ where
dbg!("New connection", addr); dbg!("New connection", addr);
let broadcast_fd_initial: i32 = let broadcast_fd_initial: i32 =
CStr::from_ptr(broadcast_str_initial.as_ptr() as *const c_char) CStr::from_ptr(broadcast_map_description.as_ptr() as *const c_char)
.to_string_lossy() .to_string_lossy()
.into_owned() .into_owned()
.parse() .parse()
.unwrap_or_else(|_| { .unwrap_or_else(|_| {
panic!( panic!(
"ShmId is not a valid int file descriptor: {:?}", "ShmId is not a valid int file descriptor: {:?}",
broadcast_str_initial broadcast_map_description
) )
}); });
@ -1856,15 +1863,24 @@ where
let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port))?; let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port))?;
println!("Connected to port {}", port); println!("Connected to port {}", port);
let mut new_broker_map_str: [u8; 20] = Default::default(); // First, get the serialized description size by serializing a dummy.
let dummy_description = ShMemDescription {
size: 0,
str_bytes: Default::default(),
};
let mut new_broker_map_str = postcard::to_allocvec(&dummy_description)?;
stream.read_exact(&mut new_broker_map_str)?; stream.read_exact(&mut new_broker_map_str)?;
let ret = Self::new(LlmpSharedMap::existing(SH::existing_from_shm_slice( let broker_map_description: ShMemDescription = postcard::from_bytes(&new_broker_map_str)?;
&new_broker_map_str,
LLMP_CFG_INITIAL_MAP_SIZE, let ret = Self::new(LlmpSharedMap::existing(SH::existing_from_description(
&broker_map_description,
)?))?; )?))?;
stream.write_all(ret.sender.out_maps.first().unwrap().shmem.shm_slice())?; let own_map_description_bytes =
postcard::to_allocvec(&ret.sender.out_maps.first().unwrap().shmem.description())?;
stream.write_all(&own_map_description_bytes)?;
Ok(ret) Ok(ret)
} }
} }

View File

@ -20,9 +20,9 @@ use crate::Error;
#[derive(Copy, Clone, Debug, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct ShMemDescription { pub struct ShMemDescription {
/// Size of this map /// Size of this map
size: usize, pub size: usize,
/// of name of this map, as fixed 20 bytes c-string /// of name of this map, as fixed 20 bytes c-string
str_bytes: [u8; 20], pub str_bytes: [u8; 20],
} }
/// A Shared map /// A Shared map