Fixed potential unsoundness due to Rc threading for ShMemProvider (#355)
* Fixed potential unsoundness due to Rc threading for ShMemProvider * tidy
This commit is contained in:
parent
72d22ee5e5
commit
32b8f838ae
@ -1816,7 +1816,6 @@ where
|
|||||||
// TODO: handle broker_ids properly/at all.
|
// TODO: handle broker_ids properly/at all.
|
||||||
let map_description = Self::b2b_thread_on(
|
let map_description = Self::b2b_thread_on(
|
||||||
stream,
|
stream,
|
||||||
&mut self.shmem_provider,
|
|
||||||
self.llmp_clients.len() as ClientId,
|
self.llmp_clients.len() as ClientId,
|
||||||
&self.llmp_out.out_maps.first().unwrap().shmem.description(),
|
&self.llmp_out.out_maps.first().unwrap().shmem.description(),
|
||||||
)?;
|
)?;
|
||||||
@ -1966,22 +1965,18 @@ where
|
|||||||
#[allow(clippy::let_and_return)]
|
#[allow(clippy::let_and_return)]
|
||||||
fn b2b_thread_on(
|
fn b2b_thread_on(
|
||||||
mut stream: TcpStream,
|
mut stream: TcpStream,
|
||||||
shmem_provider: &mut SP,
|
|
||||||
b2b_client_id: ClientId,
|
b2b_client_id: ClientId,
|
||||||
broker_map_description: &ShMemDescription,
|
broker_map_description: &ShMemDescription,
|
||||||
) -> Result<ShMemDescription, Error> {
|
) -> Result<ShMemDescription, Error> {
|
||||||
let broker_map_description = *broker_map_description;
|
let broker_map_description = *broker_map_description;
|
||||||
|
|
||||||
shmem_provider.pre_fork()?;
|
|
||||||
let mut shmem_provider_clone = shmem_provider.clone();
|
|
||||||
|
|
||||||
// A channel to get the new "client's" sharedmap id from
|
// A channel to get the new "client's" sharedmap id from
|
||||||
let (send, recv) = channel();
|
let (send, recv) = channel();
|
||||||
|
|
||||||
// (For now) the thread remote broker 2 broker just acts like a "normal" llmp client, except it proxies all messages to the attached socket, in both directions.
|
// (For now) the thread remote broker 2 broker just acts like a "normal" llmp client, except it proxies all messages to the attached socket, in both directions.
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
// as always, call post_fork to potentially reconnect the provider (for threaded/forked use)
|
// Crete a new ShMemProvider for this background thread
|
||||||
shmem_provider_clone.post_fork(true).unwrap();
|
let shmem_provider_bg = SP::new().unwrap();
|
||||||
|
|
||||||
#[cfg(fature = "llmp_debug")]
|
#[cfg(fature = "llmp_debug")]
|
||||||
println!("B2b: Spawned proxy thread");
|
println!("B2b: Spawned proxy thread");
|
||||||
@ -1992,7 +1987,7 @@ where
|
|||||||
.expect("Failed to set tcp stream timeout");
|
.expect("Failed to set tcp stream timeout");
|
||||||
|
|
||||||
let mut new_sender =
|
let mut new_sender =
|
||||||
match LlmpSender::new(shmem_provider_clone.clone(), b2b_client_id, false) {
|
match LlmpSender::new(shmem_provider_bg.clone(), b2b_client_id, false) {
|
||||||
Ok(new_sender) => new_sender,
|
Ok(new_sender) => new_sender,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
panic!("B2B: Could not map shared map: {}", e);
|
panic!("B2B: Could not map shared map: {}", e);
|
||||||
@ -2004,7 +1999,7 @@ where
|
|||||||
|
|
||||||
// the receiver receives from the local broker, and forwards it to the tcp stream.
|
// the receiver receives from the local broker, and forwards it to the tcp stream.
|
||||||
let mut local_receiver = LlmpReceiver::on_existing_from_description(
|
let mut local_receiver = LlmpReceiver::on_existing_from_description(
|
||||||
shmem_provider_clone,
|
shmem_provider_bg,
|
||||||
&LlmpDescription {
|
&LlmpDescription {
|
||||||
last_message_offset: None,
|
last_message_offset: None,
|
||||||
shmem: broker_map_description,
|
shmem: broker_map_description,
|
||||||
@ -2073,8 +2068,6 @@ where
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
shmem_provider.post_fork(false)?;
|
|
||||||
|
|
||||||
let ret = recv.recv().map_err(|_| {
|
let ret = recv.recv().map_err(|_| {
|
||||||
Error::Unknown("Error launching background thread for b2b communcation".to_string())
|
Error::Unknown("Error launching background thread for b2b communcation".to_string())
|
||||||
});
|
});
|
||||||
@ -2092,7 +2085,6 @@ where
|
|||||||
request: &TcpRequest,
|
request: &TcpRequest,
|
||||||
current_client_id: &mut u32,
|
current_client_id: &mut u32,
|
||||||
sender: &mut LlmpSender<SP>,
|
sender: &mut LlmpSender<SP>,
|
||||||
shmem_provider: &mut SP,
|
|
||||||
broker_map_description: &ShMemDescription,
|
broker_map_description: &ShMemDescription,
|
||||||
) {
|
) {
|
||||||
match request {
|
match request {
|
||||||
@ -2128,12 +2120,9 @@ where
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Ok(shmem_description) = Self::b2b_thread_on(
|
if let Ok(shmem_description) =
|
||||||
stream,
|
Self::b2b_thread_on(stream, *current_client_id, broker_map_description)
|
||||||
shmem_provider,
|
{
|
||||||
*current_client_id,
|
|
||||||
broker_map_description,
|
|
||||||
) {
|
|
||||||
if Self::announce_new_client(sender, &shmem_description).is_err() {
|
if Self::announce_new_client(sender, &shmem_description).is_err() {
|
||||||
println!("B2B: Error announcing client {:?}", shmem_description);
|
println!("B2B: Error announcing client {:?}", shmem_description);
|
||||||
};
|
};
|
||||||
@ -2172,12 +2161,9 @@ where
|
|||||||
let tcp_out_map_description = tcp_out_map.shmem.description();
|
let tcp_out_map_description = tcp_out_map.shmem.description();
|
||||||
self.register_client(tcp_out_map);
|
self.register_client(tcp_out_map);
|
||||||
|
|
||||||
self.shmem_provider.pre_fork()?;
|
|
||||||
let mut shmem_provider_clone = self.shmem_provider.clone();
|
|
||||||
|
|
||||||
let ret = thread::spawn(move || {
|
let ret = thread::spawn(move || {
|
||||||
// Call `post_fork` (even though this is not forked) so we get a new connection to the cloned `ShMemServer` if we are using a `ServedShMemProvider`
|
// Create a new ShMemProvider for this background thread.
|
||||||
shmem_provider_clone.post_fork(true).unwrap();
|
let mut shmem_provider_bg = SP::new().unwrap();
|
||||||
|
|
||||||
let mut current_client_id = llmp_tcp_id + 1;
|
let mut current_client_id = llmp_tcp_id + 1;
|
||||||
|
|
||||||
@ -2185,14 +2171,14 @@ where
|
|||||||
id: llmp_tcp_id,
|
id: llmp_tcp_id,
|
||||||
last_msg_sent: ptr::null_mut(),
|
last_msg_sent: ptr::null_mut(),
|
||||||
out_maps: vec![LlmpSharedMap::existing(
|
out_maps: vec![LlmpSharedMap::existing(
|
||||||
shmem_provider_clone
|
shmem_provider_bg
|
||||||
.from_description(tcp_out_map_description)
|
.from_description(tcp_out_map_description)
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
)],
|
)],
|
||||||
// drop pages to the broker, if it already read them.
|
// drop pages to the broker, if it already read them.
|
||||||
keep_pages_forever: false,
|
keep_pages_forever: false,
|
||||||
has_unsent_message: false,
|
has_unsent_message: false,
|
||||||
shmem_provider: shmem_provider_clone.clone(),
|
shmem_provider: shmem_provider_bg.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
@ -2230,7 +2216,6 @@ where
|
|||||||
&req,
|
&req,
|
||||||
&mut current_client_id,
|
&mut current_client_id,
|
||||||
&mut tcp_incoming_sender,
|
&mut tcp_incoming_sender,
|
||||||
&mut shmem_provider_clone,
|
|
||||||
&broker_map_description,
|
&broker_map_description,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -2241,7 +2226,6 @@ where
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
self.shmem_provider.post_fork(false)?;
|
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,7 +195,7 @@ pub trait ShMem: Sized + Debug + Clone {
|
|||||||
/// A [`ShMemProvider`] provides access to shared maps.
|
/// A [`ShMemProvider`] provides access to shared maps.
|
||||||
/// They are the backbone of [`crate::bolts::llmp`] for inter-process communication.
|
/// They are the backbone of [`crate::bolts::llmp`] for inter-process communication.
|
||||||
/// All you need for scaling on a new target is to implement this interface, as well as the respective [`ShMem`].
|
/// All you need for scaling on a new target is to implement this interface, as well as the respective [`ShMem`].
|
||||||
pub trait ShMemProvider: Send + Clone + Default + Debug {
|
pub trait ShMemProvider: Clone + Default + Debug {
|
||||||
/// The actual shared map handed out by this [`ShMemProvider`].
|
/// The actual shared map handed out by this [`ShMemProvider`].
|
||||||
type Mem: ShMem;
|
type Mem: ShMem;
|
||||||
|
|
||||||
@ -308,8 +308,8 @@ where
|
|||||||
parent_child_pipe: Option<Pipe>,
|
parent_child_pipe: Option<Pipe>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(all(unix, feature = "std"))]
|
//#[cfg(all(unix, feature = "std"))]
|
||||||
unsafe impl<SP: ShMemProvider> Send for RcShMemProvider<SP> {}
|
//unsafe impl<SP: ShMemProvider> Send for RcShMemProvider<SP> {}
|
||||||
|
|
||||||
#[cfg(all(unix, feature = "std"))]
|
#[cfg(all(unix, feature = "std"))]
|
||||||
impl<SP> ShMemProvider for RcShMemProvider<SP>
|
impl<SP> ShMemProvider for RcShMemProvider<SP>
|
||||||
|
Loading…
x
Reference in New Issue
Block a user