Less pub in LLMP (#1470)
* Less pub in LLMP
* add mut to docstring
* fix example
parent 1357b9f310
commit b45985c76b
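The gist of the change: fields on `LlmpSender`, `LlmpReceiver`, `LlmpClient`, `LlmpClientDescription`, `LlmpSharedMap`, and `LlmpBroker` lose their `pub`, and callers migrate to new accessor methods (`sender()`, `sender_mut()`, `id()`, …). A minimal caller-side sketch of the migration, assuming the `libafl_bolts` crate layout of recent LibAFL versions (in older versions these types lived under `libafl::bolts`); `shut_down` is a hypothetical helper, not LibAFL API:

    use libafl_bolts::{llmp::LlmpClient, shmem::ShMemProvider, ClientId, Error};

    // Hypothetical helper: reads go through the shared accessor,
    // writes through the `_mut` one.
    fn shut_down<SP: ShMemProvider>(client: &mut LlmpClient<SP>) -> Result<ClientId, Error> {
        let id = client.sender().id(); // was: client.sender.id
        client.sender_mut().send_exiting()?; // was: client.sender.send_exiting()?
        Ok(id)
    }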
@@ -377,7 +377,7 @@ where
     }

     fn send_exiting(&mut self) -> Result<(), Error> {
-        self.client.sender.send_exiting()?;
+        self.client.sender_mut().send_exiting()?;
         self.inner.send_exiting()
     }

@@ -594,7 +594,7 @@ where
         Z: ExecutionProcessor<E::Observers, State = EM::State> + EvaluatorObservers<E::Observers>,
     {
         // TODO: Get around local event copy by moving handle_in_client
-        let self_id = self.client.sender.id;
+        let self_id = self.client.sender().id();
         let mut count = 0;
         while let Some((client_id, tag, _flags, msg)) = self.client.recv_buf_with_flags()? {
             assert!(
@@ -624,7 +624,7 @@ impl<S: UsesInput, SP: ShMemProvider> LlmpEventManager<S, SP> {
     /// The other side may free up all allocated memory.
     /// We are no longer allowed to send anything afterwards.
     pub fn send_exiting(&mut self) -> Result<(), Error> {
-        self.llmp.sender.send_exiting()
+        self.llmp.sender_mut().send_exiting()
     }
 }

@@ -758,7 +758,7 @@ where
         executor: &mut E,
     ) -> Result<usize, Error> {
         // TODO: Get around local event copy by moving handle_in_client
-        let self_id = self.llmp.sender.id;
+        let self_id = self.llmp.sender().id();
         let mut count = 0;
         while let Some((client_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? {
             assert!(
@@ -825,7 +825,7 @@ where
 {
     /// Gets the id assigned to this staterestorer.
     fn mgr_id(&self) -> EventManagerId {
-        EventManagerId(self.llmp.sender.id.0 as usize)
+        EventManagerId(self.llmp.sender().id().0 as usize)
     }
 }

@@ -1572,7 +1572,7 @@ where
         Z: ExecutionProcessor<E::Observers, State = S> + EvaluatorObservers<E::Observers>,
     {
         // TODO: Get around local event copy by moving handle_in_client
-        let self_id = self.llmp.sender.id;
+        let self_id = self.llmp.sender().id();
         let mut count = 0;
         while let Some((client_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? {
             assert!(
@@ -202,7 +202,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
                 thread::sleep(Duration::from_millis(10));
             }
             log::info!("Exiting Client exits");
-            client.sender.send_exiting()?;
+            client.sender_mut().send_exiting()?;
         }
         _ => {
             println!("No valid mode supplied");
@@ -827,12 +827,12 @@ where
     SP: ShMemProvider,
 {
     /// ID of this sender.
-    pub id: ClientId,
+    id: ClientId,
     /// Ref to the last message this sender sent on the last page.
     /// If null, a new page (just) started.
-    pub last_msg_sent: *const LlmpMsg,
+    last_msg_sent: *const LlmpMsg,
     /// A vec of page wrappers, each containing an initialized [`ShMem`]
-    pub out_shmems: Vec<LlmpSharedMap<SP::ShMem>>,
+    out_shmems: Vec<LlmpSharedMap<SP::ShMem>>,
     /// A vec of pages that we previously used, but that have served its purpose
     /// (no potential readers are left).
     /// Instead of freeing them, we keep them around to potentially reuse them later,
@@ -843,7 +843,7 @@ where
     /// The broker uses this feature.
     /// By keeping the message history around,
     /// new clients may join at any time in the future.
-    pub keep_pages_forever: bool,
+    keep_pages_forever: bool,
     /// True, if we allocatd a message, but didn't call [`Self::send()`] yet
     has_unsent_message: bool,
     /// The sharedmem provider to get new sharaed maps if we're full
@@ -878,6 +878,12 @@ where
         })
     }

+    /// ID of this sender.
+    #[must_use]
+    pub fn id(&self) -> ClientId {
+        self.id
+    }
+
     /// Completely reset the current sender map.
     /// Afterwards, no receiver should read from it at a different location.
     /// This is only useful if all connected llmp parties start over, for example after a crash.
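With the field private, reading a sender's id from outside now takes a shared borrow through the getter added above. A hypothetical caller, under the same path assumptions as the earlier sketch:

    use libafl_bolts::{llmp::LlmpSender, shmem::ShMemProvider, ClientId};

    // Hypothetical helper: `sender.id` becomes `sender.id()`.
    fn sender_id<SP: ShMemProvider>(sender: &LlmpSender<SP>) -> ClientId {
        sender.id()
    }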
@@ -1470,16 +1476,16 @@ where
     SP: ShMemProvider,
 {
     /// Client Id of this receiver
-    pub id: ClientId,
+    id: ClientId,
     /// Pointer to the last message received
-    pub last_msg_recvd: *const LlmpMsg,
+    last_msg_recvd: *const LlmpMsg,
     /// Time we received the last message from this receiver
     #[cfg(feature = "std")]
     last_msg_time: Duration,
     /// The shmem provider
-    pub shmem_provider: SP,
+    shmem_provider: SP,
     /// current page. After EOP, this gets replaced with the new one
-    pub current_recv_shmem: LlmpSharedMap<SP::ShMem>,
+    current_recv_shmem: LlmpSharedMap<SP::ShMem>,
     /// Caches the highest msg id we've seen so far
     highest_msg_id: MessageId,
 }
@@ -1753,7 +1759,7 @@ where
 {
     /// Shmem containg the actual (unsafe) page,
     /// shared between one LlmpSender and one LlmpReceiver
-    pub shmem: SHM,
+    shmem: SHM,
 }

 // TODO: May be obsolete
@@ -1903,13 +1909,13 @@ where
     SP: ShMemProvider + 'static,
 {
     /// Broadcast map from broker to all clients
-    pub llmp_out: LlmpSender<SP>,
+    llmp_out: LlmpSender<SP>,
     /// Users of Llmp can add message handlers in the broker.
     /// This allows us to intercept messages right in the broker.
     /// This keeps the out map clean.
     /// The backing values of `llmp_clients` [`ClientId`]s will always be sorted (but not gapless)
     /// Make sure to always increase `num_clients_total` when pushing a new [`LlmpReceiver`] to `llmp_clients`!
-    pub llmp_clients: Vec<LlmpReceiver<SP>>,
+    llmp_clients: Vec<LlmpReceiver<SP>>,
     /// The own listeners we spawned via `launch_listener` or `crate_attach_to_tcp`.
     /// Listeners will be ignored for `exit_cleanly_after` and they are never considered to have timed out.
     listeners: Vec<ClientId>,
@@ -2030,6 +2036,19 @@ where
         self.exit_cleanly_after = Some(n_clients);
     }

+    /// Add a client to this broker.
+    /// Will set an appropriate [`ClientId`] before pushing the client to the internal vec.
+    /// Will increase `num_clients_total`.
+    /// The backing values of `llmp_clients` [`ClientId`]s will always be sorted (but not gapless)
+    /// returns the [`ClientId`] of the new client.
+    pub fn add_client(&mut self, mut client_receiver: LlmpReceiver<SP>) -> ClientId {
+        let id = self.peek_next_client_id();
+        client_receiver.id = id;
+        self.llmp_clients.push(client_receiver);
+        self.num_clients_total += 1;
+        id
+    }
+
     /// Allocate the next message on the outgoing map
     unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error> {
         self.llmp_out.alloc_next(buf_len)
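The design point of `add_client` is that the id assignment and the `num_clients_total` bump now live in exactly one method, so the docstring warning above ("Make sure to always increase `num_clients_total` …") can no longer be violated by a forgetful call site. A self-contained toy model of the same shape (not LibAFL code; the real `peek_next_client_id` differs, since real ids are sorted but not necessarily gapless):

    #[derive(Clone, Copy, Debug, PartialEq)]
    struct ClientId(u32);

    struct Receiver {
        id: ClientId,
    }

    struct Broker {
        clients: Vec<Receiver>,
        num_clients_total: u32,
    }

    impl Broker {
        fn peek_next_client_id(&self) -> ClientId {
            ClientId(self.num_clients_total) // simplified id scheme
        }

        // One method owns the invariant: fresh id assigned, counter bumped.
        fn add_client(&mut self, mut receiver: Receiver) -> ClientId {
            let id = self.peek_next_client_id();
            receiver.id = id; // the caller's placeholder id is overwritten
            self.clients.push(receiver);
            self.num_clients_total += 1;
            id
        }
    }

    fn main() {
        let mut broker = Broker { clients: Vec::new(), num_clients_total: 0 };
        assert_eq!(broker.add_client(Receiver { id: ClientId(0) }), ClientId(0));
        assert_eq!(broker.add_client(Receiver { id: ClientId(0) }), ClientId(1));
        println!("{} clients registered", broker.num_clients_total);
    }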
@@ -2042,9 +2061,8 @@ where
         // Since we now have a handle to it, it won't be umapped too early (only after we also unmap it)
         client_page.mark_safe_to_unmap();

-        let id = self.peek_next_client_id();
-        self.llmp_clients.push(LlmpReceiver {
-            id,
+        self.add_client(LlmpReceiver {
+            id: ClientId(0), // Will be auto-filled
             current_recv_shmem: client_page,
             last_msg_recvd: ptr::null_mut(),
             shmem_provider: self.shmem_provider.clone(),
@@ -2052,10 +2070,7 @@ where
             // We don't know the last received time, just assume the current time.
             #[cfg(feature = "std")]
             last_msg_time: current_time(),
-        });
-        self.num_clients_total += 1;
-
-        id
+        })
     }

     /// Connects to a broker running on another machine.
@@ -2753,9 +2768,8 @@ where
                 let mut new_page = LlmpSharedMap::existing(new_shmem);
                 new_page.mark_safe_to_unmap();

-                let id = self.peek_next_client_id();
-                self.llmp_clients.push(LlmpReceiver {
-                    id,
+                self.add_client(LlmpReceiver {
+                    id: ClientId(0), // will be auto-filled
                     current_recv_shmem: new_page,
                     last_msg_recvd: ptr::null_mut(),
                     shmem_provider: self.shmem_provider.clone(),
@@ -2764,7 +2778,6 @@ where
                     #[cfg(feature = "std")]
                     last_msg_time: current_time(),
                 });
-                self.num_clients_total += 1;
             }
             Err(e) => {
                 log::info!("Error adding client! Ignoring: {e:?}");
@@ -2811,9 +2824,9 @@ where
 #[derive(Clone, Copy, Debug, Serialize, Deserialize)]
 pub struct LlmpClientDescription {
     /// Description of the sender
-    pub sender: LlmpDescription,
+    sender: LlmpDescription,
     /// Description of the receiver
-    pub receiver: LlmpDescription,
+    receiver: LlmpDescription,
 }

 /// Client side of LLMP
@@ -2823,9 +2836,9 @@ where
     SP: ShMemProvider,
 {
     /// Outgoing channel to the broker
-    pub sender: LlmpSender<SP>,
+    sender: LlmpSender<SP>,
     /// Incoming (broker) broadcast map
-    pub receiver: LlmpReceiver<SP>,
+    receiver: LlmpReceiver<SP>,
 }

 /// `n` clients connect to a broker. They share an outgoing map with the broker,
@@ -2907,6 +2920,30 @@ where
         })
     }

+    /// Outgoing channel to the broker
+    #[must_use]
+    pub fn sender(&self) -> &LlmpSender<SP> {
+        &self.sender
+    }
+
+    /// Outgoing channel to the broker (mut)
+    #[must_use]
+    pub fn sender_mut(&mut self) -> &mut LlmpSender<SP> {
+        &mut self.sender
+    }
+
+    /// Incoming (broker) broadcast map
+    #[must_use]
+    pub fn receiver(&self) -> &LlmpReceiver<SP> {
+        &self.receiver
+    }
+
+    /// Incoming (broker) broadcast map (mut)
+    #[must_use]
+    pub fn receiver_mut(&mut self) -> &mut LlmpReceiver<SP> {
+        &mut self.receiver
+    }
+
     /// Waits for the sender to be save to unmap.
     /// If a receiver is involved on the other side, this function should always be called.
     pub fn await_safe_to_unmap_blocking(&self) {
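Because the accessors split the client into its two halves, read-only consumers can reach both behind a single `&self` borrow. One more hypothetical caller, same path assumptions as the earlier sketches (also assumes `ClientId` derives `Debug`):

    use libafl_bolts::{llmp::LlmpClient, shmem::ShMemProvider};

    // Hypothetical helper: both halves are reachable behind shared borrows.
    fn describe<SP: ShMemProvider>(client: &LlmpClient<SP>) -> String {
        let sender_id = client.sender().id();
        let _receiver = client.receiver(); // read-only view of the incoming map
        format!("llmp client sending as {sender_id:?}")
    }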
|