added tcp server
This commit is contained in:
parent
eb8941d422
commit
ff8a89f0c1
@ -52,7 +52,14 @@ use core::ptr;
|
|||||||
use core::sync::atomic::{compiler_fence, Ordering};
|
use core::sync::atomic::{compiler_fence, Ordering};
|
||||||
use core::time::Duration;
|
use core::time::Duration;
|
||||||
use libc::{c_uint, c_ulong, c_ushort};
|
use libc::{c_uint, c_ulong, c_ushort};
|
||||||
use std::{cmp::max, ffi::CStr, mem::size_of, thread};
|
use std::{
|
||||||
|
cmp::max,
|
||||||
|
ffi::CStr,
|
||||||
|
io::{Read, Write},
|
||||||
|
mem::size_of,
|
||||||
|
net::TcpListener,
|
||||||
|
thread,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::utils::next_pow2;
|
use crate::utils::next_pow2;
|
||||||
use crate::AflError;
|
use crate::AflError;
|
||||||
@ -471,6 +478,28 @@ impl LlmpSender {
|
|||||||
(*msg).tag = LLMP_TAG_UNSET;
|
(*msg).tag = LLMP_TAG_UNSET;
|
||||||
(*page).size_used -= (*msg).buf_len_padded as usize + size_of::<LlmpMsg>();
|
(*page).size_used -= (*msg).buf_len_padded as usize + size_of::<LlmpMsg>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Allocates a message of the given size, tags it, and sends it off.
|
||||||
|
pub fn send_buf(&mut self, tag: u32, buf: &[u8]) -> Result<(), AflError> {
|
||||||
|
// Make sure we don't reuse already allocated tags
|
||||||
|
if tag == LLMP_TAG_NEW_SHM_CLIENT
|
||||||
|
|| tag == LLMP_TAG_END_OF_PAGE
|
||||||
|
|| tag == LLMP_TAG_UNINITIALIZED
|
||||||
|
|| tag == LLMP_TAG_UNSET
|
||||||
|
{
|
||||||
|
return Err(AflError::Unknown(format!(
|
||||||
|
"Reserved tag supplied to send_buf ({:#X})",
|
||||||
|
tag
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
unsafe {
|
||||||
|
let msg = self.alloc_next(buf.len())?;
|
||||||
|
(*msg).tag = tag;
|
||||||
|
buf.as_ptr()
|
||||||
|
.copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len());
|
||||||
|
self.send(msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receiving end of an llmp channel
|
/// Receiving end of an llmp channel
|
||||||
@ -625,7 +654,7 @@ impl LlmpBroker {
|
|||||||
|
|
||||||
/// Registers a new client for the given sharedmap str and size.
|
/// Registers a new client for the given sharedmap str and size.
|
||||||
/// Returns the id of the new client in broker.client_map
|
/// Returns the id of the new client in broker.client_map
|
||||||
pub unsafe fn register_client(&mut self, client_page: LlmpSharedMap) {
|
pub fn register_client(&mut self, client_page: LlmpSharedMap) {
|
||||||
let id = self.llmp_clients.len() as u32;
|
let id = self.llmp_clients.len() as u32;
|
||||||
self.llmp_clients.push(LlmpReceiver {
|
self.llmp_clients.push(LlmpReceiver {
|
||||||
id,
|
id,
|
||||||
@ -740,6 +769,75 @@ impl LlmpBroker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn launch_tcp_listener(&mut self, port: u16) -> Result<thread::JoinHandle<()>, AflError> {
|
||||||
|
// Later in the execution, after the initial map filled up,
|
||||||
|
// the current broacast map will will point to a different map.
|
||||||
|
// However, the original map is (as of now) never freed, new clients will start
|
||||||
|
// to read from the initial map id.
|
||||||
|
|
||||||
|
let listener = TcpListener::bind(format!("127.0.0.1:{}", port))?;
|
||||||
|
// accept connections and process them, spawning a new thread for each one
|
||||||
|
println!("Server listening on port {}", port);
|
||||||
|
|
||||||
|
let client_out_map_mem = &self.llmp_out.out_maps.first().unwrap().shmem;
|
||||||
|
let broadcast_str_initial = client_out_map_mem.shm_str.clone();
|
||||||
|
|
||||||
|
let llmp_tcp_id = self.llmp_clients.len() as u32;
|
||||||
|
|
||||||
|
// Tcp out map sends messages from background thread tcp server to foreground client
|
||||||
|
let tcp_out_map = LlmpSharedMap::new(llmp_tcp_id, LLMP_PREF_INITIAL_MAP_SIZE)?;
|
||||||
|
let tcp_out_map_str = tcp_out_map.shmem.shm_str;
|
||||||
|
let tcp_out_map_size = tcp_out_map.shmem.map_size;
|
||||||
|
self.register_client(tcp_out_map);
|
||||||
|
|
||||||
|
Ok(thread::spawn(move || {
|
||||||
|
let mut new_client_sender = LlmpSender {
|
||||||
|
id: 0,
|
||||||
|
last_msg_sent: 0 as *mut LlmpMsg,
|
||||||
|
out_maps: vec![
|
||||||
|
LlmpSharedMap::from_name_slice(&tcp_out_map_str, tcp_out_map_size).unwrap(),
|
||||||
|
],
|
||||||
|
// drop pages to the broker if it already read them
|
||||||
|
keep_pages_forever: false,
|
||||||
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (mut stream, addr) = match listener.accept() {
|
||||||
|
Ok(res) => res,
|
||||||
|
Err(e) => {
|
||||||
|
dbg!("Ignoring failed accept", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
dbg!("New connection", addr, stream.peer_addr().unwrap());
|
||||||
|
match stream.write(&broadcast_str_initial) {
|
||||||
|
Ok(_) => {} // fire & forget
|
||||||
|
Err(e) => {
|
||||||
|
dbg!("Could not send to shmap to client", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let mut new_client_map_str: [u8; 20] = Default::default();
|
||||||
|
let map_str_len = match stream.read(&mut new_client_map_str) {
|
||||||
|
Ok(res) => res,
|
||||||
|
Err(e) => {
|
||||||
|
dbg!("Ignoring failed read from client", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if map_str_len < 20 {
|
||||||
|
dbg!("Didn't receive a complete shmap id str from client. Ignoring.");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
match new_client_sender.send_buf(LLMP_TAG_NEW_SHM_CLIENT, &new_client_map_str) {
|
||||||
|
Ok(()) => (),
|
||||||
|
Err(e) => println!("Error forwarding client on map: {:?}", e),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `n` clients connect to a broker. They share an outgoing map with the broker,
|
/// `n` clients connect to a broker. They share an outgoing map with the broker,
|
||||||
@ -770,24 +868,7 @@ impl LlmpClient {
|
|||||||
|
|
||||||
/// Allocates a message of the given size, tags it, and sends it off.
|
/// Allocates a message of the given size, tags it, and sends it off.
|
||||||
pub fn send_buf(&mut self, tag: u32, buf: &[u8]) -> Result<(), AflError> {
|
pub fn send_buf(&mut self, tag: u32, buf: &[u8]) -> Result<(), AflError> {
|
||||||
// Make sure we don't reuse already allocated tags
|
self.llmp_out.send_buf(tag, buf)
|
||||||
if tag == LLMP_TAG_NEW_SHM_CLIENT
|
|
||||||
|| tag == LLMP_TAG_END_OF_PAGE
|
|
||||||
|| tag == LLMP_TAG_UNINITIALIZED
|
|
||||||
|| tag == LLMP_TAG_UNSET
|
|
||||||
{
|
|
||||||
return Err(AflError::Unknown(format!(
|
|
||||||
"Reserved tag supplied to send_buf ({:#X})",
|
|
||||||
tag
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
unsafe {
|
|
||||||
let msg = self.alloc_next(buf.len())?;
|
|
||||||
(*msg).tag = tag;
|
|
||||||
buf.as_ptr()
|
|
||||||
.copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len());
|
|
||||||
self.send(msg)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A client receives a broadcast message.
|
/// A client receives a broadcast message.
|
||||||
|
@ -9,55 +9,6 @@ use super::{
|
|||||||
Event, EventManager,
|
Event, EventManager,
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
|
||||||
pub unsafe fn llmp_tcp_server_clientloop(client: &mut LlmpClient, _data: *mut c_void) -> ! {
|
|
||||||
// Later in the execution, after the initial map filled up,
|
|
||||||
// the current broacast map will will point to a different map.
|
|
||||||
// However, the original map is (as of now) never freed, new clients will start
|
|
||||||
// to read from the initial map id.
|
|
||||||
let initial_broadcasts_map_str = client
|
|
||||||
.as_ref()
|
|
||||||
.unwrap()
|
|
||||||
.current_broadcast_map
|
|
||||||
.as_ref()
|
|
||||||
.unwrap()
|
|
||||||
.shm_str;
|
|
||||||
|
|
||||||
let listener = TcpListener::bind("0.0.0.0:3333").unwrap();
|
|
||||||
// accept connections and process them, spawning a new thread for each one
|
|
||||||
println!("Server listening on port 3333");
|
|
||||||
loop {
|
|
||||||
let (mut stream, addr) = match listener.accept() {
|
|
||||||
Ok(res) => res,
|
|
||||||
Err(e) => {
|
|
||||||
dbg!("Ignoring failed accept", e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
dbg!("New connection", addr, stream.peer_addr().unwrap());
|
|
||||||
match stream.write(&initial_broadcasts_map_str as &[u8]) {
|
|
||||||
Ok(_) => {} // fire & forget
|
|
||||||
Err(e) => {
|
|
||||||
dbg!("Could not send to shmap to client", e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let mut new_client_map_str: [u8; 20] = Default::default();
|
|
||||||
let map_str_len = match stream.read(&mut new_client_map_str) {
|
|
||||||
Ok(res) => res,
|
|
||||||
Err(e) => {
|
|
||||||
dbg!("Ignoring failed read from client", e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if map_str_len < 20 {
|
|
||||||
dbg!("Didn't receive a complete shmap id str from client. Ignoring.");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
/// Eventmanager for multi-processed application
|
/// Eventmanager for multi-processed application
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
pub struct LLMPEventManager<S, C, E, I, R>
|
pub struct LLMPEventManager<S, C, E, I, R>
|
||||||
|
@ -6,13 +6,11 @@ use core::marker::PhantomData;
|
|||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
#[cfg(feature = "std")]
|
|
||||||
pub mod llmp_translated; // TODO: Abstract away.
|
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
pub mod shmem_translated;
|
pub mod shmem_translated;
|
||||||
|
|
||||||
#[cfg(feature = "std")]
|
/*#[cfg(feature = "std")]
|
||||||
pub use crate::events::llmp::LLMPEventManager;
|
pub use crate::events::llmp::LLMPEventManager;*/
|
||||||
|
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
@ -59,7 +59,6 @@ const AFL_RET_SUCCESS: c_uint = 0;
|
|||||||
// too.)
|
// too.)
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
#[repr(C)]
|
|
||||||
pub struct AflShmem {
|
pub struct AflShmem {
|
||||||
pub shm_str: [u8; 20],
|
pub shm_str: [u8; 20],
|
||||||
pub shm_id: c_int,
|
pub shm_id: c_int,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user