Handle broker-to-broker connection interruptions more gracefully (#921)

* Handle broker-to-broker connection interruptions more gracefully

Exit gracefully instead of panicking or getting stuck in infinite loops

* Run cargo-fmt
This commit is contained in:
omergreen 2022-12-04 19:55:55 +02:00 committed by GitHub
parent c879a0a8d3
commit 3bad100cb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2096,36 +2096,47 @@ where
#[cfg(all(feature = "llmp_debug", feature = "std"))] #[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("B2B: Starting proxy loop :)"); println!("B2B: Starting proxy loop :)");
let peer_address = stream.peer_addr().unwrap();
loop { loop {
// first, forward all data we have. // first, forward all data we have.
while let Some((client_id, tag, flags, payload)) = local_receiver loop {
.recv_buf_with_flags() match local_receiver.recv_buf_with_flags() {
.expect("Error reading from local page!") Ok(None) => break, // no more data to forward
{ Ok(Some((client_id, tag, flags, payload))) => {
if client_id == b2b_client_id { if client_id == b2b_client_id {
println!( println!(
"Ignored message we probably sent earlier (same id), TAG: {:x}", "Ignored message we probably sent earlier (same id), TAG: {:x}",
tag tag
); );
continue; continue;
} }
#[cfg(all(feature = "llmp_debug", feature = "std"))] #[cfg(all(feature = "llmp_debug", feature = "std"))]
println!( println!(
"Fowarding message ({} bytes) via broker2broker connection", "Fowarding message ({} bytes) via broker2broker connection",
payload.len() payload.len()
); );
// We got a new message! Forward... // We got a new message! Forward...
send_tcp_msg( if let Err(e) = send_tcp_msg(
&mut stream, &mut stream,
&TcpRemoteNewMessage { &TcpRemoteNewMessage {
client_id, client_id,
tag, tag,
flags, flags,
payload: payload.to_vec(), payload: payload.to_vec(),
}, },
) ) {
.expect("Error sending message via broker 2 broker"); println!("Got error {} while trying to forward a message to broker {}, exiting thread", e, peer_address);
return;
}
}
Err(Error::ShuttingDown) => {
println!("Local broker is shutting down, exiting thread");
return;
}
Err(e) => panic!("Error reading from local page! {}", e),
}
} }
// Then, see if we can receive something. // Then, see if we can receive something.
@ -2134,25 +2145,42 @@ where
// Forwarding happens between each recv, too, as simplification. // Forwarding happens between each recv, too, as simplification.
// We ignore errors completely as they may be timeout, or stream closings. // We ignore errors completely as they may be timeout, or stream closings.
// Instead, we catch stream close when/if we next try to send. // Instead, we catch stream close when/if we next try to send.
if let Ok(val) = recv_tcp_msg(&mut stream) { match recv_tcp_msg(&mut stream) {
let msg: TcpRemoteNewMessage = val.try_into().expect( Ok(val) => {
"Illegal message received from broker 2 broker connection - shutting down.", let msg: TcpRemoteNewMessage = val.try_into().expect(
); "Illegal message received from broker 2 broker connection - shutting down.",
);
#[cfg(all(feature = "llmp_debug", feature = "std"))] #[cfg(all(feature = "llmp_debug", feature = "std"))]
println!( println!(
"Fowarding incoming message ({} bytes) from broker2broker connection", "Fowarding incoming message ({} bytes) from broker2broker connection",
msg.payload.len() msg.payload.len()
); );
// TODO: Could probably optimize this somehow to forward all queued messages between locks... oh well. // TODO: Could probably optimize this somehow to forward all queued messages between locks... oh well.
// Todo: somehow mangle in the other broker id? ClientId? // Todo: somehow mangle in the other broker id? ClientId?
new_sender new_sender
.send_buf_with_flags(msg.tag, msg.flags | LLMP_FLAG_FROM_B2B, &msg.payload) .send_buf_with_flags(
.expect("B2B: Error forwarding message. Exiting."); msg.tag,
} else { msg.flags | LLMP_FLAG_FROM_B2B,
#[cfg(all(feature = "llmp_debug", feature = "std"))] &msg.payload,
println!("Received no input, timeout or closed. Looping back up :)"); )
.expect("B2B: Error forwarding message. Exiting.");
}
Err(e) => {
if let Error::File(e, _) = e {
if e.kind() == ErrorKind::UnexpectedEof {
println!(
"Broker {} seems to have disconnected, exiting",
peer_address
);
return;
}
}
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("Received no input, timeout or closed. Looping back up :)");
}
} }
} }
}); });