Add Broker.peek_next_client_id (#1468)

* Add Broker::next_client_id

* rename to peek_

* Undo change, probably not better
This commit is contained in:
Dominik Maier 2023-08-28 09:00:05 +02:00 committed by GitHub
parent 0a0c4639a6
commit 1357b9f310
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1907,6 +1907,8 @@ where
/// Users of Llmp can add message handlers in the broker. /// Users of Llmp can add message handlers in the broker.
/// This allows us to intercept messages right in the broker. /// This allows us to intercept messages right in the broker.
/// This keeps the out map clean. /// This keeps the out map clean.
/// The backing values of `llmp_clients` [`ClientId`]s will always be sorted (but not gapless)
/// Make sure to always increase `num_clients_total` when pushing a new [`LlmpReceiver`] to `llmp_clients`!
pub llmp_clients: Vec<LlmpReceiver<SP>>, pub llmp_clients: Vec<LlmpReceiver<SP>>,
/// The own listeners we spawned via `launch_listener` or `crate_attach_to_tcp`. /// The own listeners we spawned via `launch_listener` or `crate_attach_to_tcp`.
/// Listeners will be ignored for `exit_cleanly_after` and they are never considered to have timed out. /// Listeners will be ignored for `exit_cleanly_after` and they are never considered to have timed out.
@ -1982,6 +1984,20 @@ where
}) })
} }
/// Gets the [`ClientId`] the next client attaching to this broker will get.
/// In its current implememtation, the inner value of the next [`ClientId`]
/// is equal to `self.num_clients_total`.
/// Calling `peek_next_client_id` mutliple times (without adding a client) will yield the same value.
#[must_use]
#[inline]
pub fn peek_next_client_id(&self) -> ClientId {
ClientId(
self.num_clients_total
.try_into()
.expect("More than u32::MAX clients!"),
)
}
/// Create a new [`LlmpBroker`] attaching to a TCP port /// Create a new [`LlmpBroker`] attaching to a TCP port
#[cfg(feature = "std")] #[cfg(feature = "std")]
pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result<Self, Error> { pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result<Self, Error> {
@ -2026,7 +2042,7 @@ where
// Since we now have a handle to it, it won't be umapped too early (only after we also unmap it) // Since we now have a handle to it, it won't be umapped too early (only after we also unmap it)
client_page.mark_safe_to_unmap(); client_page.mark_safe_to_unmap();
let id = ClientId(self.num_clients_total.try_into().unwrap()); let id = self.peek_next_client_id();
self.llmp_clients.push(LlmpReceiver { self.llmp_clients.push(LlmpReceiver {
id, id,
current_recv_shmem: client_page, current_recv_shmem: client_page,
@ -2037,8 +2053,8 @@ where
#[cfg(feature = "std")] #[cfg(feature = "std")]
last_msg_time: current_time(), last_msg_time: current_time(),
}); });
self.num_clients_total += 1; self.num_clients_total += 1;
id id
} }
@ -2090,7 +2106,7 @@ where
// TODO: handle broker_ids properly/at all. // TODO: handle broker_ids properly/at all.
let map_description = Self::b2b_thread_on( let map_description = Self::b2b_thread_on(
stream, stream,
ClientId(self.llmp_clients.len() as u32), self.peek_next_client_id(),
&self &self
.llmp_out .llmp_out
.out_shmems .out_shmems
@ -2577,7 +2593,7 @@ where
hostname, hostname,
}; };
let llmp_tcp_id = ClientId(self.llmp_clients.len() as u32); let llmp_tcp_id = self.peek_next_client_id();
// Tcp out map sends messages from background thread tcp server to foreground client // Tcp out map sends messages from background thread tcp server to foreground client
let tcp_out_shmem = LlmpSharedMap::new( let tcp_out_shmem = LlmpSharedMap::new(
@ -2675,7 +2691,6 @@ where
F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>, F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{ {
let mut new_messages = false; let mut new_messages = false;
let mut next_id = self.llmp_clients.len() as u32;
// TODO: We could memcpy a range of pending messages, instead of one by one. // TODO: We could memcpy a range of pending messages, instead of one by one.
loop { loop {
@ -2683,7 +2698,7 @@ where
let pos = if (client_id.0 as usize) < self.llmp_clients.len() let pos = if (client_id.0 as usize) < self.llmp_clients.len()
&& self.llmp_clients[client_id.0 as usize].id == client_id && self.llmp_clients[client_id.0 as usize].id == client_id
{ {
// Fast path when no client was removed // Fast path when no client before this one was removed
client_id.0 as usize client_id.0 as usize
} else { } else {
self.llmp_clients self.llmp_clients
@ -2736,11 +2751,11 @@ where
) { ) {
Ok(new_shmem) => { Ok(new_shmem) => {
let mut new_page = LlmpSharedMap::existing(new_shmem); let mut new_page = LlmpSharedMap::existing(new_shmem);
let id = next_id;
next_id += 1;
new_page.mark_safe_to_unmap(); new_page.mark_safe_to_unmap();
let id = self.peek_next_client_id();
self.llmp_clients.push(LlmpReceiver { self.llmp_clients.push(LlmpReceiver {
id: ClientId(id), id,
current_recv_shmem: new_page, current_recv_shmem: new_page,
last_msg_recvd: ptr::null_mut(), last_msg_recvd: ptr::null_mut(),
shmem_provider: self.shmem_provider.clone(), shmem_provider: self.shmem_provider.clone(),
@ -2768,7 +2783,7 @@ where
let pos = if (client_id.0 as usize) < self.llmp_clients.len() let pos = if (client_id.0 as usize) < self.llmp_clients.len()
&& self.llmp_clients[client_id.0 as usize].id == client_id && self.llmp_clients[client_id.0 as usize].id == client_id
{ {
// Fast path when no client was removed // Fast path when no client before this one was removed
client_id.0 as usize client_id.0 as usize
} else { } else {
self.llmp_clients self.llmp_clients