connects and sends something again

This commit is contained in:
Dominik Maier 2020-12-09 00:28:10 +01:00
parent 960154a3de
commit d17c281b55
2 changed files with 48 additions and 23 deletions

View File

@ -79,7 +79,7 @@ fn main() {
let mode = std::env::args() let mode = std::env::args()
.nth(1) .nth(1)
.expect("no mode specified, chose 'broker', 'adder', or 'printer'"); .expect("no mode specified, chose 'broker', 'ctr', or 'adder'");
let port: u16 = std::env::args() let port: u16 = std::env::args()
.nth(2) .nth(2)
.unwrap_or("1337".into()) .unwrap_or("1337".into())
@ -94,7 +94,7 @@ fn main() {
broker.add_message_hook(broker_message_hook); broker.add_message_hook(broker_message_hook);
broker.loop_forever(Some(Duration::from_millis(5))) broker.loop_forever(Some(Duration::from_millis(5)))
} }
"adder" => { "ctr" => {
let mut client = llmp::LlmpClient::create_attach_to_tcp(port).unwrap(); let mut client = llmp::LlmpClient::create_attach_to_tcp(port).unwrap();
let mut counter: u32 = 0; let mut counter: u32 = 0;
loop { loop {
@ -104,7 +104,7 @@ fn main() {
.unwrap(); .unwrap();
} }
} }
"printer" => { "adder" => {
adder_loop(port); adder_loop(port);
} }
_ => { _ => {

View File

@ -83,7 +83,8 @@ const LLMP_TAG_END_OF_PAGE: u32 = 0xAF1E0F1;
const LLMP_TAG_NEW_SHM_CLIENT: u32 = 0xC11E471; const LLMP_TAG_NEW_SHM_CLIENT: u32 = 0xC11E471;
/// Size of a new page message, header, payload, and alignment /// Size of a new page message, header, payload, and alignment
const EOP_MSG_SIZE: usize = llmp_align(size_of::<LlmpMsg>() + size_of::<LlmpPayloadSharedMap>()); const EOP_MSG_SIZE: usize =
llmp_align(size_of::<LlmpMsg>() + size_of::<LlmpPayloadSharedMapInfo>());
/// The header length of a llmp page in a shared map (until messages start) /// The header length of a llmp page in a shared map (until messages start)
const LLMP_PAGE_HEADER_LEN: usize = size_of::<LlmpPage>(); const LLMP_PAGE_HEADER_LEN: usize = size_of::<LlmpPage>();
@ -199,7 +200,7 @@ pub enum LlmpMsgHookResult {
/// LLMP_TAG_END_OF_PAGE_V1 /// LLMP_TAG_END_OF_PAGE_V1
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
#[repr(C, packed)] #[repr(C, packed)]
struct LlmpPayloadSharedMap { struct LlmpPayloadSharedMapInfo {
pub map_size: usize, pub map_size: usize,
pub shm_str: [u8; 20], pub shm_str: [u8; 20],
} }
@ -308,7 +309,7 @@ impl LlmpSender {
if (*ret).tag == LLMP_TAG_UNINITIALIZED { if (*ret).tag == LLMP_TAG_UNINITIALIZED {
panic!("Did not call send() on last message!"); panic!("Did not call send() on last message!");
} }
(*ret).buf_len_padded = size_of::<LlmpPayloadSharedMap>() as u64; (*ret).buf_len_padded = size_of::<LlmpPayloadSharedMapInfo>() as u64;
(*ret).message_id = if !last_msg.is_null() { (*ret).message_id = if !last_msg.is_null() {
(*last_msg).message_id + 1 (*last_msg).message_id + 1
} else { } else {
@ -440,7 +441,7 @@ impl LlmpSender {
let mut out: *mut LlmpMsg = self.alloc_eop(); let mut out: *mut LlmpMsg = self.alloc_eop();
(*out).sender = (*old_map).sender; (*out).sender = (*old_map).sender;
let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap; let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*end_of_page_msg).map_size = new_map_shmem.shmem.map_size; (*end_of_page_msg).map_size = new_map_shmem.shmem.map_size;
(*end_of_page_msg).shm_str = new_map_shmem.shmem.shm_str; (*end_of_page_msg).shm_str = new_map_shmem.shmem.shm_str;
@ -543,14 +544,14 @@ impl LlmpReceiver {
LLMP_TAG_END_OF_PAGE => { LLMP_TAG_END_OF_PAGE => {
dbg!("Got end of page, allocing next"); dbg!("Got end of page, allocing next");
// Handle end of page // Handle end of page
if (*msg).buf_len < size_of::<LlmpPayloadSharedMap>() as u64 { if (*msg).buf_len < size_of::<LlmpPayloadSharedMapInfo>() as u64 {
panic!(format!( panic!(format!(
"Illegal message length for EOP (is {}, expected {})", "Illegal message length for EOP (is {}, expected {})",
(*msg).buf_len_padded, (*msg).buf_len_padded,
size_of::<LlmpPayloadSharedMap>() size_of::<LlmpPayloadSharedMapInfo>()
)); ));
} }
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap; let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
/* We can reuse the map mem space, no need to free and calloc. /* We can reuse the map mem space, no need to free and calloc.
However, the pageinfo points to the map we're about to unmap. However, the pageinfo points to the map we're about to unmap.
@ -726,13 +727,13 @@ impl LlmpBroker {
if (*msg).tag == LLMP_TAG_NEW_SHM_CLIENT { if (*msg).tag == LLMP_TAG_NEW_SHM_CLIENT {
/* This client informs us about yet another new client /* This client informs us about yet another new client
add it to the list! Also, no need to forward this msg. */ add it to the list! Also, no need to forward this msg. */
if (*msg).buf_len < size_of::<LlmpPayloadSharedMap>() as u64 { if (*msg).buf_len < size_of::<LlmpPayloadSharedMapInfo>() as u64 {
println!("Ignoring broken CLIENT_ADDED msg due to incorrect size. Expected {} but got {}", println!("Ignoring broken CLIENT_ADDED msg due to incorrect size. Expected {} but got {}",
(*msg).buf_len_padded, (*msg).buf_len_padded,
size_of::<LlmpPayloadSharedMap>() size_of::<LlmpPayloadSharedMapInfo>()
); );
} else { } else {
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap; let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
match LlmpSharedMap::from_name_slice(&(*pageinfo).shm_str, (*pageinfo).map_size) match LlmpSharedMap::from_name_slice(&(*pageinfo).shm_str, (*pageinfo).map_size)
{ {
@ -840,22 +841,27 @@ impl LlmpBroker {
} }
}; };
let mut new_client_map_str: [u8; 20] = Default::default(); let mut new_client_map_str: [u8; 20] = Default::default();
let map_str_len = match stream.read(&mut new_client_map_str) { match stream.read_exact(&mut new_client_map_str) {
Ok(res) => res, Ok(()) => (),
Err(e) => { Err(e) => {
dbg!("Ignoring failed read from client", e); dbg!("Ignoring failed read from client", e);
continue; continue;
} }
}; };
if map_str_len < 20 {
dbg!("Didn't receive a complete shmap id str from client. Ignoring.");
continue;
}
match new_client_sender.send_buf(LLMP_TAG_NEW_SHM_CLIENT, &new_client_map_str) { unsafe {
Ok(()) => (), let msg = new_client_sender
Err(e) => println!("Error forwarding client on map: {:?}", e), .alloc_next(size_of::<LlmpPayloadSharedMapInfo>())
}; .expect("Could not allocate a new message in shared map.");
(*msg).tag = LLMP_TAG_NEW_SHM_CLIENT;
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*pageinfo).shm_str = new_client_map_str;
(*pageinfo).map_size = LLMP_PREF_INITIAL_MAP_SIZE;
match new_client_sender.send(msg) {
Ok(()) => (),
Err(e) => println!("Error forwarding client on map: {:?}", e),
};
}
} }
})) }))
} }
@ -908,6 +914,25 @@ impl LlmpClient {
self.llmp_out.send_buf(tag, buf) self.llmp_out.send_buf(tag, buf)
} }
/// Informs the broker about a new client in town, with the given map id
pub fn send_client_added_msg(
&mut self,
shm_str: &[u8; 20],
shm_id: usize,
) -> Result<(), AflError> {
// We write this by hand to get around checks in send_buf
unsafe {
let msg = self
.alloc_next(size_of::<LlmpPayloadSharedMapInfo>())
.expect("Could not allocate a new message in shared map.");
(*msg).tag = LLMP_TAG_NEW_SHM_CLIENT;
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*pageinfo).shm_str = *shm_str;
(*pageinfo).map_size = shm_id;
self.send(msg)
}
}
/// A client receives a broadcast message. /// A client receives a broadcast message.
/// Returns null if no message is availiable /// Returns null if no message is availiable
pub unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, AflError> { pub unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, AflError> {