Tcp Broker to Broker Communication (#66)

* initial b2b implementation

* no_std and clippy fixes

* b2b testcase added

* more correct testcases

* fixed b2b

* typo

* fixed unused warning
This commit is contained in:
Dominik Maier 2021-05-01 13:24:02 +02:00 committed by Andrea Fioraldi
parent a78a4b73fa
commit b175500971
6 changed files with 560 additions and 105 deletions

View File

@ -41,6 +41,7 @@ derive = ["libafl_derive"] # provide derive(SerdeAny) macro.
llmp_small_maps = [] # reduces initial map size for llmp llmp_small_maps = [] # reduces initial map size for llmp
llmp_debug = ["backtrace"] # Enables debug output for LLMP llmp_debug = ["backtrace"] # Enables debug output for LLMP
llmp_compression = [] # llmp compression using GZip llmp_compression = [] # llmp compression using GZip
llmp_bind_public = [] # If set, llmp will bind to 0.0.0.0, allowing cross-device communication. Binds to localhost by default.
[[example]] [[example]]
name = "llmp_test" name = "llmp_test"
@ -61,6 +62,7 @@ libafl_derive = { version = "0.1.0", optional = true, path = "../libafl_derive"
serde_json = { version = "1.0", optional = true, default-features = false, features = ["alloc"] } # an easy way to debug print SerdeAnyMap serde_json = { version = "1.0", optional = true, default-features = false, features = ["alloc"] } # an easy way to debug print SerdeAnyMap
compression = { version = "0.1.5" } compression = { version = "0.1.5" }
num_enum = "0.5.1" num_enum = "0.5.1"
hostname = "^0.3" # Is there really no gethostname in the stdlib?
[target.'cfg(target_os = "android")'.dependencies] [target.'cfg(target_os = "android")'.dependencies]
backtrace = { version = "0.3", optional = true, default-features = false, features = ["std", "libbacktrace"] } # for llmp_debug backtrace = { version = "0.3", optional = true, default-features = false, features = ["std", "libbacktrace"] } # for llmp_debug

View File

@ -83,7 +83,7 @@ fn large_msg_loop(port: u16) -> ! {
fn broker_message_hook( fn broker_message_hook(
client_id: u32, client_id: u32,
tag: llmp::Tag, tag: llmp::Tag,
_flags: llmp::Flag, _flags: llmp::Flags,
message: &[u8], message: &[u8],
) -> Result<llmp::LlmpMsgHookResult, Error> { ) -> Result<llmp::LlmpMsgHookResult, Error> {
match tag { match tag {
@ -120,22 +120,31 @@ fn main() {
let mode = std::env::args() let mode = std::env::args()
.nth(1) .nth(1)
.expect("no mode specified, chose 'broker', 'ctr', 'adder', or 'large'"); .expect("no mode specified, chose 'broker', 'b2b', 'ctr', 'adder', or 'large'");
let port: u16 = std::env::args() let port: u16 = std::env::args()
.nth(2) .nth(2)
.unwrap_or("1337".into()) .unwrap_or("1337".into())
.parse::<u16>() .parse::<u16>()
.unwrap(); .unwrap();
// in the b2b use-case, this is our "own" port, we connect to the "normal" broker node on startup.
let b2b_port: u16 = std::env::args()
.nth(3)
.unwrap_or("4242".into())
.parse::<u16>()
.unwrap();
println!("Launching in mode {} on port {}", mode, port); println!("Launching in mode {} on port {}", mode, port);
match mode.as_str() { match mode.as_str() {
"broker" => { "broker" => {
let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new().unwrap()).unwrap(); let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new().unwrap()).unwrap();
broker broker.launch_tcp_listener_on(port).unwrap();
.launch_listener(llmp::Listener::Tcp( broker.loop_forever(&mut broker_message_hook, Some(Duration::from_millis(5)))
std::net::TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap(), }
)) "b2b" => {
.unwrap(); let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new().unwrap()).unwrap();
broker.launch_tcp_listener_on(b2b_port).unwrap();
// connect back to the main broker.
broker.connect_b2b(("127.0.0.1", port)).unwrap();
broker.loop_forever(&mut broker_message_hook, Some(Duration::from_millis(5))) broker.loop_forever(&mut broker_message_hook, Some(Duration::from_millis(5)))
} }
"ctr" => { "ctr" => {

View File

@ -42,6 +42,7 @@ impl GzipCompressor {
/// Decompression. /// Decompression.
/// Flag is used to indicate if it's compressed or not /// Flag is used to indicate if it's compressed or not
#[allow(clippy::unused_self)]
pub fn decompress(&self, buf: &[u8]) -> Result<Vec<u8>, Error> { pub fn decompress(&self, buf: &[u8]) -> Result<Vec<u8>, Error> {
Ok(buf Ok(buf
.iter() .iter()

View File

@ -2,9 +2,9 @@
A library for low level message passing A library for low level message passing
To send new messages, the clients place a new message at the end of their To send new messages, the clients place a new message at the end of their
client_out_map. If the ringbuf is filled up, they start place a client_out_map. If the current map is filled up, they place a
LLMP_AGE_END_OF_PAGE_V1 msg and alloc a new shmap. LLMP_AGE_END_OF_PAGE_V1 msg and alloc a new shmap.
Once the broker mapped a page, it flags it save for unmapping. Once the broker mapped this same page, it flags it as safe for unmapping.
```text ```text
[client0] [client1] ... [clientN] [client0] [client1] ... [clientN]
@ -41,7 +41,7 @@ current map.
[client0] [client1] ... [clientN] [client0] [client1] ... [clientN]
``` ```
In the future, if we need zero copy, the current_broadcast_map could instead In the future, if we would need zero copy, the current_broadcast_map could instead
list the client_out_map ID an offset for each message. In that case, the clients list the client_out_map ID an offset for each message. In that case, the clients
also need to create new shmaps once their bufs are filled up. also need to create new shmaps once their bufs are filled up.
@ -50,11 +50,14 @@ To use, you will have to create a broker using llmp_broker_new().
Then register some clientloops using llmp_broker_register_threaded_clientloop Then register some clientloops using llmp_broker_register_threaded_clientloop
(or launch them as seperate processes) and call llmp_broker_run(); (or launch them as seperate processes) and call llmp_broker_run();
For broker2broker communication, all messages are forwarded via network sockets.
*/ */
use alloc::{string::String, vec::Vec}; use alloc::{string::String, vec::Vec};
use core::{ use core::{
cmp::max, cmp::max,
convert::TryFrom,
fmt::Debug, fmt::Debug,
mem::size_of, mem::size_of,
ptr, slice, ptr, slice,
@ -64,9 +67,11 @@ use core::{
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[cfg(feature = "std")] #[cfg(feature = "std")]
use std::{ use std::{
convert::TryInto,
env, env,
io::{Read, Write}, io::{Read, Write},
net::{SocketAddr, TcpListener, TcpStream}, net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs},
sync::mpsc::channel,
thread, thread,
}; };
@ -104,8 +109,23 @@ const LLMP_TAG_NEW_SHM_CLIENT: Tag = 0xC11E471;
/// The sender on this map is exiting (if broker exits, clients should exit gracefully); /// The sender on this map is exiting (if broker exits, clients should exit gracefully);
const LLMP_TAG_EXITING: Tag = 0x13C5171; const LLMP_TAG_EXITING: Tag = 0x13C5171;
pub const LLMP_FLAG_INITIALIZED: Flag = 0x0; /// Unused...
pub const LLMP_FLAG_COMPRESSED: Flag = 0x1; pub const LLMP_FLAG_INITIALIZED: Flags = 0x0;
/// This message was compressed in transit
pub const LLMP_FLAG_COMPRESSED: Flags = 0x1;
/// From another broker.
pub const LLMP_FLAG_FROM_B2B: Flags = 0x2;
/// Timt the broker 2 broker connection waits for incoming data,
/// before checking for own data to forward again.
const _LLMP_B2B_BLOCK_TIME: Duration = Duration::from_millis(3_000);
/// If broker2broker is enabled, bind to public IP
#[cfg(feature = "llmp_bind_public")]
const _LLMP_BIND_ADDR: &str = "0.0.0.0";
/// If broker2broker is disabled, bind to localhost
#[cfg(not(feature = "llmp_bind_public"))]
const _LLMP_BIND_ADDR: &str = "127.0.0.1";
/// An env var of this value indicates that the set value was a NULL PTR /// An env var of this value indicates that the set value was a NULL PTR
const _NULL_ENV_STR: &str = "_NULL"; const _NULL_ENV_STR: &str = "_NULL";
@ -127,27 +147,87 @@ static mut GLOBAL_SIGHANDLER_STATE: LlmpBrokerSignalHandler = LlmpBrokerSignalHa
/// TAGs used thorughout llmp /// TAGs used thorughout llmp
pub type Tag = u32; pub type Tag = u32;
pub type Flag = u64; /// The client ID == the sender id.
pub type ClientId = u32;
/// The broker ID, for broker 2 broker communication.
pub type BrokerId = u32;
/// The flags, indicating, for example, enabled compression.
pub type Flags = u32;
/// The message ID, an ever-increasing number, unique only to a sharedmap/page.
pub type MessageId = u64;
/// This is for the server the broker will spawn. /// This is for the server the broker will spawn.
/// If an llmp connection is local - use sharedmaps /// If an llmp connection is local - use sharedmaps
/// or remote (broker2broker) - forwarded via tcp /// or remote (broker2broker) - forwarded via tcp
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TcpRequest { pub enum TcpRequest {
LocalClientHello { shmem: ShMemDescription }, /// We would like to be a local client.
RemoteBrokerHello, LocalClientHello { shmem_description: ShMemDescription },
RemoteNewMessage { tag: Tag, payload: Vec<u8> }, /// We would like to establish a b2b connection.
RemoteBrokerHello { hostname: String },
}
impl TryFrom<&Vec<u8>> for TcpRequest {
type Error = crate::Error;
fn try_from(bytes: &Vec<u8>) -> Result<Self, Error> {
Ok(postcard::from_bytes(bytes)?)
}
}
/// Messages for broker 2 broker connection.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TcpRemoteNewMessage {
// The client ID of the original broker
client_id: ClientId,
// The message tag
tag: Tag,
// The flags
flags: Flags,
// The actual content of the message
payload: Vec<u8>,
}
impl TryFrom<&Vec<u8>> for TcpRemoteNewMessage {
type Error = crate::Error;
fn try_from(bytes: &Vec<u8>) -> Result<Self, Error> {
Ok(postcard::from_bytes(bytes)?)
}
} }
/// Responses for requests to the server. /// Responses for requests to the server.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TcpResponse { pub enum TcpResponse {
LocalClientAccepted { /// After receiving a new connection, the broker immediately sends a Hello.
client_id: u32, BrokerConnectHello {
shmem: ShMemDescription, /// The broker page a new local client can listen on
}, broker_map_description: ShMemDescription,
RemoteBrokerAccepted { /// This broker's hostname
broker_id: u32,
hostname: String, hostname: String,
}, },
LocalClientAccepted {
/// The ClientId this client should send messages as
/// Mainly used for client-side deduplication of incoming messages
client_id: ClientId,
},
RemoteBrokerAccepted {
/// The broker id of this element
broker_id: BrokerId,
},
/// Something went wrong when processing the request.
Error {
/// Error description
description: String,
},
}
impl TryFrom<&Vec<u8>> for TcpResponse {
type Error = crate::Error;
fn try_from(bytes: &Vec<u8>) -> Result<Self, Error> {
Ok(postcard::from_bytes(bytes)?)
}
} }
/// Abstraction for listeners /// Abstraction for listeners
@ -226,6 +306,59 @@ fn msg_offset_from_env(env_name: &str) -> Result<Option<u64>, Error> {
}) })
} }
/// Send one message as `u32` len and `[u8;len]` bytes
#[cfg(feature = "std")]
fn send_tcp_msg<T>(stream: &mut TcpStream, msg: &T) -> Result<(), Error>
where
T: Serialize,
{
let msg = postcard::to_allocvec(msg)?;
if msg.len() > u32::MAX as usize {
return Err(Error::IllegalState(format!(
"Trying to send message a tcp message > u32! (size: {})",
msg.len()
)));
}
#[cfg(feature = "llmp_debug")]
println!("LLMP TCP: Sending {} bytes", msg.len());
let size_bytes = (msg.len() as u32).to_be_bytes();
stream.write_all(&size_bytes)?;
stream.write_all(&msg)?;
#[cfg(feature = "llmp_debug")]
println!("LLMP TCP: Sending {} bytes finished.", msg.len());
Ok(())
}
/// Receive one message of `u32` len and `[u8; len]` bytes
#[cfg(feature = "std")]
fn recv_tcp_msg(stream: &mut TcpStream) -> Result<Vec<u8>, Error> {
// Always receive one be u32 of size, then the command.
#[cfg(feature = "llmp_debug")]
println!(
"LLMP TCP: Waiting for packet... (Timeout: {:?})",
stream.read_timeout().unwrap_or(None)
);
let mut size_bytes = [0u8; 4];
stream.read_exact(&mut size_bytes)?;
let size = u32::from_be_bytes(size_bytes);
let mut bytes = vec![];
bytes.resize(size as usize, 0u8);
#[cfg(feature = "llmp_debug")]
println!("LLMP TCP: Receiving payload of size {}", size);
stream
.read_exact(&mut bytes)
.expect("Failed to read message body");
Ok(bytes)
}
/// In case we don't have enough space, make sure the next page will be large /// In case we don't have enough space, make sure the next page will be large
/// enough. For now, we want to have at least enough space to store 2 of the /// enough. For now, we want to have at least enough space to store 2 of the
/// largest messages we encountered (plus message one new_page message). /// largest messages we encountered (plus message one new_page message).
@ -324,18 +457,22 @@ pub enum LlmpMsgHookResult {
#[repr(C, packed)] #[repr(C, packed)]
pub struct LlmpMsg { pub struct LlmpMsg {
/// A tag /// A tag
pub tag: Tag, pub tag: Tag, //u32
/// Sender of this messge /// Sender of this messge
pub sender: u32, pub sender: ClientId, //u32
/// ID of another Broker, for b2b messages
pub broker: BrokerId, //u32
/// flags, currently only used for indicating compression /// flags, currently only used for indicating compression
pub flags: Flag, pub flags: Flags, //u32
/// The message ID, unique per page /// The message ID, unique per page
pub message_id: u64, pub message_id: MessageId, //u64
/// Buffer length as specified by the user /// Buffer length as specified by the user
pub buf_len: u64, pub buf_len: u64,
/// (Actual) buffer length after padding /// (Actual) buffer length after padding
// Padding makes sure the next msg is aligned.
pub buf_len_padded: u64, pub buf_len_padded: u64,
/// The buf /// The actual payload buf
// We try to keep the start of buf 64-bit aligned!
pub buf: [u8; 0], pub buf: [u8; 0],
} }
@ -399,7 +536,7 @@ where
#[cfg(feature = "std")] #[cfg(feature = "std")]
/// Creates either a broker, if the tcp port is not bound, or a client, connected to this port. /// Creates either a broker, if the tcp port is not bound, or a client, connected to this port.
pub fn on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> { pub fn on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
match TcpListener::bind(format!("127.0.0.1:{}", port)) { match TcpListener::bind(format!("{}:{}", _LLMP_BIND_ADDR, port)) {
Ok(listener) => { Ok(listener) => {
// We got the port. We are the broker! :) // We got the port. We are the broker! :)
dbg!("We're the broker"); dbg!("We're the broker");
@ -449,7 +586,7 @@ where
} }
} }
pub fn send_buf_with_flags(&mut self, tag: Tag, buf: &[u8], flags: Flag) -> Result<(), Error> { pub fn send_buf_with_flags(&mut self, tag: Tag, buf: &[u8], flags: Flags) -> Result<(), Error> {
match self { match self {
LlmpConnection::IsBroker { broker } => broker.send_buf_with_flags(tag, flags, buf), LlmpConnection::IsBroker { broker } => broker.send_buf_with_flags(tag, flags, buf),
LlmpConnection::IsClient { client } => client.send_buf_with_flags(tag, flags, buf), LlmpConnection::IsClient { client } => client.send_buf_with_flags(tag, flags, buf),
@ -700,7 +837,7 @@ where
#[cfg(all(feature = "llmp_debug", feature = "std"))] #[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!( dbg!(
page, page,
(*page), *page,
(*page).size_used, (*page).size_used,
complete_msg_size, complete_msg_size,
EOP_MSG_SIZE, EOP_MSG_SIZE,
@ -918,7 +1055,7 @@ where
} }
} }
pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> { pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flags, buf: &[u8]) -> Result<(), Error> {
// Make sure we don't reuse already allocated tags // Make sure we don't reuse already allocated tags
if tag == LLMP_TAG_NEW_SHM_CLIENT if tag == LLMP_TAG_NEW_SHM_CLIENT
|| tag == LLMP_TAG_END_OF_PAGE || tag == LLMP_TAG_END_OF_PAGE
@ -1159,8 +1296,9 @@ where
} }
} }
/// Receive the buffer, also reading the LLMP internal message flags
#[inline] #[inline]
pub fn recv_buf_with_flags(&mut self) -> Result<Option<(u32, Tag, Flag, &[u8])>, Error> { pub fn recv_buf_with_flags(&mut self) -> Result<Option<(ClientId, Tag, Flags, &[u8])>, Error> {
unsafe { unsafe {
Ok(match self.recv()? { Ok(match self.recv()? {
Some(msg) => Some(( Some(msg) => Some((
@ -1176,7 +1314,7 @@ where
/// Returns the next sender, tag, buf, looping until it becomes available /// Returns the next sender, tag, buf, looping until it becomes available
#[inline] #[inline]
pub fn recv_buf_blocking(&mut self) -> Result<(u32, Tag, &[u8]), Error> { pub fn recv_buf_blocking(&mut self) -> Result<(ClientId, Tag, &[u8]), Error> {
unsafe { unsafe {
let msg = self.recv_blocking()?; let msg = self.recv_blocking()?;
Ok(( Ok((
@ -1233,7 +1371,7 @@ where
SHM: ShMem, SHM: ShMem,
{ {
/// Creates a new page, initializing the passed shared mem struct /// Creates a new page, initializing the passed shared mem struct
pub fn new(sender: u32, mut new_map: SHM) -> Self { pub fn new(sender: ClientId, mut new_map: SHM) -> Self {
#[cfg(all(feature = "llmp_debug", feature = "std"))] #[cfg(all(feature = "llmp_debug", feature = "std"))]
println!( println!(
"LLMP_DEBUG: Initializing map on {} with size {}", "LLMP_DEBUG: Initializing map on {} with size {}",
@ -1403,7 +1541,7 @@ impl Handler for LlmpBrokerSignalHandler {
/// It may intercept messages passing through. /// It may intercept messages passing through.
impl<SP> LlmpBroker<SP> impl<SP> LlmpBroker<SP>
where where
SP: ShMemProvider, SP: ShMemProvider + 'static,
{ {
/// Create and initialize a new llmp_broker /// Create and initialize a new llmp_broker
pub fn new(mut shmem_provider: SP) -> Result<Self, Error> { pub fn new(mut shmem_provider: SP) -> Result<Self, Error> {
@ -1447,6 +1585,69 @@ where
}); });
} }
/// Connects to a broker running on another machine.
/// This will spawn a new background thread, registered as client, that proxies all messages to a remote machine.
/// Returns the description of the new page that still needs to be announced/added to the broker afterwards.
#[cfg(feature = "std")]
pub fn connect_b2b<A>(&mut self, addr: A) -> Result<(), Error>
where
A: ToSocketAddrs,
{
let mut stream = TcpStream::connect(addr)?;
println!("B2B: Connected to {:?}", stream);
match (&recv_tcp_msg(&mut stream)?).try_into()? {
TcpResponse::BrokerConnectHello {
broker_map_description: _,
hostname,
} => println!("B2B: Connected to {}", hostname),
_ => {
return Err(Error::IllegalState(
"Unexpected response from B2B server received.".to_string(),
))
}
};
let hostname = hostname::get()
.unwrap_or_else(|_| "<unknown>".into())
.to_string_lossy()
.into();
send_tcp_msg(&mut stream, &TcpRequest::RemoteBrokerHello { hostname })?;
let broker_id = match (&recv_tcp_msg(&mut stream)?).try_into()? {
TcpResponse::RemoteBrokerAccepted { broker_id } => {
println!("B2B: Got Connection Ack, broker_id {}", broker_id);
broker_id
}
_ => {
return Err(Error::IllegalState(
"Unexpected response from B2B server received.".to_string(),
));
}
};
// TODO: use broker ids!
println!("B2B: We are broker {}", broker_id);
// TODO: handle broker_ids properly/at all.
let map_description = Self::b2b_thread_on(
stream,
&self.shmem_provider,
self.llmp_clients.len() as ClientId,
&self.llmp_out.out_maps.first().unwrap().shmem.description(),
)?;
let new_map =
LlmpSharedMap::existing(self.shmem_provider.from_description(map_description)?);
{
self.register_client(new_map);
}
Ok(())
}
/// For internal use: Forward the current message to the out map. /// For internal use: Forward the current message to the out map.
unsafe fn forward_msg(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> { unsafe fn forward_msg(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
let mut out: *mut LlmpMsg = self.alloc_next((*msg).buf_len_padded as usize)?; let mut out: *mut LlmpMsg = self.alloc_next((*msg).buf_len_padded as usize)?;
@ -1471,7 +1672,7 @@ where
#[inline] #[inline]
pub fn once<F>(&mut self, on_new_msg: &mut F) -> Result<(), Error> pub fn once<F>(&mut self, on_new_msg: &mut F) -> Result<(), Error>
where where
F: FnMut(u32, Tag, Flag, &[u8]) -> Result<LlmpMsgHookResult, Error>, F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{ {
compiler_fence(Ordering::SeqCst); compiler_fence(Ordering::SeqCst);
for i in 0..self.llmp_clients.len() { for i in 0..self.llmp_clients.len() {
@ -1502,7 +1703,7 @@ where
/// 5 millis of sleep can't hurt to keep busywait not at 100% /// 5 millis of sleep can't hurt to keep busywait not at 100%
pub fn loop_forever<F>(&mut self, on_new_msg: &mut F, sleep_time: Option<Duration>) pub fn loop_forever<F>(&mut self, on_new_msg: &mut F, sleep_time: Option<Duration>)
where where
F: FnMut(u32, Tag, Flag, &[u8]) -> Result<LlmpMsgHookResult, Error>, F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{ {
#[cfg(unix)] #[cfg(unix)]
if let Err(_e) = unsafe { setup_signal_handler(&mut GLOBAL_SIGHANDLER_STATE) } { if let Err(_e) = unsafe { setup_signal_handler(&mut GLOBAL_SIGHANDLER_STATE) } {
@ -1539,20 +1740,221 @@ where
self.llmp_out.send_buf(tag, buf) self.llmp_out.send_buf(tag, buf)
} }
pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> { pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flags, buf: &[u8]) -> Result<(), Error> {
self.llmp_out.send_buf_with_flags(tag, flags, buf) self.llmp_out.send_buf_with_flags(tag, flags, buf)
} }
#[cfg(feature = "std")]
/// Launches a thread using a tcp listener socket, on which new clients may connect to this broker /// Launches a thread using a tcp listener socket, on which new clients may connect to this broker
/// Does so on the given port. /// Does so on the given port.
#[cfg(feature = "std")]
pub fn launch_tcp_listener_on(&mut self, port: u16) -> Result<thread::JoinHandle<()>, Error> { pub fn launch_tcp_listener_on(&mut self, port: u16) -> Result<thread::JoinHandle<()>, Error> {
let listener = TcpListener::bind(format!("127.0.0.1:{}", port))?; let listener = TcpListener::bind(format!("{}:{}", _LLMP_BIND_ADDR, port))?;
// accept connections and process them, spawning a new thread for each one // accept connections and process them, spawning a new thread for each one
println!("Server listening on port {}", port); println!("Server listening on port {}", port);
self.launch_listener(Listener::Tcp(listener)) self.launch_listener(Listener::Tcp(listener))
} }
/// Announces a new client on the given shared map.
/// Called from a background thread, typically.
/// Upon receiving this message, the broker should map the announced page and start trckang it for new messages.
#[allow(dead_code)]
fn announce_new_client(
sender: &mut LlmpSender<SP>,
shmem_description: &ShMemDescription,
) -> Result<(), Error> {
unsafe {
let msg = sender
.alloc_next(size_of::<LlmpPayloadSharedMapInfo>())
.expect("Could not allocate a new message in shared map.");
(*msg).tag = LLMP_TAG_NEW_SHM_CLIENT;
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*pageinfo).shm_str = *shmem_description.id.as_slice();
(*pageinfo).map_size = shmem_description.size;
sender.send(msg)
}
}
/// For broker to broker connections:
/// Launches a proxy thread.
/// It will read outgoing messages from the given broker map (and handle EOP by mapping a new page).
/// This function returns the ShMemDescription the client uses to place incoming messages.
/// The thread exits, when the remote broker disconnects.
#[cfg(feature = "std")]
fn b2b_thread_on(
mut stream: TcpStream,
shmem_provider: &SP,
b2b_client_id: ClientId,
broker_map_description: &ShMemDescription,
) -> Result<ShMemDescription, Error> {
let broker_map_description = *broker_map_description;
let mut shmem_provider_clone = shmem_provider.clone();
// A channel to get the new "client's" sharedmap id from
let (send, recv) = channel();
// (For now) the thread remote broker 2 broker just acts like a "normal" llmp client, except it proxies all messages to the attached socket, in both directions.
thread::spawn(move || {
// as always, call post_fork to potentially reconnect the provider (for threaded/forked use)
shmem_provider_clone.post_fork();
#[cfg(fature = "llmp_debug")]
println!("B2b: Spawned proxy thread");
// The background thread blocks on the incoming connection for 15 seconds (if no data is available), then checks if it should forward own messages, then blocks some more.
stream
.set_read_timeout(Some(_LLMP_B2B_BLOCK_TIME))
.expect("Failed to set tcp stream timeout");
let mut new_sender =
match LlmpSender::new(shmem_provider_clone.clone(), b2b_client_id, false) {
Ok(new_sender) => new_sender,
Err(e) => {
panic!("B2B: Could not map shared map: {}", e);
}
};
send.send(new_sender.out_maps.first().unwrap().shmem.description())
.expect("B2B: Error sending map description to channel!");
// the receiver receives from the local broker, and forwards it to the tcp stream.
let mut local_receiver = LlmpReceiver::on_existing_from_description(
shmem_provider_clone,
&LlmpDescription {
last_message_offset: None,
shmem: broker_map_description,
},
)
.expect("Failed to map local page in broker 2 broker thread!");
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!("B2B: Starting proxy loop :)");
loop {
// first, forward all data we have.
while let Some((client_id, tag, flags, payload)) = local_receiver
.recv_buf_with_flags()
.expect("Error reading from local page!")
{
if client_id == b2b_client_id {
dbg!("Ignored message we probably sent earlier (same id)", tag);
continue;
}
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!(
"Fowarding message via broker2broker connection",
payload.len()
);
// We got a new message! Forward...
send_tcp_msg(
&mut stream,
&TcpRemoteNewMessage {
client_id,
tag,
flags,
payload: payload.to_vec(),
},
)
.expect("Error sending message via broker 2 broker");
}
// Then, see if we can receive something.
// We set a timeout on the receive earlier.
// This makes sure we will still forward our own stuff.
// Forwarding happens between each recv, too, as simplification.
// We ignore errors completely as they may be timeout, or stream closings.
// Instead, we catch stream close when/if we next try to send.
if let Ok(val) = recv_tcp_msg(&mut stream) {
let msg: TcpRemoteNewMessage = (&val).try_into().expect(
"Illegal message received from broker 2 broker connection - shutting down.",
);
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!(
"Fowarding incoming message from broker2broker connection",
msg.payload.len()
);
// TODO: Could probably optimize this somehow to forward all queued messages between locks... oh well.
// Todo: somehow mangle in the other broker id? ClientId?
new_sender
.send_buf_with_flags(msg.tag, msg.flags | LLMP_FLAG_FROM_B2B, &msg.payload)
.expect("B2B: Error forwarding message. Exiting.");
} else {
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!("Received no input, timeout or closed. Looping back up :)");
}
}
});
let ret = recv.recv().map_err(|_| {
Error::Unknown("Error launching background thread for b2b communcation".to_string())
});
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!("B2B: returning from loop. Success: {}", ret.is_ok());
ret
}
/// handles a single tcp request in the current context.
#[cfg(feature = "std")]
fn handle_tcp_request(
mut stream: TcpStream,
request: &TcpRequest,
current_client_id: &mut u32,
sender: &mut LlmpSender<SP>,
shmem_provider: &SP,
broker_map_description: &ShMemDescription,
) {
match request {
TcpRequest::LocalClientHello { shmem_description } => {
match Self::announce_new_client(sender, shmem_description) {
Ok(()) => (),
Err(e) => println!("Error forwarding client on map: {:?}", e),
};
if let Err(e) = send_tcp_msg(
&mut stream,
&TcpResponse::LocalClientAccepted {
client_id: *current_client_id,
},
) {
println!("An error occurred sending via tcp {}", e);
};
*current_client_id += 1;
}
TcpRequest::RemoteBrokerHello { hostname } => {
println!("B2B new client: {}", hostname);
// TODO: Clean up broker ids.
if send_tcp_msg(
&mut stream,
&TcpResponse::RemoteBrokerAccepted {
broker_id: *current_client_id,
},
)
.is_err()
{
println!("Error accepting broker, ignoring.");
return;
}
if let Ok(shmem_description) = Self::b2b_thread_on(
stream,
shmem_provider,
*current_client_id,
&broker_map_description,
) {
if Self::announce_new_client(sender, &shmem_description).is_err() {
println!("B2B: Error announcing client {:?}", shmem_description);
};
*current_client_id += 1;
}
}
};
}
#[cfg(feature = "std")] #[cfg(feature = "std")]
/// Launches a thread using a listener socket, on which new clients may connect to this broker /// Launches a thread using a listener socket, on which new clients may connect to this broker
pub fn launch_listener(&mut self, listener: Listener) -> Result<thread::JoinHandle<()>, Error> { pub fn launch_listener(&mut self, listener: Listener) -> Result<thread::JoinHandle<()>, Error> {
@ -1562,37 +1964,43 @@ where
// to read from the initial map id. // to read from the initial map id.
let client_out_map_mem = &self.llmp_out.out_maps.first().unwrap().shmem; let client_out_map_mem = &self.llmp_out.out_maps.first().unwrap().shmem;
let broadcast_map_description = postcard::to_allocvec(&client_out_map_mem.description())?; let broker_map_description = client_out_map_mem.description();
let hostname = hostname::get()
.unwrap_or_else(|_| "<unknown>".into())
.to_string_lossy()
.into();
let broker_hello = TcpResponse::BrokerConnectHello {
broker_map_description,
hostname,
};
let mut incoming_map_description_serialized = vec![0u8; broadcast_map_description.len()]; let llmp_tcp_id = self.llmp_clients.len() as ClientId;
let llmp_tcp_id = self.llmp_clients.len() as u32;
// Tcp out map sends messages from background thread tcp server to foreground client // Tcp out map sends messages from background thread tcp server to foreground client
let tcp_out_map = LlmpSharedMap::new( let tcp_out_map = LlmpSharedMap::new(
llmp_tcp_id, llmp_tcp_id,
self.shmem_provider.new_map(LLMP_CFG_INITIAL_MAP_SIZE)?, self.shmem_provider.new_map(LLMP_CFG_INITIAL_MAP_SIZE)?,
); );
let shmem_id = tcp_out_map.shmem.id(); let tcp_out_map_description = tcp_out_map.shmem.description();
let tcp_out_map_str = *shmem_id.as_slice();
let tcp_out_map_size = tcp_out_map.shmem.len();
self.register_client(tcp_out_map); self.register_client(tcp_out_map);
let mut shmem_provider_clone = self.shmem_provider.clone(); let mut shmem_provider_clone = self.shmem_provider.clone();
Ok(thread::spawn(move || { Ok(thread::spawn(move || {
// Call `post_fork` (even though this is not forked) so we get a new connection to the cloned `ShMemServer` if we are using a `ServedShMemProvider`
shmem_provider_clone.post_fork(); shmem_provider_clone.post_fork();
// Clone so we get a new connection to the AshmemServer if we are using
// ServedShMemProvider let mut current_client_id = llmp_tcp_id + 1;
let mut new_client_sender = LlmpSender {
id: 0, let mut tcp_incoming_sender = LlmpSender {
id: llmp_tcp_id,
last_msg_sent: ptr::null_mut(), last_msg_sent: ptr::null_mut(),
out_maps: vec![LlmpSharedMap::existing( out_maps: vec![LlmpSharedMap::existing(
shmem_provider_clone shmem_provider_clone
.from_id_and_size(ShMemId::from_slice(&tcp_out_map_str), tcp_out_map_size) .from_description(tcp_out_map_description)
.unwrap(), .unwrap(),
)], )],
// drop pages to the broker if it already read them // drop pages to the broker, if it already read them.
keep_pages_forever: false, keep_pages_forever: false,
shmem_provider: shmem_provider_clone.clone(), shmem_provider: shmem_provider_clone.clone(),
}; };
@ -1601,38 +2009,40 @@ where
match listener.accept() { match listener.accept() {
ListenerStream::Tcp(mut stream, addr) => { ListenerStream::Tcp(mut stream, addr) => {
dbg!("New connection", addr, stream.peer_addr().unwrap()); dbg!("New connection", addr, stream.peer_addr().unwrap());
match stream.write(&broadcast_map_description) {
Ok(_) => {} // fire & forget // Send initial information, without anyone asking.
// This makes it a tiny bit easier to map the broker map for new Clients.
match send_tcp_msg(&mut stream, &broker_hello) {
Ok(()) => {}
Err(e) => { Err(e) => {
dbg!("Could not send to shmap to client", e); dbg!("Error sending initial hello: {:?}", e);
continue;
}
}
let buf = match recv_tcp_msg(&mut stream) {
Ok(buf) => buf,
Err(e) => {
dbg!("Error receving from tcp", e);
continue; continue;
} }
}; };
match stream.read_exact(&mut incoming_map_description_serialized) { let req = match (&buf).try_into() {
Ok(()) => (), Ok(req) => req,
Err(e) => { Err(e) => {
dbg!("Ignoring failed read from client", e); dbg!("Could not deserialize tcp message", e);
continue; continue;
} }
}; };
if let Ok(incoming_map_description) = postcard::from_bytes::<ShMemDescription>(
&incoming_map_description_serialized, Self::handle_tcp_request(
) { stream,
unsafe { &req,
let msg = new_client_sender &mut current_client_id,
.alloc_next(size_of::<LlmpPayloadSharedMapInfo>()) &mut tcp_incoming_sender,
.expect("Could not allocate a new message in shared map."); &shmem_provider_clone,
(*msg).tag = LLMP_TAG_NEW_SHM_CLIENT; &broker_map_description,
let pageinfo = );
(*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*pageinfo).shm_str = *incoming_map_description.id.as_slice();
(*pageinfo).map_size = incoming_map_description.size;
match new_client_sender.send(msg) {
Ok(()) => (),
Err(e) => println!("Error forwarding client on map: {:?}", e),
};
}
}
} }
ListenerStream::Empty() => { ListenerStream::Empty() => {
continue; continue;
@ -1646,7 +2056,7 @@ where
#[inline] #[inline]
unsafe fn handle_new_msgs<F>(&mut self, client_id: u32, on_new_msg: &mut F) -> Result<(), Error> unsafe fn handle_new_msgs<F>(&mut self, client_id: u32, on_new_msg: &mut F) -> Result<(), Error>
where where
F: FnMut(u32, Tag, Flag, &[u8]) -> Result<LlmpMsgHookResult, Error>, F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{ {
let mut next_id = self.llmp_clients.len() as u32; let mut next_id = self.llmp_clients.len() as u32;
@ -1880,7 +2290,7 @@ where
self.sender.send_buf(tag, buf) self.sender.send_buf(tag, buf)
} }
pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> { pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flags, buf: &[u8]) -> Result<(), Error> {
self.sender.send_buf_with_flags(tag, flags, buf) self.sender.send_buf_with_flags(tag, flags, buf)
} }
@ -1943,7 +2353,7 @@ where
self.receiver.recv_buf_blocking() self.receiver.recv_buf_blocking()
} }
pub fn recv_buf_with_flags(&mut self) -> Result<Option<(u32, Tag, Flag, &[u8])>, Error> { pub fn recv_buf_with_flags(&mut self) -> Result<Option<(ClientId, Tag, Flags, &[u8])>, Error> {
self.receiver.recv_buf_with_flags() self.receiver.recv_buf_with_flags()
} }
@ -1957,26 +2367,48 @@ where
#[cfg(feature = "std")] #[cfg(feature = "std")]
/// Create a LlmpClient, getting the ID from a given port /// Create a LlmpClient, getting the ID from a given port
pub fn create_attach_to_tcp(mut shmem_provider: SP, port: u16) -> Result<Self, Error> { pub fn create_attach_to_tcp(mut shmem_provider: SP, port: u16) -> Result<Self, Error> {
let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port))?; let mut stream = TcpStream::connect(format!("{}:{}", _LLMP_BIND_ADDR, port))?;
println!("Connected to port {}", port); println!("Connected to port {}", port);
// First, get the serialized description size by serializing a dummy. let broker_map_description = if let TcpResponse::BrokerConnectHello {
let dummy_description = ShMemDescription { broker_map_description,
size: 0, hostname: _,
id: ShMemId::default(), } = (&recv_tcp_msg(&mut stream)?).try_into()?
{
broker_map_description
} else {
return Err(Error::IllegalState(
"Received unexpected Broker Hello".to_string(),
));
}; };
let mut new_broker_map_str = postcard::to_allocvec(&dummy_description)?;
stream.read_exact(&mut new_broker_map_str)?;
let broker_map_description: ShMemDescription = postcard::from_bytes(&new_broker_map_str)?;
let map = LlmpSharedMap::existing(shmem_provider.from_description(broker_map_description)?); let map = LlmpSharedMap::existing(shmem_provider.from_description(broker_map_description)?);
let ret = Self::new(shmem_provider, map)?; let mut ret = Self::new(shmem_provider, map)?;
let client_hello_req = TcpRequest::LocalClientHello {
shmem_description: ret.sender.out_maps.first().unwrap().shmem.description(),
};
send_tcp_msg(&mut stream, &client_hello_req)?;
let client_id = if let TcpResponse::LocalClientAccepted { client_id } =
(&recv_tcp_msg(&mut stream)?).try_into()?
{
client_id
} else {
return Err(Error::IllegalState(
"Unexpected Response from Broker".to_string(),
));
};
// Set our ID to the one the broker sent us..
// This is mainly so we can filter out our own msgs later.
ret.sender.id = client_id;
// Also set the sender on our initial llmp map correctly.
unsafe {
(*ret.sender.out_maps.first_mut().unwrap().page_mut()).sender = client_id;
}
let own_map_description_bytes =
postcard::to_allocvec(&ret.sender.out_maps.first().unwrap().shmem.description())?;
stream.write_all(&own_map_description_bytes)?;
Ok(ret) Ok(ret)
} }
} }

View File

@ -171,8 +171,8 @@ pub trait ShMemProvider: Send + Clone + Default + Debug {
)) ))
} }
/// This method should be called after a fork or thread creation event, allowing the ShMem to /// This method should be called after a fork or after cloning/a thread creation event, allowing the ShMem to
/// reset thread specific info. /// reset thread specific info, and potentially reconnect.
fn post_fork(&mut self) { fn post_fork(&mut self) {
// do nothing // do nothing
} }
@ -183,6 +183,9 @@ pub trait ShMemProvider: Send + Clone + Default + Debug {
} }
} }
/// A Refernce Counted shared map,
/// that can use internal mutability.
/// Useful if the `ShMemProvider` needs to keep local state.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RcShMem<T: ShMemProvider> { pub struct RcShMem<T: ShMemProvider> {
internal: ManuallyDrop<T::Mem>, internal: ManuallyDrop<T::Mem>,
@ -216,6 +219,9 @@ impl<T: ShMemProvider> Drop for RcShMem<T> {
} }
} }
/// A Refernce Counted `ShMemProvider`,
/// that can use internal mutability.
/// Useful if the `ShMemProvider` needs to keep local state.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RcShMemProvider<T: ShMemProvider> { pub struct RcShMemProvider<T: ShMemProvider> {
internal: Rc<RefCell<T>>, internal: Rc<RefCell<T>>,
@ -274,6 +280,11 @@ where
} }
} }
/// A Unix sharedmem implementation.
///
/// On Android, this is partially reused to wrap `Ashmem`,
/// Although for an `AshmemShMemProvider using a unix domain socket
/// Is needed on top.
#[cfg(all(unix, feature = "std"))] #[cfg(all(unix, feature = "std"))]
pub mod unix_shmem { pub mod unix_shmem {

View File

@ -15,7 +15,7 @@ use crate::bolts::{
use crate::{ use crate::{
bolts::{ bolts::{
llmp::{self, Flag, LlmpClientDescription, LlmpSender, Tag}, llmp::{self, Flags, LlmpClientDescription, LlmpSender, Tag},
shmem::ShMemProvider, shmem::ShMemProvider,
}, },
corpus::CorpusScheduler, corpus::CorpusScheduler,
@ -173,7 +173,7 @@ where
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
let compressor = &self.compressor; let compressor = &self.compressor;
broker.loop_forever( broker.loop_forever(
&mut |sender_id: u32, tag: Tag, _flags: Flag, msg: &[u8]| { &mut |sender_id: u32, tag: Tag, _flags: Flags, msg: &[u8]| {
if tag == LLMP_TAG_EVENT_TO_BOTH { if tag == LLMP_TAG_EVENT_TO_BOTH {
#[cfg(not(feature = "llmp_compression"))] #[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg; let event_bytes = msg;
@ -376,7 +376,7 @@ where
#[cfg(feature = "llmp_compression")] #[cfg(feature = "llmp_compression")]
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> { fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?; let serialized = postcard::to_allocvec(&event)?;
let flags: Flag = LLMP_FLAG_INITIALIZED; let flags: Flags = LLMP_FLAG_INITIALIZED;
match self.compressor.compress(&serialized)? { match self.compressor.compress(&serialized)? {
Some(comp_buf) => { Some(comp_buf) => {