Increase LLMP clients timeout to 5 min (#1126)
* LLMP client access fast path * Increase LLMP client timeoit to 5min --------- Co-authored-by: Your Name <you@example.com>
This commit is contained in:
parent
b96e194812
commit
3ffec79a17
@ -110,7 +110,7 @@ use crate::{
|
|||||||
|
|
||||||
/// The timeout after which a client will be considered stale, and removed.
|
/// The timeout after which a client will be considered stale, and removed.
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
|
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60 * 5);
|
||||||
|
|
||||||
/// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`]
|
/// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`]
|
||||||
/// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages.
|
/// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages.
|
||||||
@ -1897,7 +1897,7 @@ where
|
|||||||
/// after which the broker loop should quit gracefully.
|
/// after which the broker loop should quit gracefully.
|
||||||
pub exit_cleanly_after: Option<NonZeroUsize>,
|
pub exit_cleanly_after: Option<NonZeroUsize>,
|
||||||
/// Clients that should be removed soon, (offset into llmp_clients)
|
/// Clients that should be removed soon, (offset into llmp_clients)
|
||||||
clients_to_remove: Vec<u32>,
|
clients_to_remove: Vec<usize>,
|
||||||
/// The ShMemProvider to use
|
/// The ShMemProvider to use
|
||||||
shmem_provider: SP,
|
shmem_provider: SP,
|
||||||
}
|
}
|
||||||
@ -2114,22 +2114,22 @@ where
|
|||||||
if last_msg_time < current_time
|
if last_msg_time < current_time
|
||||||
&& current_time - last_msg_time > CLIENT_TIMEOUT
|
&& current_time - last_msg_time > CLIENT_TIMEOUT
|
||||||
{
|
{
|
||||||
self.clients_to_remove.push(i as u32);
|
self.clients_to_remove.push(i);
|
||||||
#[cfg(feature = "llmp_debug")]
|
#[cfg(feature = "llmp_debug")]
|
||||||
println!("Client {i} timed out. Removing.");
|
println!("Client #{i} timed out. Removing.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
new_messages = has_messages;
|
new_messages = has_messages;
|
||||||
}
|
}
|
||||||
Err(Error::ShuttingDown) => self.clients_to_remove.push(i as u32),
|
Err(Error::ShuttingDown) => self.clients_to_remove.push(i),
|
||||||
Err(err) => return Err(err),
|
Err(err) => return Err(err),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// After brokering, remove all clients we don't want to keep.
|
// After brokering, remove all clients we don't want to keep.
|
||||||
for client_id in self.clients_to_remove.iter().rev() {
|
for i in self.clients_to_remove.iter().rev() {
|
||||||
log::debug!("Client {client_id} disconnected.");
|
log::debug!("Client #{i} disconnected.");
|
||||||
self.llmp_clients.remove((*client_id) as usize);
|
self.llmp_clients.remove(*i);
|
||||||
}
|
}
|
||||||
self.clients_to_remove.clear();
|
self.clients_to_remove.clear();
|
||||||
Ok(new_messages)
|
Ok(new_messages)
|
||||||
@ -2642,12 +2642,18 @@ where
|
|||||||
// TODO: We could memcpy a range of pending messages, instead of one by one.
|
// TODO: We could memcpy a range of pending messages, instead of one by one.
|
||||||
loop {
|
loop {
|
||||||
let msg = {
|
let msg = {
|
||||||
// TODO faster search (e.g. binary search)
|
let pos = if (client_id.0 as usize) < self.llmp_clients.len()
|
||||||
let pos = self
|
&& self.llmp_clients[client_id.0 as usize].id == client_id
|
||||||
.llmp_clients
|
{
|
||||||
|
// Fast path when no client was removed
|
||||||
|
client_id.0 as usize
|
||||||
|
} else {
|
||||||
|
// TODO binary search
|
||||||
|
self.llmp_clients
|
||||||
.iter()
|
.iter()
|
||||||
.position(|x| x.id == client_id)
|
.position(|x| x.id == client_id)
|
||||||
.expect("Fatal error, client ID not found");
|
.expect("Fatal error, client ID not found")
|
||||||
|
};
|
||||||
let client = &mut self.llmp_clients[pos];
|
let client = &mut self.llmp_clients[pos];
|
||||||
match client.recv()? {
|
match client.recv()? {
|
||||||
None => {
|
None => {
|
||||||
@ -2723,12 +2729,19 @@ where
|
|||||||
// The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary.
|
// The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary.
|
||||||
let mut should_forward_msg = true;
|
let mut should_forward_msg = true;
|
||||||
|
|
||||||
// TODO faster search (e.g. binary search)
|
let pos = if (client_id.0 as usize) < self.llmp_clients.len()
|
||||||
let pos = self
|
&& self.llmp_clients[client_id.0 as usize].id == client_id
|
||||||
.llmp_clients
|
{
|
||||||
|
// Fast path when no client was removed
|
||||||
|
client_id.0 as usize
|
||||||
|
} else {
|
||||||
|
// TODO binary search
|
||||||
|
self.llmp_clients
|
||||||
.iter()
|
.iter()
|
||||||
.position(|x| x.id == client_id)
|
.position(|x| x.id == client_id)
|
||||||
.expect("Fatal error, client ID not found");
|
.expect("Fatal error, client ID not found")
|
||||||
|
};
|
||||||
|
|
||||||
let map = &mut self.llmp_clients[pos].current_recv_shmem;
|
let map = &mut self.llmp_clients[pos].current_recv_shmem;
|
||||||
let msg_buf = (*msg).try_as_slice(map)?;
|
let msg_buf = (*msg).try_as_slice(map)?;
|
||||||
if let LlmpMsgHookResult::Handled =
|
if let LlmpMsgHookResult::Handled =
|
||||||
|
Loading…
x
Reference in New Issue
Block a user