Fix client_id for outgoing messages (#154)

* attaching client_id to outgoing messages

* fixed forwarding, example
This commit is contained in:
Dominik Maier 2021-06-07 01:48:52 +02:00 committed by GitHub
parent 3b2ee4bb70
commit 392ffd33f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 15 deletions

View File

@ -31,7 +31,7 @@ fn adder_loop(port: u16) -> ! {
loop {
let mut msg_counter = 0;
loop {
let (_sender, tag, buf) = match client.recv_buf().unwrap() {
let (sender, tag, buf) = match client.recv_buf().unwrap() {
None => break,
Some(msg) => msg,
};
@ -42,8 +42,9 @@ fn adder_loop(port: u16) -> ! {
current_result.wrapping_add(u32::from_le_bytes(buf.try_into().unwrap()));
}
_ => println!(
"Adder Client ignored unknown message {} with {} bytes",
"Adder Client ignored unknown message {:#x} from client {} with {} bytes",
tag,
sender,
buf.len()
),
};

View File

@ -697,7 +697,7 @@ pub struct LlmpSender<SP>
where
SP: ShMemProvider,
{
/// ID of this sender. Only used in the broker.
/// ID of this sender.
pub id: u32,
/// Ref to the last message this sender sent on the last page.
/// If null, a new page (just) started.
@ -967,8 +967,9 @@ where
/// Commit the message last allocated by [`alloc_next`] to the queue.
/// After commiting, the msg shall no longer be altered!
/// It will be read by the consuming threads (`broker->clients` or `client->broker`)
/// If `overwrite_client_id` is `false`, the message's `sender` won't be touched (for broker forwarding)
#[inline(never)] // Not inlined to make cpu-level reodering (hopefully?) improbable
unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
unsafe fn send(&mut self, msg: *mut LlmpMsg, overwrite_client_id: bool) -> Result<(), Error> {
// dbg!("Sending msg {:?}", msg);
if self.last_msg_sent == msg {
@ -977,6 +978,10 @@ where
if (*msg).tag == LLMP_TAG_UNSET {
panic!("No tag set on message with id {}", (*msg).message_id);
}
// A client gets the sender id assigned to by the broker during the initial handshake.
if overwrite_client_id {
(*msg).sender = self.id;
}
let page = self.out_maps.last_mut().unwrap().page_mut();
if msg.is_null() || !llmp_msg_in_page(page, msg) {
return Err(Error::Unknown(format!(
@ -1034,17 +1039,18 @@ where
println!("Setting max alloc size: {:?}", (*old_map).max_alloc_size);
(*new_map).max_alloc_size = (*old_map).max_alloc_size;
(*new_map).sender = self.id;
/* On the old map, place a last message linking to the new map for the clients
* to consume */
let mut out: *mut LlmpMsg = self.alloc_eop()?;
(*out).sender = (*old_map).sender;
let out = self.alloc_eop()?;
let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*end_of_page_msg).map_size = new_map_shmem.shmem.len();
(*end_of_page_msg).shm_str = *new_map_shmem.shmem.id().as_slice();
/* Send the last msg on the old buf */
self.send(out)?;
self.send(out, true)?;
// Set the new page as current page.
self.out_maps.push(new_map_shmem);
@ -1115,7 +1121,7 @@ where
(*msg).flags = LLMP_FLAG_INITIALIZED;
buf.as_ptr()
.copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len());
self.send(msg)
self.send(msg, true)
}
}
@ -1139,7 +1145,7 @@ where
(*msg).flags = flags;
buf.as_ptr()
.copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len());
self.send(msg)
self.send(msg, true)
}
}
@ -1355,7 +1361,7 @@ where
/// Returns the next message, tag, buf, if avaliable, else None
#[allow(clippy::type_complexity)]
#[inline]
pub fn recv_buf(&mut self) -> Result<Option<(u32, Tag, &[u8])>, Error> {
pub fn recv_buf(&mut self) -> Result<Option<(ClientId, Tag, &[u8])>, Error> {
if let Some((sender, tag, _flags, buf)) = self.recv_buf_with_flags()? {
Ok(Some((sender, tag, buf)))
} else {
@ -1744,7 +1750,7 @@ where
(msg as *const u8).copy_to_nonoverlapping(out as *mut u8, complete_size);
(*out).buf_len_padded = actual_size;
/* We need to replace the message ID with our own */
if let Err(e) = self.llmp_out.send(out) {
if let Err(e) = self.llmp_out.send(out, false) {
panic!("Error sending msg: {:?}", e)
};
self.llmp_out.last_msg_sent = out;
@ -1855,7 +1861,7 @@ where
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*pageinfo).shm_str = *shmem_description.id.as_slice();
(*pageinfo).map_size = shmem_description.size;
sender.send(msg)
sender.send(msg, true)
}
}
@ -2377,7 +2383,7 @@ where
/// # Safety
/// Needs to be called with a proper msg pointer
pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
self.sender.send(msg)
self.sender.send(msg, true)
}
/// Allocates a message of the given size, tags it, and sends it off.
@ -2439,13 +2445,13 @@ where
/// Returns the next message, tag, buf, if avaliable, else None
#[allow(clippy::type_complexity)]
#[inline]
pub fn recv_buf(&mut self) -> Result<Option<(u32, Tag, &[u8])>, Error> {
pub fn recv_buf(&mut self) -> Result<Option<(ClientId, Tag, &[u8])>, Error> {
self.receiver.recv_buf()
}
/// Receives a buf from the broker, looping until a messages becomes avaliable
#[inline]
pub fn recv_buf_blocking(&mut self) -> Result<(u32, Tag, &[u8]), Error> {
pub fn recv_buf_blocking(&mut self) -> Result<(ClientId, Tag, &[u8]), Error> {
self.receiver.recv_buf_blocking()
}