shm trait stated

This commit is contained in:
Dominik Maier 2020-12-18 01:14:16 +01:00
parent 5c15620bd3
commit 9362224020
6 changed files with 404 additions and 247 deletions

View File

@ -40,3 +40,4 @@ serde = { version = "1.0", default-features = false, features = ["alloc"] } # se
erased-serde = "0.3.12" erased-serde = "0.3.12"
postcard = { version = "0.5.1", features = ["alloc"] } # no_std compatible serde serialization fromat postcard = { version = "0.5.1", features = ["alloc"] } # no_std compatible serde serialization fromat
static_assertions = "1.1.0" static_assertions = "1.1.0"
#TODO: for llmp brotli = { version = "3.3.0", default-features = false } # brotli compression

View File

@ -58,6 +58,7 @@ use core::{
#[cfg(feature = "std")] #[cfg(feature = "std")]
use std::{ use std::{
env,
io::{Read, Write}, io::{Read, Write},
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
thread, thread,
@ -67,7 +68,7 @@ use crate::utils::next_pow2;
use crate::AflError; use crate::AflError;
#[cfg(feature = "std")] #[cfg(feature = "std")]
use super::shmem_translated::AflShmem; use super::shmem_translated::{AflShmem, ShMem};
/// We'll start off with 256 megabyte maps per fuzzer client /// We'll start off with 256 megabyte maps per fuzzer client
const LLMP_PREF_INITIAL_MAP_SIZE: usize = 1 << 28; const LLMP_PREF_INITIAL_MAP_SIZE: usize = 1 << 28;
@ -96,14 +97,17 @@ pub type Tag = u32;
/// Sending end on a (unidirectional) sharedmap channel /// Sending end on a (unidirectional) sharedmap channel
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct LlmpSender { pub struct LlmpSender<SH>
where
SH: ShMem,
{
/// ID of this sender. Only used in the broker. /// ID of this sender. Only used in the broker.
pub id: u32, pub id: u32,
/// Ref to the last message this sender sent on the last page. /// Ref to the last message this sender sent on the last page.
/// If null, a new page (just) started. /// If null, a new page (just) started.
pub last_msg_sent: *mut LlmpMsg, pub last_msg_sent: *mut LlmpMsg,
/// A vec of page wrappers, each containing an intialized AfShmem /// A vec of page wrappers, each containing an intialized AfShmem
pub out_maps: Vec<LlmpSharedMap>, pub out_maps: Vec<LlmpSharedMap<SH>>,
/// If true, pages will never be pruned. /// If true, pages will never be pruned.
/// The broker uses this feature. /// The broker uses this feature.
/// By keeping the message history around, /// By keeping the message history around,
@ -113,29 +117,38 @@ pub struct LlmpSender {
/// Receiving end on a (unidirectional) sharedmap channel /// Receiving end on a (unidirectional) sharedmap channel
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct LlmpReceiver { pub struct LlmpReceiver<SH>
where
SH: ShMem
{
pub id: u32, pub id: u32,
/// Pointer to the last meg this received /// Pointer to the last meg this received
pub last_msg_recvd: *mut LlmpMsg, pub last_msg_recvd: *mut LlmpMsg,
/// current page. After EOP, this gets replaced with the new one /// current page. After EOP, this gets replaced with the new one
pub current_recv_map: LlmpSharedMap, pub current_recv_map: LlmpSharedMap<SH>,
} }
/// Client side of LLMP /// Client side of LLMP
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct LlmpClient { pub struct LlmpClient<SH>
where
SH: ShMem,
{
/// Outgoing channel to the broker /// Outgoing channel to the broker
pub llmp_out: LlmpSender, pub llmp_out: LlmpSender<SH>,
/// Incoming (broker) broadcast map /// Incoming (broker) broadcast map
pub llmp_in: LlmpReceiver, pub llmp_in: LlmpReceiver<SH>,
} }
/// A page wrapper /// A page wrapper
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct LlmpSharedMap { pub struct LlmpSharedMap<SH>
where
SH: ShMem,
{
/// Shmem containg the actual (unsafe) page, /// Shmem containg the actual (unsafe) page,
/// shared between one LlmpSender and one LlmpReceiver /// shared between one LlmpSender and one LlmpReceiver
shmem: AflShmem, shmem: SH,
} }
/// Message sent over the "wire" /// Message sent over the "wire"
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
@ -165,7 +178,7 @@ impl LlmpMsg {
/// Gets the buffer from this message as slice, with the corrent length. /// Gets the buffer from this message as slice, with the corrent length.
#[inline] #[inline]
pub fn as_slice(&self, map: &LlmpSharedMap) -> Result<&[u8], AflError> { pub fn as_slice<SH: ShMem>(&self, map: &LlmpSharedMap<SH>) -> Result<&[u8], AflError> {
unsafe { unsafe {
if self.in_map(map) { if self.in_map(map) {
Ok(self.as_slice_unsafe()) Ok(self.as_slice_unsafe())
@ -177,17 +190,18 @@ impl LlmpMsg {
/// Returns true, if the pointer is, indeed, in the page of this shared map. /// Returns true, if the pointer is, indeed, in the page of this shared map.
#[inline] #[inline]
pub fn in_map(&self, map: &LlmpSharedMap) -> bool { pub fn in_map<SH: ShMem>(&self, map: &LlmpSharedMap<SH>) -> bool {
unsafe { unsafe {
let map_size = map.shmem.map().len();
let buf_ptr = self.buf.as_ptr(); let buf_ptr = self.buf.as_ptr();
if buf_ptr > (map.page() as *const u8).offset(size_of::<LlmpPage>() as isize) if buf_ptr > (map.page() as *const u8).offset(size_of::<LlmpPage>() as isize)
&& buf_ptr && buf_ptr
<= (map.page() as *const u8) <= (map.page() as *const u8)
.offset((map.shmem.map_size - size_of::<LlmpMsg>() as usize) as isize) .offset((map_size - size_of::<LlmpMsg>() as usize) as isize)
{ {
// The message header is in the page. Continue with checking the body. // The message header is in the page. Continue with checking the body.
let len = self.buf_len_padded as usize + size_of::<LlmpMsg>(); let len = self.buf_len_padded as usize + size_of::<LlmpMsg>();
buf_ptr <= (map.page() as *const u8).offset((map.shmem.map_size - len) as isize) buf_ptr <= (map.page() as *const u8).offset((map_size - len) as isize)
} else { } else {
false false
} }
@ -195,18 +209,24 @@ impl LlmpMsg {
} }
} }
/// An Llmp instance /// An Llmp instance
pub enum LlmpConnection { pub enum LlmpConnection<SH>
where
SH: ShMem,
{
/// A broker and a thread using this tcp background thread /// A broker and a thread using this tcp background thread
IsBroker { IsBroker {
broker: LlmpBroker, broker: LlmpBroker<SH>,
listener_thread: thread::JoinHandle<()>, listener_thread: thread::JoinHandle<()>,
}, },
/// A client, connected to the port /// A client, connected to the port
IsClient { client: LlmpClient }, IsClient { client: LlmpClient<SH> },
} }
impl LlmpConnection { impl LlmpConnection<AflShmem> {
/// Creates either a broker, if the tcp port is not bound, or a client, connected to this port. /// Creates either a broker, if the tcp port is not bound, or a client, connected to this port.
pub fn on_port(port: u16) -> Result<Self, AflError> { pub fn on_port(port: u16) -> Result<Self, AflError> {
match TcpListener::bind(format!("127.0.0.1:{}", port)) { match TcpListener::bind(format!("127.0.0.1:{}", port)) {
@ -264,13 +284,16 @@ pub struct LlmpPage {
/// The broker (node 0) /// The broker (node 0)
#[derive(Clone)] #[derive(Clone)]
#[repr(C)] #[repr(C)]
pub struct LlmpBroker { pub struct LlmpBroker<SH>
where
SH: ShMem,
{
/// Broadcast map from broker to all clients /// Broadcast map from broker to all clients
pub llmp_out: LlmpSender, pub llmp_out: LlmpSender<SH>,
/// Users of Llmp can add message handlers in the broker. /// Users of Llmp can add message handlers in the broker.
/// This allows us to intercept messages right in the broker /// This allows us to intercept messages right in the broker
/// This keeps the out map clean. /// This keeps the out map clean.
pub llmp_clients: Vec<LlmpReceiver>, pub llmp_clients: Vec<LlmpReceiver<SH>>,
} }
/// Result of an LLMP Mesasge hook /// Result of an LLMP Mesasge hook
@ -293,8 +316,8 @@ struct LlmpPayloadSharedMapInfo {
/// Get sharedmem from a page /// Get sharedmem from a page
#[inline] #[inline]
unsafe fn shmem2page(afl_shmem: &AflShmem) -> *mut LlmpPage { unsafe fn shmem2page<SH: ShMem>(afl_shmem: &SH) -> *mut LlmpPage {
afl_shmem.map as *mut LlmpPage afl_shmem.map().as_mut_ptr() as *mut LlmpPage
} }
/// Return, if a msg is contained in the current page /// Return, if a msg is contained in the current page
@ -335,7 +358,7 @@ fn new_map_size(max_alloc: usize) -> usize {
/// Initialize a new llmp_page. size should be relative to /// Initialize a new llmp_page. size should be relative to
/// llmp_page->messages /// llmp_page->messages
unsafe fn _llmp_page_init(shmem: &mut AflShmem, sender: u32) { unsafe fn _llmp_page_init(shmem: &mut AflShmem, sender: u32) {
let page = shmem2page(&shmem); let page = shmem2page(shmem);
(*page).sender = sender; (*page).sender = sender;
ptr::write_volatile(&mut (*page).current_msg_id, 0); ptr::write_volatile(&mut (*page).current_msg_id, 0);
(*page).max_alloc_size = 0; (*page).max_alloc_size = 0;
@ -350,15 +373,16 @@ unsafe fn _llmp_page_init(shmem: &mut AflShmem, sender: u32) {
/// Get the next pointer and make sure it's in the current page, and has enough space. /// Get the next pointer and make sure it's in the current page, and has enough space.
#[inline] #[inline]
unsafe fn llmp_next_msg_ptr_checked( unsafe fn llmp_next_msg_ptr_checked<SH: ShMem> (
map: &LlmpSharedMap, map: &LlmpSharedMap<SH>,
last_msg: *const LlmpMsg, last_msg: *const LlmpMsg,
alloc_size: usize, alloc_size: usize,
) -> Result<*mut LlmpMsg, AflError> { ) -> Result<*mut LlmpMsg, AflError> {
let page = map.page(); let page = map.page();
let map_size = map.shmem.map().len();
let msg_begin_min = (page as *const u8).offset(size_of::<LlmpPage>() as isize); let msg_begin_min = (page as *const u8).offset(size_of::<LlmpPage>() as isize);
// We still need space for this msg (alloc_size). // We still need space for this msg (alloc_size).
let msg_begin_max = (page as *const u8).offset((map.shmem.map_size - alloc_size) as isize); let msg_begin_max = (page as *const u8).offset((map_size - alloc_size) as isize);
let next = _llmp_next_msg_ptr(last_msg); let next = _llmp_next_msg_ptr(last_msg);
let next_ptr = next as *const u8; let next_ptr = next as *const u8;
if next_ptr >= msg_begin_min && next_ptr <= msg_begin_max { if next_ptr >= msg_begin_min && next_ptr <= msg_begin_max {
@ -381,7 +405,7 @@ unsafe fn _llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg {
} }
/// An actor on the sendin part of the shared map /// An actor on the sendin part of the shared map
impl LlmpSender { impl LlmpSender<AflShmem> {
/// For non zero-copy, we want to get rid of old pages with duplicate messages in the client /// For non zero-copy, we want to get rid of old pages with duplicate messages in the client
/// eventually. This function This funtion sees if we can unallocate older pages. /// eventually. This function This funtion sees if we can unallocate older pages.
/// The broker would have informed us by setting the save_to_unmap-flag. /// The broker would have informed us by setting the save_to_unmap-flag.
@ -550,7 +574,7 @@ impl LlmpSender {
let old_map = self.out_maps.last_mut().unwrap().page(); let old_map = self.out_maps.last_mut().unwrap().page();
// Create a new shard page. // Create a new shard page.
let new_map_shmem = LlmpSharedMap::new((*old_map).sender, (*old_map).max_alloc_size)?; let new_map_shmem = LlmpSharedMap::<AflShmem>::new((*old_map).sender, (*old_map).max_alloc_size)?;
let mut new_map = new_map_shmem.page(); let mut new_map = new_map_shmem.page();
ptr::write_volatile(&mut (*new_map).current_msg_id, (*old_map).current_msg_id); ptr::write_volatile(&mut (*new_map).current_msg_id, (*old_map).current_msg_id);
@ -561,8 +585,8 @@ impl LlmpSender {
(*out).sender = (*old_map).sender; (*out).sender = (*old_map).sender;
let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*end_of_page_msg).map_size = new_map_shmem.shmem.map_size; (*end_of_page_msg).map_size = new_map_shmem.shmem.map().len();
(*end_of_page_msg).shm_str = new_map_shmem.shmem.shm_str; (*end_of_page_msg).shm_str = *new_map_shmem.shmem.shm_str_buf();
// We never sent a msg on the new buf */ // We never sent a msg on the new buf */
self.last_msg_sent = 0 as *mut LlmpMsg; self.last_msg_sent = 0 as *mut LlmpMsg;
@ -631,7 +655,10 @@ impl LlmpSender {
} }
/// Receiving end of an llmp channel /// Receiving end of an llmp channel
impl LlmpReceiver { impl<SH> LlmpReceiver<SH>
where
SH: ShMem,
{
// Never inline, to not get some strange effects // Never inline, to not get some strange effects
/// Read next message. /// Read next message.
#[inline(never)] #[inline(never)]
@ -690,14 +717,14 @@ impl LlmpReceiver {
// Mark the old page save to unmap, in case we didn't so earlier. // Mark the old page save to unmap, in case we didn't so earlier.
ptr::write_volatile(&mut (*page).save_to_unmap, 1); ptr::write_volatile(&mut (*page).save_to_unmap, 1);
// Map the new page. The old one should be unmapped by Drop // Map the new page. The old one should be unmapped by Drop
self.current_recv_map = LlmpSharedMap::from_name_slice( self.current_recv_map = LlmpSharedMap::new_from_shm_str_buf(
&pageinfo_cpy.shm_str, &pageinfo_cpy.shm_str,
pageinfo_cpy.map_size, pageinfo_cpy.map_size,
)?; )?;
// Mark the new page save to unmap also (it's mapped by us, the broker now) // Mark the new page save to unmap also (it's mapped by us, the broker now)
ptr::write_volatile(&mut (*page).save_to_unmap, 1); ptr::write_volatile(&mut (*page).save_to_unmap, 1);
dbg!("Got a new recv map", self.current_recv_map.shmem.shm_str); dbg!("Got a new recv map", self.current_recv_map.shmem.shm_str());
// After we mapped the new page, return the next message, if available // After we mapped the new page, return the next message, if available
return self.recv(); return self.recv();
} }
@ -765,7 +792,7 @@ impl LlmpReceiver {
} }
/// The page struct, placed on a shared mem instance. /// The page struct, placed on a shared mem instance.
impl LlmpSharedMap { impl LlmpSharedMap<AflShmem> {
/// Creates a new page with minimum prev_max_alloc_size or LLMP_PREF_INITIAL_MAP_SIZE /// Creates a new page with minimum prev_max_alloc_size or LLMP_PREF_INITIAL_MAP_SIZE
/// returning the initialized shared mem struct /// returning the initialized shared mem struct
pub fn new(sender: u32, min_size: usize) -> Result<Self, AflError> { pub fn new(sender: u32, min_size: usize) -> Result<Self, AflError> {
@ -777,12 +804,41 @@ impl LlmpSharedMap {
Ok(Self { shmem }) Ok(Self { shmem })
} }
/// Initialize from a shm_str with fixed len of 20
pub fn from_name(shm_str: &str, map_size: usize) -> Result<Self, AflError> {
let slice: [u8; 20];
slice.copy_from_slice(shm_str.as_bytes());
Self::from_name_slice(&slice, map_size)
}
/// Initialize from a shm_str with fixed len of 20 /// Initialize from a shm_str with fixed len of 20
pub fn from_name_slice(shm_str: &[u8; 20], map_size: usize) -> Result<Self, AflError> { pub fn from_name_slice(shm_str: &[u8; 20], map_size: usize) -> Result<Self, AflError> {
let shmem = AflShmem::from_name_slice(shm_str, map_size)?; let shmem = AflShmem::from_name_slice(shm_str, map_size)?;
// Not initializing the page here - the other side should have done it already! // Not initializing the page here - the other side should have done it already!
Ok(Self { shmem }) Ok(Self { shmem })
} }
}
impl<SH> LlmpSharedMap<SH>
where
SH: ShMem,
{
/// Initialize from a shm_str with fixed len of 20
pub fn new_from_shm_str_buf(&self, shm_str: &[u8; 20], map_size: usize) -> Result<Self, AflError> {
let shmem = self.shmem.new_from_shm_str_buf(shm_str, map_size)?;
// Not initializing the page here - the other side should have done it already!
Ok(Self { shmem })
}
pub fn write_to_env(&self, env_name: String) -> Result<(), AflError> {
let map_size = self.shmem.map().len();
let map_env = self.shmem.shm_str() ;
let map_size_env = format!("{}_SIZE", map_env);
env::set_var(map_env, self.shmem.shm_str());
env::set_var(map_size_env, format!("{}", map_size));
Ok(())
}
/// Get the unsafe ptr to this page, situated on the shared map /// Get the unsafe ptr to this page, situated on the shared map
pub unsafe fn page(&self) -> *mut LlmpPage { pub unsafe fn page(&self) -> *mut LlmpPage {
@ -792,7 +848,7 @@ impl LlmpSharedMap {
/// The broker forwards all messages to its own bus-like broadcast map. /// The broker forwards all messages to its own bus-like broadcast map.
/// It may intercept messages passing through. /// It may intercept messages passing through.
impl LlmpBroker { impl LlmpBroker<AflShmem> {
/// Create and initialize a new llmp_broker /// Create and initialize a new llmp_broker
pub fn new() -> Result<Self, AflError> { pub fn new() -> Result<Self, AflError> {
let broker = LlmpBroker { let broker = LlmpBroker {
@ -817,7 +873,7 @@ impl LlmpBroker {
/// Registers a new client for the given sharedmap str and size. /// Registers a new client for the given sharedmap str and size.
/// Returns the id of the new client in broker.client_map /// Returns the id of the new client in broker.client_map
pub fn register_client(&mut self, client_page: LlmpSharedMap) { pub fn register_client(&mut self, client_page: LlmpSharedMap<SH>) {
let id = self.llmp_clients.len() as u32; let id = self.llmp_clients.len() as u32;
self.llmp_clients.push(LlmpReceiver { self.llmp_clients.push(LlmpReceiver {
id, id,
@ -845,6 +901,232 @@ impl LlmpBroker {
Ok(()) Ok(())
} }
/// The broker walks all pages and looks for changes, then broadcasts them on
/// its own shared page, once.
#[inline]
pub fn once<F>(&mut self, on_new_msg: &mut F) -> Result<(), AflError>
where
F: FnMut(u32, Tag, &[u8]) -> Result<LlmpMsgHookResult, AflError>,
{
compiler_fence(Ordering::SeqCst);
for i in 0..self.llmp_clients.len() {
unsafe {
self.handle_new_msgs(i as u32, on_new_msg)?;
}
}
Ok(())
}
/// Loops infinitely, forwarding and handling all incoming messages from clients.
/// Never returns. Panics on error.
/// 5 millis of sleep can't hurt to keep busywait not at 100%
pub fn loop_forever<F>(&mut self, on_new_msg: &mut F, sleep_time: Option<Duration>) -> !
where
F: FnMut(u32, Tag, &[u8]) -> Result<LlmpMsgHookResult, AflError>,
{
loop {
compiler_fence(Ordering::SeqCst);
self.once(on_new_msg)
.expect("An error occurred when brokering. Exiting.");
match sleep_time {
Some(time) => thread::sleep(time),
None => (),
}
}
}
/// Broadcasts the given buf to all lients
pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), AflError> {
self.llmp_out.send_buf(tag, buf)
}
}
impl LlmpBroker<AflShmem> {
/// Launches a thread using a tcp listener socket, on which new clients may connect to this broker
/// Does so on the given port.
pub fn launch_tcp_listener_on(
&mut self,
port: u16,
) -> Result<thread::JoinHandle<()>, AflError> {
let listener = TcpListener::bind(format!("127.0.0.1:{}", port))?;
// accept connections and process them, spawning a new thread for each one
println!("Server listening on port {}", port);
return self.launch_tcp_listener(listener);
}
/// Launches a thread using a tcp listener socket, on which new clients may connect to this broker
pub fn launch_tcp_listener(
&mut self,
listener: TcpListener,
) -> Result<thread::JoinHandle<()>, AflError> {
// Later in the execution, after the initial map filled up,
// the current broacast map will will point to a different map.
// However, the original map is (as of now) never freed, new clients will start
// to read from the initial map id.
let client_out_map_mem = &self.llmp_out.out_maps.first().unwrap().shmem;
let broadcast_str_initial = client_out_map_mem.shm_str_buf().clone();
let llmp_tcp_id = self.llmp_clients.len() as u32;
// Tcp out map sends messages from background thread tcp server to foreground client
let tcp_out_map = LlmpSharedMap::<AflShmem>::new(llmp_tcp_id, LLMP_PREF_INITIAL_MAP_SIZE)?;
let tcp_out_map_str = tcp_out_map.shmem.shm_str_buf();
let tcp_out_map_size = tcp_out_map.shmem.map().len();
self.register_client(tcp_out_map);
Ok(thread::spawn(move || {
let mut new_client_sender = LlmpSender {
id: 0,
last_msg_sent: 0 as *mut LlmpMsg,
out_maps: vec![
LlmpSharedMap::from_name_slice(&tcp_out_map_str, tcp_out_map_size).unwrap(),
],
// drop pages to the broker if it already read them
keep_pages_forever: false,
};
loop {
let (mut stream, addr) = match listener.accept() {
Ok(res) => res,
Err(e) => {
dbg!("Ignoring failed accept", e);
continue;
}
};
dbg!("New connection", addr, stream.peer_addr().unwrap());
match stream.write(&broadcast_str_initial) {
Ok(_) => {} // fire & forget
Err(e) => {
dbg!("Could not send to shmap to client", e);
continue;
}
};
let mut new_client_map_str: [u8; 20] = Default::default();
match stream.read_exact(&mut new_client_map_str) {
Ok(()) => (),
Err(e) => {
dbg!("Ignoring failed read from client", e);
continue;
}
};
unsafe {
let msg = new_client_sender
.alloc_next(size_of::<LlmpPayloadSharedMapInfo>())
.expect("Could not allocate a new message in shared map.");
(*msg).tag = LLMP_TAG_NEW_SHM_CLIENT;
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*pageinfo).shm_str = new_client_map_str;
(*pageinfo).map_size = LLMP_PREF_INITIAL_MAP_SIZE;
match new_client_sender.send(msg) {
Ok(()) => (),
Err(e) => println!("Error forwarding client on map: {:?}", e),
};
}
}
}))
}
fn map_new_page(shm_str_buf: &[u8; 20], map_size: usize) -> Result<LlmpSharedMap<AflShmem>, AflError> {
LlmpSharedMap::from_name_slice(shm_str_buf, map_size)
}
}
/// `n` clients connect to a broker. They share an outgoing map with the broker,
/// and get incoming messages from the shared broker bus
impl LlmpClient<AflShmem> {
fn map_new_page(shm_str_buf: &[u8; 20], map_size: usize) -> Result<LlmpSharedMap<SH>, AflError> {
panic!("End of page not handled!");
}
/// Creates a new LlmpClient
pub fn new(initial_broker_map: LlmpSharedMap<SH>) -> Result<Self, AflError> {
Ok(Self {
llmp_out: LlmpSender {
id: 0,
last_msg_sent: 0 as *mut LlmpMsg,
out_maps: vec![LlmpSharedMap::new(0, LLMP_PREF_INITIAL_MAP_SIZE)?],
// drop pages to the broker if it already read them
keep_pages_forever: false,
},
llmp_in: LlmpReceiver {
id: 0,
current_recv_map: initial_broker_map,
last_msg_recvd: 0 as *mut LlmpMsg,
},
})
}
/// Commits a msg to the client's out map
pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> {
self.llmp_out.send(msg)
}
/// Allocates a message of the given size, tags it, and sends it off.
pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), AflError> {
self.llmp_out.send_buf(tag, buf)
}
/// Informs the broker about a new client in town, with the given map id
pub fn send_client_added_msg(
&mut self,
shm_str: &[u8; 20],
shm_id: usize,
) -> Result<(), AflError> {
// We write this by hand to get around checks in send_buf
unsafe {
let msg = self
.alloc_next(size_of::<LlmpPayloadSharedMapInfo>())
.expect("Could not allocate a new message in shared map.");
(*msg).tag = LLMP_TAG_NEW_SHM_CLIENT;
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*pageinfo).shm_str = *shm_str;
(*pageinfo).map_size = shm_id;
self.send(msg)
}
}
/// A client receives a broadcast message.
/// Returns null if no message is availiable
#[inline]
pub unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, AflError> {
self.llmp_in.recv()
}
/// A client blocks/spins until the next message gets posted to the page,
/// then returns that message.
#[inline]
pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, AflError> {
self.llmp_in.recv_blocking()
}
/// The current page could have changed in recv (EOP)
/// Alloc the next message, internally handling end of page by allocating a new one.
#[inline]
pub unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> {
self.llmp_out.alloc_next(buf_len)
}
/// Returns the next message, tag, buf, if avaliable, else None
#[inline]
pub fn recv_buf(&mut self) -> Result<Option<(u32, u32, &[u8])>, AflError> {
self.llmp_in.recv_buf()
}
/// Receives a buf from the broker, looping until a messages becomes avaliable
#[inline]
pub fn recv_buf_blocking(&mut self) -> Result<(u32, u32, &[u8]), AflError> {
self.llmp_in.recv_buf_blocking()
}
/// broker broadcast to its own page for all others to read */ /// broker broadcast to its own page for all others to read */
#[inline] #[inline]
unsafe fn handle_new_msgs<F>( unsafe fn handle_new_msgs<F>(
@ -912,153 +1194,20 @@ impl LlmpBroker {
} }
} }
/// The broker walks all pages and looks for changes, then broadcasts them on
/// its own shared page, once.
#[inline]
pub fn once<F>(&mut self, on_new_msg: &mut F) -> Result<(), AflError>
where
F: FnMut(u32, Tag, &[u8]) -> Result<LlmpMsgHookResult, AflError>,
{
compiler_fence(Ordering::SeqCst);
for i in 0..self.llmp_clients.len() {
unsafe {
self.handle_new_msgs(i as u32, on_new_msg)?;
}
}
Ok(())
} }
/// Loops infinitely, forwarding and handling all incoming messages from clients. impl LlmpClient<AflShmem> {
/// Never returns. Panics on error.
/// 5 millis of sleep can't hurt to keep busywait not at 100% /// Creates a new LlmpClient, reading the map id and len from env
pub fn loop_forever<F>(&mut self, on_new_msg: &mut F, sleep_time: Option<Duration>) -> ! pub fn create_using_env(env_var: &str) -> Result<Self, AflError> {
where
F: FnMut(u32, Tag, &[u8]) -> Result<LlmpMsgHookResult, AflError>, let map_str = env::var(env_var)?;
{ let map_size = str::parse::<usize>(&env::var(format!("{}_SIZE", env_var))?)?;
loop { Ok(Self::new(LlmpSharedMap::from_name(&map_str, map_size)?)?)
compiler_fence(Ordering::SeqCst);
self.once(on_new_msg)
.expect("An error occurred when brokering. Exiting.");
match sleep_time {
Some(time) => thread::sleep(time),
None => (),
}
}
}
/// Launches a thread using a tcp listener socket, on which new clients may connect to this broker
/// Does so on the given port.
pub fn launch_tcp_listener_on(
&mut self,
port: u16,
) -> Result<thread::JoinHandle<()>, AflError> {
let listener = TcpListener::bind(format!("127.0.0.1:{}", port))?;
// accept connections and process them, spawning a new thread for each one
println!("Server listening on port {}", port);
return self.launch_tcp_listener(listener);
}
/// Launches a thread using a tcp listener socket, on which new clients may connect to this broker
pub fn launch_tcp_listener(
&mut self,
listener: TcpListener,
) -> Result<thread::JoinHandle<()>, AflError> {
// Later in the execution, after the initial map filled up,
// the current broacast map will will point to a different map.
// However, the original map is (as of now) never freed, new clients will start
// to read from the initial map id.
let client_out_map_mem = &self.llmp_out.out_maps.first().unwrap().shmem;
let broadcast_str_initial = client_out_map_mem.shm_str.clone();
let llmp_tcp_id = self.llmp_clients.len() as u32;
// Tcp out map sends messages from background thread tcp server to foreground client
let tcp_out_map = LlmpSharedMap::new(llmp_tcp_id, LLMP_PREF_INITIAL_MAP_SIZE)?;
let tcp_out_map_str = tcp_out_map.shmem.shm_str;
let tcp_out_map_size = tcp_out_map.shmem.map_size;
self.register_client(tcp_out_map);
Ok(thread::spawn(move || {
let mut new_client_sender = LlmpSender {
id: 0,
last_msg_sent: 0 as *mut LlmpMsg,
out_maps: vec![
LlmpSharedMap::from_name_slice(&tcp_out_map_str, tcp_out_map_size).unwrap(),
],
// drop pages to the broker if it already read them
keep_pages_forever: false,
};
loop {
let (mut stream, addr) = match listener.accept() {
Ok(res) => res,
Err(e) => {
dbg!("Ignoring failed accept", e);
continue;
}
};
dbg!("New connection", addr, stream.peer_addr().unwrap());
match stream.write(&broadcast_str_initial) {
Ok(_) => {} // fire & forget
Err(e) => {
dbg!("Could not send to shmap to client", e);
continue;
}
};
let mut new_client_map_str: [u8; 20] = Default::default();
match stream.read_exact(&mut new_client_map_str) {
Ok(()) => (),
Err(e) => {
dbg!("Ignoring failed read from client", e);
continue;
}
};
unsafe {
let msg = new_client_sender
.alloc_next(size_of::<LlmpPayloadSharedMapInfo>())
.expect("Could not allocate a new message in shared map.");
(*msg).tag = LLMP_TAG_NEW_SHM_CLIENT;
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*pageinfo).shm_str = new_client_map_str;
(*pageinfo).map_size = LLMP_PREF_INITIAL_MAP_SIZE;
match new_client_sender.send(msg) {
Ok(()) => (),
Err(e) => println!("Error forwarding client on map: {:?}", e),
};
}
}
}))
}
/// Broadcasts the given buf to all lients
pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), AflError> {
self.llmp_out.send_buf(tag, buf)
}
}
/// `n` clients connect to a broker. They share an outgoing map with the broker,
/// and get incoming messages from the shared broker bus
impl LlmpClient {
/// Creates a new LlmpClient
pub fn new(initial_broker_map: LlmpSharedMap) -> Result<Self, AflError> {
Ok(Self {
llmp_out: LlmpSender {
id: 0,
last_msg_sent: 0 as *mut LlmpMsg,
out_maps: vec![LlmpSharedMap::new(0, LLMP_PREF_INITIAL_MAP_SIZE)?],
// drop pages to the broker if it already read them
keep_pages_forever: false,
},
llmp_in: LlmpReceiver {
id: 0,
current_recv_map: initial_broker_map,
last_msg_recvd: 0 as *mut LlmpMsg,
},
})
} }
/// Create a LlmpClient, getting the ID from a given port
pub fn create_attach_to_tcp(port: u16) -> Result<Self, AflError> { pub fn create_attach_to_tcp(port: u16) -> Result<Self, AflError> {
let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port))?; let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port))?;
println!("Connected to port {}", port); println!("Connected to port {}", port);
@ -1071,71 +1220,10 @@ impl LlmpClient {
LLMP_PREF_INITIAL_MAP_SIZE, LLMP_PREF_INITIAL_MAP_SIZE,
)?)?; )?)?;
stream.write(&ret.llmp_out.out_maps.first().unwrap().shmem.shm_str)?; stream.write(ret.llmp_out.out_maps.first().unwrap().shmem.shm_str_buf())?;
Ok(ret) Ok(ret)
} }
/// Commits a msg to the client's out map
pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> {
self.llmp_out.send(msg)
}
/// Allocates a message of the given size, tags it, and sends it off.
pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), AflError> {
self.llmp_out.send_buf(tag, buf)
}
/// Informs the broker about a new client in town, with the given map id
pub fn send_client_added_msg(
&mut self,
shm_str: &[u8; 20],
shm_id: usize,
) -> Result<(), AflError> {
// We write this by hand to get around checks in send_buf
unsafe {
let msg = self
.alloc_next(size_of::<LlmpPayloadSharedMapInfo>())
.expect("Could not allocate a new message in shared map.");
(*msg).tag = LLMP_TAG_NEW_SHM_CLIENT;
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*pageinfo).shm_str = *shm_str;
(*pageinfo).map_size = shm_id;
self.send(msg)
}
}
/// A client receives a broadcast message.
/// Returns null if no message is availiable
#[inline]
pub unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, AflError> {
self.llmp_in.recv()
}
/// A client blocks/spins until the next message gets posted to the page,
/// then returns that message.
#[inline]
pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, AflError> {
self.llmp_in.recv_blocking()
}
/// The current page could have changed in recv (EOP)
/// Alloc the next message, internally handling end of page by allocating a new one.
#[inline]
pub unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> {
self.llmp_out.alloc_next(buf_len)
}
/// Returns the next message, tag, buf, if avaliable, else None
#[inline]
pub fn recv_buf(&mut self) -> Result<Option<(u32, u32, &[u8])>, AflError> {
self.llmp_in.recv_buf()
}
/// Receives a buf from the broker, looping until a messages becomes avaliable
#[inline]
pub fn recv_buf_blocking(&mut self) -> Result<(u32, u32, &[u8]), AflError> {
self.llmp_in.recv_buf_blocking()
}
} }
#[cfg(test)] #[cfg(test)]
@ -1189,4 +1277,5 @@ mod tests {
// We want at least the tcp and sender clients. // We want at least the tcp and sender clients.
assert_eq!(broker.llmp_clients.len(), 2); assert_eq!(broker.llmp_clients.len(), 2);
} }
} }

View File

@ -6,6 +6,7 @@ pub mod shmem_translated;
use alloc::string::{String, ToString}; use alloc::string::{String, ToString};
use alloc::vec::Vec; use alloc::vec::Vec;
use shmem_translated::AflShmem;
use core::time::Duration; use core::time::Duration;
use core::{marker::PhantomData, time}; use core::{marker::PhantomData, time};
#[cfg(feature = "std")] #[cfg(feature = "std")]
@ -720,7 +721,7 @@ where
ST: Stats, ST: Stats,
//CE: CustomEvent<I>, //CE: CustomEvent<I>,
{ {
llmp: llmp::LlmpConnection, llmp: llmp::LlmpConnection<AflShmem>,
stats: ST, stats: ST,
phantom: PhantomData<(C, E, OT, FT, I, R)>, phantom: PhantomData<(C, E, OT, FT, I, R)>,
} }

View File

@ -1,4 +1,5 @@
use libc::{c_char, c_int, c_long, c_uchar, c_uint, c_ulong, c_ushort, c_void}; use libc::{c_char, c_int, c_long, c_uchar, c_uint, c_ulong, c_ushort, c_void};
use core::slice;
use std::{ffi::CStr, mem::size_of}; use std::{ffi::CStr, mem::size_of};
use crate::AflError; use crate::AflError;
@ -50,15 +51,52 @@ const AFL_RET_SUCCESS: c_uint = 0;
// A generic sharememory region to be used by any functions (queues or feedbacks // A generic sharememory region to be used by any functions (queues or feedbacks
// too.) // too.)
/// A Shared map
pub trait ShMem {
/// The string to identify this shm
fn shm_str(&self) -> String;
/// Let's just fix this to a large enough buf
fn shm_str_buf(&self) -> &[u8; 20];
/// The actual shared map, in memory
fn map(&self) -> &[u8];
/// The actual shared map, mutable
fn map_mut(&mut self) -> &mut [u8];
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct AflShmem { pub struct AflShmem {
pub shm_str: [u8; 20], pub shm_str: [u8; 20],
pub shm_id: c_int, pub shm_id: c_int,
pub map: *mut c_uchar, pub map: *mut u8,
pub map_size: usize, pub map_size: usize,
} }
/// Deinit on drop impl ShMem for AflShmem {
fn shm_str(&self) -> String {
unsafe { CStr::from_ptr(self.shm_str.as_ptr() as *const i8) }.to_string_lossy().into()
}
fn shm_str_buf(&self) -> &[u8; 20] {
&self.shm_str
}
fn map(&self) -> &[u8] {
unsafe { slice::from_raw_parts(self.map, self.map_size) }
}
fn map_mut(&mut self) -> &mut [u8] {
unsafe { slice::from_raw_parts_mut(self.map, self.map_size) }
}
}
/// Deinit sharedmaps on drop
impl Drop for AflShmem { impl Drop for AflShmem {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { unsafe {

View File

@ -28,7 +28,7 @@ pub mod utils;
use alloc::string::String; use alloc::string::String;
use core::fmt; use core::fmt;
#[cfg(feature = "std")] #[cfg(feature = "std")]
use std::io; use std::{env::VarError, io, num::ParseIntError, string::FromUtf8Error};
/// Main error struct for AFL /// Main error struct for AFL
#[derive(Debug)] #[derive(Debug)]
@ -88,6 +88,27 @@ impl From<io::Error> for AflError {
} }
} }
#[cfg(feature = "std")]
impl From<FromUtf8Error> for AflError {
fn from(err: FromUtf8Error) -> Self {
Self::Unknown(format!("Could not convert byte to utf-8: {:?}", err))
}
}
#[cfg(feature = "std")]
impl From<VarError> for AflError {
fn from(err: VarError) -> Self {
Self::Empty(format!("Could not get env var: {:?}", err))
}
}
impl From<ParseIntError> for AflError {
fn from(err: ParseIntError) -> Self {
Self::Unknown(format!("Failed to parse Int: {:?}", err))
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#[test] #[test]

View File

@ -45,11 +45,18 @@ pub extern "C" fn afl_libfuzzer_main() {
let mut generator = RandPrintablesGenerator::new(32); let mut generator = RandPrintablesGenerator::new(32);
let stats = SimpleStats::new(|s| println!("{}", s)); let stats = SimpleStats::new(|s| println!("{}", s));
///
match LlmpFuzzInstance::from_env("FUZZER_ENV") {
}
let mut mgr = LlmpEventManager::new_on_port(1337, stats).unwrap(); let mut mgr = LlmpEventManager::new_on_port(1337, stats).unwrap();
if mgr.is_broker() { if mgr.is_broker() {
println!("Doing broker things."); println!("Doing broker things. Run this tool again to start fuzzing in a client.");
mgr.broker_loop().unwrap(); mgr.broker_loop().unwrap();
} }
println!("We're a client, let's fuzz :)"); println!("We're a client, let's fuzz :)");
let edges_observer = let edges_observer =