diff --git a/afl/Cargo.toml b/afl/Cargo.toml
index 4eb56e00d5..fcac574c2a 100644
--- a/afl/Cargo.toml
+++ b/afl/Cargo.toml
@@ -33,5 +33,4 @@ num = "*"
 xxhash-rust = { version = "0.8.0", features = ["xxh3"] } # xxh3 hashing for rust
 serde = { version = "1.0", default-features = false, features = ["alloc"] } # serialization lib
 erased-serde = "0.3.12"
-postcard = "0.5.1" # no_std compatible serde serialization fromat
-memoffset = "0.6" # for offset_of support
\ No newline at end of file
+postcard = "0.5.1" # no_std compatible serde serialization format
\ No newline at end of file
diff --git a/afl/src/events/llmp.rs b/afl/src/events/llmp.rs
index f6adf68eab..fd99b9d5ae 100644
--- a/afl/src/events/llmp.rs
+++ b/afl/src/events/llmp.rs
@@ -1,5 +1,4 @@
 use core::marker::PhantomData;
-use core::ptr;
 use std::{ffi::c_void, io::Read, io::Write, net::TcpListener};

 use crate::{
@@ -7,11 +6,12 @@ use crate::{
 };

 use super::{
-    llmp_translated::{LlmpBroker, LlmpClient, LlmpClientloopFn, LlmpMsgHookFn},
+    llmp_translated::{LlmpBroker, LlmpClient, LlmpMsgHookFn},
     Event, EventManager,
 };

-pub unsafe fn llmp_tcp_server_clientloop(client: *mut LlmpClient, _data: *mut c_void) -> ! {
+/*
+pub unsafe fn llmp_tcp_server_clientloop(client: &mut LlmpClient, _data: *mut c_void) -> ! {
     // Later in the execution, after the initial map filled up,
     // the current broadcast map will point to a different map.
     // However, the original map is (as of now) never freed, new clients will start
@@ -57,6 +57,7 @@ pub unsafe fn llmp_tcp_server_clientloop(client: *mut LlmpClient, _data: *mut c_
         }
     }
 }
+*/

 /// Eventmanager for multi-processed application
 #[cfg(feature = "std")]
@@ -126,6 +127,7 @@ where
     }
 }

+/*
 #[cfg(feature = "std")]
 impl LLMPEventManager
 where
@@ -141,20 +143,5 @@ where
         broker_message_hook: LlmpMsgHookFn,
         clientloops: LlmpClientloopFn,
     ) -> ! {
-        unsafe {
-            let mut broker = LlmpBroker::new().expect("Failed to create llmp");
-
-            for i in 0..process_count - 1 {
-                println!("Adding client {}", i);
-                broker
-                    .register_childprocess_clientloop(clientloops, ptr::null_mut())
-                    .expect("could not add child clientloop");
-            }
-
-            println!("Spawning broker");
-            broker.add_message_hook(broker_message_hook, ptr::null_mut());
-
-            broker.run();
-        }
     }
-}
+}*/
diff --git a/afl/src/events/llmp_translated.rs b/afl/src/events/llmp_translated.rs
index df0b6a96bb..0878dca724 100644
--- a/afl/src/events/llmp_translated.rs
+++ b/afl/src/events/llmp_translated.rs
@@ -48,21 +48,19 @@
 Then register some clientloops using llmp_broker_register_threaded_clientloop
 */

-use ::libc;
-
 use core::ptr;
 use core::sync::atomic::{compiler_fence, Ordering};
-use core::{ffi::c_void, time};
-use libc::{c_int, c_uint, c_ulong, c_ushort};
-use std::{cmp::max, ffi::CStr, mem::size_of, os::raw::c_char, thread};
+use core::time;
+use libc::{c_uint, c_ulong, c_ushort};
+use std::{cmp::max, ffi::CStr, mem::size_of, thread};

 use crate::utils::next_pow2;
 use crate::AflError;

-use super::shmem_translated::{afl_shmem_by_str, afl_shmem_deinit, afl_shmem_init, AflShmem};
+use super::shmem_translated::AflShmem;

 /// The header length of a llmp page in a shared map (until messages start)
-const LLMP_PAGE_HEADER_LEN: usize = offset_of!(LlmpPage, messages);
+const LLMP_PAGE_HEADER_LEN: usize = size_of::<LlmpPage>();

 /// We'll start off with 256 megabyte maps per fuzzer
 const LLMP_INITIAL_MAP_SIZE: usize = 1 << 28;
@@ -77,38 +75,49 @@ const LLMP_TAG_END_OF_PAGE: u32 = 0xaf1e0f1;
 /// A new client for this broker got added.
 const LLMP_TAG_NEW_SHM_CLIENT: u32 = 0xc11e471;

-extern "C" {
-    #[no_mangle]
-    fn memcpy(_: *mut c_void, _: *const c_void, _: c_ulong) -> *mut c_void;
-    #[no_mangle]
-    fn memmove(_: *mut c_void, _: *const c_void, _: c_ulong) -> *mut c_void;
-    #[no_mangle]
-    fn memset(_: *mut c_void, _: c_int, _: c_ulong) -> *mut c_void;
-}
-
 pub type AflRet = c_uint;
 pub const AFL_RET_ALLOC: AflRet = 3;
 pub const AFL_RET_SUCCESS: AflRet = 0;

+/// Sending end on a (unidirectional) sharedmap channel
 #[derive(Clone)]
 pub struct LlmpSender {
+    /// ID of this sender. Only used in the broker.
     pub id: u32,
+    /// Ref to the last message this sender sent on the last page.
+    /// If null, a new page (just) started.
     pub last_msg_sent: *mut LlmpMsg,
+    /// A vec of page wrappers, each containing an initialized AflShmem
     pub out_maps: Vec<LlmpPageWrapper>,
+    /// If true, pages will never be pruned.
+    /// The broker uses this feature.
+    /// By keeping the message history around,
+    /// new clients may join at any time in the future.
+    pub keep_pages_forever: bool,
 }

+/// Receiving end on a (unidirectional) sharedmap channel
+#[derive(Clone)]
+pub struct LlmpReceiver {
+    pub id: u32,
+    /// Pointer to the last msg this receiver received
+    pub last_msg_recvd: *mut LlmpMsg,
+    /// current page. After EOP, this gets replaced with the new one
+    pub current_recv_map: LlmpPageWrapper,
+}
+
+/// Client side of LLMP
 #[derive(Clone)]
 pub struct LlmpClient {
     pub llmp_out: LlmpSender,
-    pub last_msg_recvd: *mut LlmpMsg,
-    pub current_broadcast_map: LlmpPageWrapper,
-    pub last_msg_sent: *mut LlmpMsg,
-    pub out_maps: Vec<LlmpPageWrapper>,
-    pub new_out_page_hooks: Vec<LlmpHookdataGeneric<LlmpClientNewPageHookFn>>,
+    pub llmp_in: LlmpReceiver,
 }

+/// A page wrapper
 #[derive(Clone)]
-struct LlmpPageWrapper {
+pub struct LlmpPageWrapper {
+    /// Shmem containing the actual (unsafe) page,
+    /// shared between one LlmpSender and one LlmpReceiver
     shmem: AflShmem,
 }

@@ -116,7 +125,7 @@ impl LlmpPageWrapper {
     /// Creates a new page with minimum prev_max_alloc_size or LLMP_INITIAL_MAP_SIZE
     /// returning the initialized shared mem struct
-    unsafe fn new(sender: u32, min_size: usize) -> Result<Self, AflError> {
+    pub unsafe fn new(sender: u32, min_size: usize) -> Result<Self, AflError> {
         // Create a new shared page.
         let mut shmem = AflShmem::new(new_map_size(min_size))?;
         _llmp_page_init(&mut shmem, sender);
@@ -124,64 +133,57 @@ impl LlmpPageWrapper {
     }

     /// Initialize from a 0-terminated sharedmap id string and its size
-    unsafe fn from_str(shm_str: &CStr, map_size: usize) -> Result<Self, AflError> {
+    pub unsafe fn from_str(shm_str: &CStr, map_size: usize) -> Result<Self, AflError> {
         let shmem = AflShmem::from_str(shm_str, map_size)?;
         // Not initializing the page here - the other side should have done it already!
         Ok(Self { shmem })
     }

     /// Initialize from a shm_str with fixed len of 20
-    unsafe fn from_name_slice(shm_str: &[u8; 20], map_size: usize) -> Result<Self, AflError> {
+    pub unsafe fn from_name_slice(shm_str: &[u8; 20], map_size: usize) -> Result<Self, AflError> {
         let shmem = AflShmem::from_name_slice(shm_str, map_size)?;
         // Not initializing the page here - the other side should have done it already!
         Ok(Self { shmem })
     }

-    unsafe fn page(&self) -> *mut LlmpPage {
+    /// Get the unsafe ptr to this page, situated on the shared map
+    pub unsafe fn page(&self) -> *mut LlmpPage {
         shmem2page(&self.shmem)
     }
 }

-#[derive(Copy, Clone)]
-#[repr(C)]
-pub struct LlmpHookdataGeneric<T> {
-    pub func: T,
-    pub data: *mut c_void,
-}
-
+/// Message sent over the "wire"
 #[derive(Copy, Clone)]
 #[repr(C, packed)]
 pub struct LlmpMsg {
+    /// A tag
     pub tag: u32,
+    /// Sender of this message
    pub sender: u32,
+    /// The message ID, unique per page
     pub message_id: u64,
+    /// Buffer length as specified by the user
     pub buf_len: u64,
+    /// (Actual) buffer length after padding
     pub buf_len_padded: u64,
+    /// The buf
     pub buf: [u8; 0],
 }

+/// The broker (node 0)
 #[derive(Clone)]
 #[repr(C)]
 pub struct LlmpBroker {
+    /// Broadcast map from broker to all clients
     pub llmp_out: LlmpSender,
-    pub msg_hooks: Vec<LlmpHookdataGeneric<LlmpMsgHookFn>>,
-    pub llmp_clients: Vec<LlmpBrokerClientMetadata>,
+    /// Users of Llmp can add message handlers in the broker.
+    /// This allows us to intercept messages right in the broker.
+    /// This keeps the out map clean.
+    pub msg_hooks: Vec<LlmpMsgHookFn>,
+    pub llmp_clients: Vec<LlmpReceiver>,
 }

-#[derive(Clone)]
-#[repr(C)]
-pub struct LlmpBrokerClientMetadata {
-    pub id: u32,
-    pub cur_client_map: LlmpPageWrapper,
-    pub last_msg_broker_read: *mut LlmpMsg,
-    pub clientloop: Option<LlmpClientloopFn>,
-    pub data: *mut c_void,
-}
-
-/// The client loop, running for each spawned client
-pub type LlmpClientloopFn = unsafe fn(client: *mut LlmpClient, data: *mut c_void) -> !;
-
-/// A share mem page, as used by llmp internally
+/// Contents of the shared mem pages, used by llmp internally
 #[derive(Copy, Clone)]
 #[repr(C, packed)]
 pub struct LlmpPage {
@@ -204,15 +206,7 @@ pub enum LlmpMsgHookResult {
 }

 /// Message Hook
-pub type LlmpMsgHookFn = unsafe fn(
-    broker: &LlmpBroker,
-    client_data: &LlmpBrokerClientMetadata,
-    msg: *mut LlmpMsg,
-    data: *mut c_void,
-) -> LlmpMsgHookResult;
-
-/// Hook that gets called for each new page, created by LLMP
-pub type LlmpClientNewPageHookFn = unsafe fn(client: &LlmpClient) -> ();
+pub type LlmpMsgHookFn = unsafe fn(client_id: u32, msg: *mut LlmpMsg) -> LlmpMsgHookResult;

 /// Message payload when a client got added LLMP_TAG_CLIENT_ADDED_V1 */
 /// This is an internal message!
@@ -263,7 +257,7 @@ const fn llmp_align(to_align: usize) -> usize {
 /// enough. For now, we want to have at least enough space to store 2 of the
 /// largest messages we encountered (plus one new_page message).
 #[inline]
-const fn new_map_size(max_alloc: usize) -> usize {
+fn new_map_size(max_alloc: usize) -> usize {
     next_pow2(max(
         max_alloc * 2 + EOP_MSG_SIZE + LLMP_PAGE_HEADER_LEN,
         LLMP_INITIAL_MAP_SIZE,
@@ -288,7 +282,7 @@ unsafe fn _llmp_page_init(shmem: &mut AflShmem, sender: u32) {

 /* Pointer to the message behind the last message */
 #[inline]
-const unsafe fn _llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg {
+unsafe fn _llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg {
     /* DBG("_llmp_next_msg_ptr %p %lu + %lu\n", last_msg, last_msg->buf_len_padded, sizeof(llmp_message)); */
     return (last_msg as *mut u8)
         .offset(size_of::<LlmpMsg>() as isize)
@@ -296,15 +290,15 @@ const unsafe fn _llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg {
         .offset((*last_msg).buf_len_padded as isize) as *mut LlmpMsg;
 }

 /* Read next message.
 */
-unsafe fn llmp_recv(
-    page_wrapper: &LlmpPageWrapper,
-    last_msg: *mut LlmpMsg,
-) -> Option<*mut LlmpMsg> {
+unsafe fn llmp_recv(receiver: &mut LlmpReceiver) -> Result<Option<*mut LlmpMsg>, AflError> {
     /* DBG("llmp_recv %p %p\n", page, last_msg); */
     compiler_fence(Ordering::SeqCst);
-    let page = page_wrapper.page();
+    let page = receiver.current_recv_map.page();
+    let last_msg = receiver.last_msg_recvd;
     let current_msg_id = ptr::read_volatile(&mut (*page).current_msg_id);
-    if current_msg_id == 0 {
+
+    // Read the message from the page
+    let ret = if current_msg_id == 0 {
         /* No messages yet */
         None
     } else if last_msg.is_null() {
@@ -315,17 +309,60 @@ unsafe fn llmp_recv(
         None
     } else {
         Some(_llmp_next_msg_ptr(last_msg))
-    }
+    };
+
+    // Let's see what we got here.
+    match ret {
+        Some(msg) => {
+            // Handle special, LLMP internal, messages.
+            match (*msg).tag {
+                LLMP_TAG_UNSET => panic!("BUG: Read unallocated msg"),
+                LLMP_TAG_END_OF_PAGE => {
+                    dbg!("Got end of page, allocating next");
+                    // Handle end of page
+                    if (*msg).buf_len < size_of::<LlmpPayloadSharedMap>() as u64 {
+                        panic!(format!(
+                            "Illegal message length for EOP (is {}, expected {})",
+                            (*msg).buf_len_padded,
+                            size_of::<LlmpPayloadSharedMap>()
+                        ));
+                    }
+                    let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap;
+
+                    /* We can reuse the map mem space, no need to free and calloc.
+                    However, the pageinfo points to the map we're about to unmap.
+                    Clone the contents first to be safe (probably fine in rust either way). */
+                    let pageinfo_cpy = (*pageinfo).clone();
+
+                    ptr::write_volatile(&mut (*page).save_to_unmap, 1);
+                    receiver.current_recv_map = LlmpPageWrapper::from_name_slice(
+                        &pageinfo_cpy.shm_str,
+                        pageinfo_cpy.map_size,
+                    )?;
+                    dbg!(
+                        "Got a new recv map",
+                        receiver.current_recv_map.shmem.shm_str
+                    );
+                    // After we mapped the new page, return the next message, if available
+                    return llmp_recv(receiver);
+                }
+                _ => (),
+            }
+
+            // Store the last msg for next time
+            receiver.last_msg_recvd = msg;
+        }
+        _ => (),
+    };
+    Ok(ret)
 }

 /* Blocks/spins until the next message gets posted to the page,
 then returns that message. */
-pub unsafe fn llmp_recv_blocking(
-    page_wrapper: &LlmpPageWrapper,
-    last_msg: *mut LlmpMsg,
-) -> *mut LlmpMsg {
+pub unsafe fn llmp_recv_blocking(receiver: &mut LlmpReceiver) -> Result<*mut LlmpMsg, AflError> {
     let mut current_msg_id = 0;
-    let page = page_wrapper.page();
+    let page = receiver.current_recv_map.page();
+    let last_msg = receiver.last_msg_recvd;
     if !last_msg.is_null() {
         if (*last_msg).tag == LLMP_TAG_END_OF_PAGE && !llmp_msg_in_page(page, last_msg) {
             panic!("BUG: full page passed to await_message_blocking or reset failed");
@@ -335,8 +372,8 @@ pub unsafe fn llmp_recv_blocking(
     loop {
         compiler_fence(Ordering::SeqCst);
         if ptr::read_volatile(&mut (*page).current_msg_id) != current_msg_id {
-            return match llmp_recv(page_wrapper, last_msg) {
-                Some(msg) => msg,
+            return match llmp_recv(receiver)? {
+                Some(msg) => Ok(msg),
                 None => panic!("BUG: blocking llmp message should never be NULL"),
             };
         }
@@ -375,7 +412,7 @@ unsafe fn llmp_alloc_eop(page: *mut LlmpPage, last_msg: *const LlmpMsg) -> *mut
 /// Will return a ptr to the next msg buf, or None if map is full.
 /// Never call alloc_next without either sending or cancelling the last allocated message for this page!
 /// There can only ever be up to one message allocated per page at each given time.
-unsafe fn llmp_alloc_next(llmp: &mut LlmpSender, buf_len: usize) -> Option<*mut LlmpMsg> {
+unsafe fn llmp_alloc_next_if_space(llmp: &mut LlmpSender, buf_len: usize) -> Option<*mut LlmpMsg> {
     let mut buf_len_padded = buf_len;
     let mut complete_msg_size = llmp_align(size_of::<LlmpMsg>() + buf_len_padded);
     let page = llmp.out_maps.last().unwrap().page();
@@ -453,39 +490,44 @@ unsafe fn llmp_alloc_next(llmp: &mut LlmpSender, buf_len: usize) -> Option<*mut
 /// Commit the message last allocated by llmp_alloc_next to the queue.
 /// After committing, the msg shall no longer be altered!
 /// It will be read by the consuming threads (broker->clients or client->broker)
-unsafe fn llmp_send(page: *mut LlmpPage, msg: *mut LlmpMsg) -> Result<(), AflError> {
+unsafe fn llmp_send(sender: &mut LlmpSender, msg: *mut LlmpMsg) -> Result<(), AflError> {
+    if sender.last_msg_sent == msg {
+        panic!("Message sent twice!");
+    }
     if (*msg).tag == LLMP_TAG_UNSET as c_uint {
         panic!(format!(
             "No tag set on message with id {}",
             (*msg).message_id
         ));
     }
+    let page = sender.out_maps.last().unwrap().page();
     if msg.is_null() || !llmp_msg_in_page(page, msg) {
         return Err(AflError::Unknown(format!(
             "Llmp Message {:?} is null or not in current page",
             msg
         )));
     }
+    (*msg).message_id = (*page).current_msg_id + 1;
     compiler_fence(Ordering::SeqCst);
     ptr::write_volatile(&mut (*page).current_msg_id, (*msg).message_id);
     compiler_fence(Ordering::SeqCst);
+    sender.last_msg_sent = msg;
     Ok(())
 }

 /// listener about it using an EOP message.
-unsafe fn llmp_handle_out_eop(llmp: &mut LlmpSender) -> Result<(), AflError> {
-    let map_count = llmp.out_maps.len();
-    let mut old_map = llmp.out_maps.last_mut().unwrap().page();
+unsafe fn llmp_handle_out_eop(sender: &mut LlmpSender) -> Result<(), AflError> {
+    let old_map = sender.out_maps.last_mut().unwrap().page();

     // Create a new shared page.
-    let mut new_map_shmem = LlmpPageWrapper::new((*old_map).sender, (*old_map).max_alloc_size)?;
+    let new_map_shmem = LlmpPageWrapper::new((*old_map).sender, (*old_map).max_alloc_size)?;
     let mut new_map = new_map_shmem.page();

     ptr::write_volatile(&mut (*new_map).current_msg_id, (*old_map).current_msg_id);
     (*new_map).max_alloc_size = (*old_map).max_alloc_size;
     /* On the old map, place a last message linking to the new map for the clients
      * to consume */
-    let mut out: *mut LlmpMsg = llmp_alloc_eop(old_map, llmp.last_msg_sent);
+    let mut out: *mut LlmpMsg = llmp_alloc_eop(old_map, sender.last_msg_sent);
     (*out).sender = (*old_map).sender;

     let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap;
@@ -493,32 +535,38 @@ unsafe fn llmp_handle_out_eop(llmp: &mut LlmpSender) -> Result<(), AflError> {
     (*end_of_page_msg).shm_str = new_map_shmem.shmem.shm_str;

     // We never sent a msg on the new buf */
-    llmp.last_msg_sent = 0 as *mut LlmpMsg;
+    sender.last_msg_sent = 0 as *mut LlmpMsg;

     /* Send the last msg on the old buf */
-    llmp_send(old_map, out)?;
-    llmp.out_maps.push(new_map_shmem);
+    llmp_send(sender, out)?;
+
+    if !sender.keep_pages_forever {
+        llmp_prune_old_pages(sender);
+    }
+
+    sender.out_maps.push(new_map_shmem);

     Ok(())
 }

-pub unsafe fn llmp_broker_alloc_next(
-    broker: &LlmpBroker,
-    len: usize,
+/// Allocates the next space on this sender page
+pub unsafe fn llmp_alloc_next(
+    sender: &mut LlmpSender,
+    buf_len: usize,
 ) -> Result<*mut LlmpMsg, AflError> {
-    match llmp_alloc_next(&mut broker.llmp_out, len) {
+    match llmp_alloc_next_if_space(sender, buf_len) {
         Some(msg) => return Ok(msg),
         _ => (),
     };

     /* no more space left!
     We'll have to start a new page */
-    llmp_handle_out_eop(&mut broker.llmp_out);
+    llmp_handle_out_eop(sender)?;

-    match llmp_alloc_next(&mut broker.llmp_out, len) {
+    match llmp_alloc_next_if_space(sender, buf_len) {
         Some(msg) => Ok(msg),
         None => Err(AflError::Unknown(format!(
             "Error allocating {} bytes in shmap",
-            len
+            buf_len
         ))),
     }
 }

@@ -526,11 +574,14 @@ pub unsafe fn llmp_broker_alloc_next(
 impl LlmpBroker {
     /// Create and initialize a new llmp_broker
     pub unsafe fn new() -> Result<Self, AflError> {
-        let mut broker = LlmpBroker {
+        let broker = LlmpBroker {
             llmp_out: LlmpSender {
                 id: 0,
                 last_msg_sent: ptr::null_mut(),
                 out_maps: vec![LlmpPageWrapper::new(0, 0)?],
+                // Broker never cleans up the pages so that new
+                // clients may join at any time
+                keep_pages_forever: true,
             },
             msg_hooks: vec![],
             llmp_clients: vec![],
@@ -539,44 +590,39 @@ impl LlmpBroker {
         Ok(broker)
     }

+    unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> {
+        llmp_alloc_next(&mut self.llmp_out, buf_len)
+    }
+
     /// Registers a new client for the given sharedmap str and size.
     /// Returns the id of the new client in broker.client_map
-    unsafe fn register_client(&mut self, client_page: LlmpPageWrapper) {
+    pub unsafe fn register_client(&mut self, client_page: LlmpPageWrapper) {
         let id = self.llmp_clients.len() as u32;
-        self.llmp_clients.push(LlmpBrokerClientMetadata {
+        self.llmp_clients.push(LlmpReceiver {
             id,
-            cur_client_map: client_page,
-            last_msg_broker_read: 0 as *mut LlmpMsg,
-            clientloop: None,
-            data: 0 as *mut c_void,
+            current_recv_map: client_page,
+            last_msg_recvd: 0 as *mut LlmpMsg,
         });
     }

     /// Adds a hook that gets called in the broker for each new message the broker touches.
     /// if the callback returns false, the message is not forwarded to the clients. */
-    pub fn add_message_hook(&mut self, hook: LlmpMsgHookFn, data: *mut c_void) {
-        self.msg_hooks
-            .push(LlmpHookdataGeneric { func: hook, data });
+    pub fn add_message_hook(&mut self, hook: LlmpMsgHookFn) {
+        self.msg_hooks.push(hook);
     }

     /// For internal use: Forward the current message to the out map.
     unsafe fn forward_msg(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> {
-        let mut out: *mut LlmpMsg = llmp_broker_alloc_next(self, (*msg).buf_len_padded as usize)?;
+        let mut out: *mut LlmpMsg = self.alloc_next((*msg).buf_len_padded as usize)?;

         /* Copy over the whole message.
         If we should need zero copy, we could instead post a link to the
         original msg with the map_id and offset. */
         let actual_size = (*out).buf_len_padded;
-        memcpy(
-            out as *mut c_void,
-            msg as *const c_void,
-            size_of::<LlmpMsg>() as c_ulong + (*msg).buf_len_padded as c_ulong,
-        );
+        (msg as *const u8).copy_to_nonoverlapping(
+            out as *mut u8,
+            size_of::<LlmpMsg>() + (*msg).buf_len_padded as usize,
+        );
         (*out).buf_len_padded = actual_size;
         /* We need to replace the message ID with our own */
-        let out_page = self.llmp_out.out_maps.last().unwrap().page();
-        (*out).message_id = (*out_page).current_msg_id + 1;
-        match llmp_send(out_page, out) {
+        match llmp_send(&mut self.llmp_out, out) {
             Err(e) => panic!(format!("Error sending msg: {:?}", e)),
             _ => (),
         };
@@ -585,49 +631,22 @@ impl LlmpBroker {
     }

     /// broker broadcast to its own page for all others to read */
-    unsafe fn handle_new_msgs(
-        &mut self,
-        client: &LlmpBrokerClientMetadata,
-    ) -> Result<(), AflError> {
+    unsafe fn handle_new_msgs(&mut self, client_id: u32) -> Result<(), AflError> {
+        let mut next_id = self.llmp_clients.len() as u32;
+
         // TODO: We could memcpy a range of pending messages, instead of one by one.
-        /* DBG("llmp_broker_handle_new_msgs %p %p->%u\n", broker, client, client->client_state->id); */
-        let incoming: *mut LlmpPage = client.cur_client_map.page();
-        let mut current_message_id = if client.last_msg_broker_read.is_null() {
-            0
-        } else {
-            (*client.last_msg_broker_read).message_id
-        };
-
-        while current_message_id != ptr::read_volatile(&(*incoming).current_msg_id) {
-            let msg = match llmp_recv(&client.cur_client_map, (*client).last_msg_broker_read) {
-                None => {
-                    panic!("No message received but not all message ids receved! Data out of sync?")
+        loop {
+            let msg = {
+                let mut client = &mut self.llmp_clients[client_id as usize];
+                match llmp_recv(&mut client)? {
+                    None => {
+                        // We're done handling this client
+                        return Ok(());
+                    }
+                    Some(msg) => msg,
                 }
-                Some(msg) => msg,
             };

-            if (*msg).tag == LLMP_TAG_END_OF_PAGE {
-                // Handle end of page
-                if (*msg).buf_len < size_of::<LlmpPayloadSharedMap>() as u64 {
-                    panic!(format!(
-                        "Illegal message length for EOP (is {}, expected {})",
-                        (*msg).buf_len_padded,
-                        size_of::<LlmpPayloadSharedMap>()
-                    ));
-                }
-                let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap;
-
-                /* We can reuse the map mem space, no need to free and calloc.
-                However, the pageinfo points to the map we're about to unmap.
-                Clone the contents first to be safe (probably fine in rust eitner way). */
-                let mut pageinfo_cpy = (*pageinfo).clone();
-
-                let client_map = (*client).cur_client_map;
-
-                ptr::write_volatile(&mut (*client_map.page()).save_to_unmap, 1);
-                client.cur_client_map =
-                    LlmpPageWrapper::from_name_slice(&pageinfo_cpy.shm_str, pageinfo_cpy.map_size)?;
-                dbg!("Client got a new map", client.cur_client_map.shmem.shm_str);
-            } else if (*msg).tag == LLMP_TAG_NEW_SHM_CLIENT {
+            if (*msg).tag == LLMP_TAG_NEW_SHM_CLIENT {
                 /* This client informs us about yet another new client
                 add it to the list! Also, no need to forward this msg. */
                 if (*msg).buf_len < size_of::<LlmpPayloadSharedMap>() as u64 {
@@ -638,50 +657,55 @@ impl LlmpBroker {
                 } else {
                     let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMap;

-                    let client_id: u32 = client.id;
                     match LlmpPageWrapper::from_name_slice(
                         &(*pageinfo).shm_str,
                         (*pageinfo).map_size,
                     ) {
-                        Ok(new_page) => self.register_client(new_page),
+                        Ok(new_page) => {
+                            let id = next_id;
+                            next_id += 1;
+                            self.llmp_clients.push(LlmpReceiver {
+                                id,
+                                current_recv_map: new_page,
+                                last_msg_recvd: 0 as *mut LlmpMsg,
+                            });
+                        }
                         Err(e) => println!("Error adding client! {:?}", e),
                     };
                 }
             } else {
                 // The message is not specifically for us. Let the user handle it, then forward it to the clients, if necessary.
                 let mut should_forward_msg = true;
-                for hook in self.msg_hooks {
-                    match (hook.func)(&self, client, msg, hook.data) {
+                for hook in &self.msg_hooks {
+                    match (hook)(client_id, msg) {
                         LlmpMsgHookResult::Handled => should_forward_msg = false,
                         _ => (),
                     }
                 }
                 if should_forward_msg {
-                    self.forward_msg(msg);
+                    self.forward_msg(msg)?;
                 }
             }
-            (*client).last_msg_broker_read = msg;
-            current_message_id = (*msg).message_id
         }
-        Ok(())
     }

     /// The broker walks all pages and looks for changes, then broadcasts them on
     /// its own shared page, once.
-    pub unsafe fn once(&mut self) {
+    pub unsafe fn once(&mut self) -> Result<(), AflError> {
         compiler_fence(Ordering::SeqCst);
-        let mut i: u32 = 0;
-        for client in self.llmp_clients {
-            self.handle_new_msgs(&client);
+        for i in 0..self.llmp_clients.len() {
+            self.handle_new_msgs(i as u32)?;
         }
+        Ok(())
     }

     /// Loops infinitely, forwarding and handling all incoming messages from clients.
-    /// Never returns.
+    /// Never returns. Panics on error.
     pub unsafe fn broker_loop(&mut self) -> ! {
         loop {
             compiler_fence(Ordering::SeqCst);
-            self.once();
+            self.once()
+                .expect("An error occurred when brokering. Exiting.");

             /* 5 milis of sleep for now to not busywait at 100% */
             thread::sleep(time::Duration::from_millis(5));
@@ -689,353 +713,75 @@ impl LlmpBroker {
         }
     }
 }

-/// A new page will be used. Notify each registered hook in the client about this fact.
-unsafe fn llmp_clien_trigger_new_out_page_hooks(client: &LlmpClient) {
-    for hook in client.new_out_page_hooks {
-        (hook.func)(client);
-    }
-}
-
-/// A wrapper around unpacking the data, calling through to the loop
-unsafe fn _llmp_client_wrapped_loop(llmp_client_broker_metadata_ptr: *mut c_void) -> ! {
-    let metadata: *mut LlmpBrokerClientMetadata =
-        llmp_client_broker_metadata_ptr as *mut LlmpBrokerClientMetadata;
-    /* Before doing anything else:, notify registered hooks about the new page we're about to use */
-    llmp_clien_trigger_new_out_page_hooks((*metadata).client_state);
-
-    (*metadata).clientloop.expect("non-null function pointer")(
-        (*metadata).client_state,
-        (*metadata).data,
-    );
-}
-
-/// For non zero-copy, we want to get rid of old pages with duplicate messages
+/// For non zero-copy, we want to get rid of old pages with duplicate messages in the client
 /// eventually. This function sees if we can unallocate older pages.
 /// The broker would have informed us by setting the save_to_unmap flag.
-unsafe fn llmp_client_prune_old_pages(client: *mut LlmpClient) {
-    let current_map: *mut u8 = (*(*client)
-        .out_maps
-        .offset((*client).out_map_count.wrapping_sub(1 as c_ulong) as isize))
-    .map;
-    /* look for pages that are save_to_unmap, then unmap them. */
-    while (*(*client).out_maps.offset(0 as isize)).map != current_map
-        && (*shmem2page(&mut *(*client).out_maps.offset(0 as isize))).save_to_unmap as c_int != 0
-    {
-        /* This page is save to unmap. The broker already reads or read it. */
-        afl_shmem_deinit(&mut *(*client).out_maps.offset(0 as isize));
-        /* We remove at the start, move the other pages back. */
-        memmove(
-            (*client).out_maps as *mut c_void,
-            (*client).out_maps.offset(1 as isize) as *const c_void,
-            (*client)
-                .out_map_count
-                .wrapping_sub(1 as c_ulong)
-                .wrapping_mul(::std::mem::size_of::<AflShmem>() as c_ulong),
-        );
-        (*client).out_map_count = (*client).out_map_count.wrapping_sub(1)
+unsafe fn llmp_prune_old_pages(sender: &mut LlmpSender) {
+    // Exclude the current page by splitting off the last element for this iter
+    let mut unmap_until_excl = 0;
+    for map in sender.out_maps.split_last().unwrap().1 {
+        if (*map.page()).save_to_unmap == 0 {
+            // The broker didn't read this page yet, no more pages to unmap.
+            break;
+        }
+        unmap_until_excl += 1;
     }
+    // Remove all maps that the broker already mapped;
+    // simply removing them from the vec should then call drop and unmap them.
+    sender.out_maps.drain(0..unmap_until_excl);
 }

 /// We don't have any space. Send eop, then continue on a new page.
-unsafe fn llmp_client_handle_out_eop(client: *mut LlmpClient) -> bool {
-    (*client).out_maps = llmp_handle_out_eop(
-        (*client).out_maps,
-        &mut (*client).out_map_count,
-        &mut (*client).last_msg_sent,
-    );
-    if (*client).out_maps.is_null() {
-        return 0 as c_int != 0;
+impl LlmpClient {
+    /// Creates a new LlmpClient
+    pub unsafe fn new(initial_broker_page: LlmpPageWrapper) -> Result<Self, AflError> {
+        Ok(Self {
+            llmp_out: LlmpSender {
+                id: 0,
+                last_msg_sent: 0 as *mut LlmpMsg,
+                out_maps: vec![LlmpPageWrapper::new(0, LLMP_INITIAL_MAP_SIZE)?],
+                // drop pages to the broker if it already read them
+                keep_pages_forever: false,
+            },
+            llmp_in: LlmpReceiver {
+                id: 0,
+                current_recv_map: initial_broker_page,
+                last_msg_recvd: 0 as *mut LlmpMsg,
+            },
+        })
     }
-    /* Prune old pages!
-    This is a good time to see if we can unallocate older pages.
-    The broker would have informed us by setting the flag
-    */
-    llmp_client_prune_old_pages(client);
-    /* So we got a new page. Inform potential hooks */
-    llmp_clien_trigger_new_out_page_hooks(client);
-    return 1 as c_int != 0;
 }

 /// A client receives a broadcast message.
 /// Returns null if no message is available
-pub unsafe fn llmp_client_recv(client: *mut LlmpClient) -> *mut LlmpMsg {
-    loop {
-        let msg = llmp_recv(
-            shmem2page((*client).current_broadcast_map),
-            (*client).last_msg_recvd,
-        );
-        if msg.is_null() {
-            return 0 as *mut LlmpMsg;
-        }
-        (*client).last_msg_recvd = msg;
-        if (*msg).tag == LLMP_TAG_UNSET as c_uint {
-            panic!("BUG: Read unallocated msg");
-        } else {
-            if (*msg).tag == LLMP_TAG_END_OF_PAGE as c_uint {
-                /* we reached the end of the current page.
-                We'll init a new page but can reuse the mem are of the current map.
-                However, we cannot use the message if we deinit its page, so let's copy */
-                let mut pageinfo_cpy: LlmpPayloadSharedMap = LlmpPayloadSharedMap {
-                    map_size: 0,
-                    shm_str: [0; 20],
-                };
-                let broadcast_map: *mut AflShmem = (*client).current_broadcast_map;
-                let pageinfo: *mut LlmpPayloadSharedMap = {
-                    let mut _msg: *mut LlmpMsg = msg;
-                    (if (*_msg).buf_len >= ::std::mem::size_of::<LlmpPayloadSharedMap>() as c_ulong
-                    {
-                        (*_msg).buf.as_mut_ptr()
-                    } else {
-                        0 as *mut u8
-                    }) as *mut LlmpPayloadSharedMap
-                };
-                if pageinfo.is_null() {
-                    panic!(format!(
-                        "Illegal message length for EOP (is {}, expected {})",
-                        (*msg).buf_len_padded,
-                        ::std::mem::size_of::<LlmpPayloadSharedMap>() as c_ulong
-                    ));
-                }
-                memcpy(
-                    &mut pageinfo_cpy as *mut LlmpPayloadSharedMap as *mut c_void,
-                    pageinfo as *const c_void,
-                    ::std::mem::size_of::<LlmpPayloadSharedMap>() as c_ulong,
-                );
-                /* Never read by broker broker: shmem2page(map)->save_to_unmap = true; */
-                afl_shmem_deinit(broadcast_map);
-                if afl_shmem_by_str(
-                    (*client).current_broadcast_map,
-                    CStr::from_bytes_with_nul(&(*pageinfo).shm_str).expect("Illegal shm_str"),
-                    (*pageinfo).map_size,
-                )
-                .is_null()
-                {
-                    panic!(format!(
-                        "Could not get shmem by str for map {:?} of size {}",
-                        (*pageinfo).shm_str.as_mut_ptr(),
-                        (*pageinfo).map_size
-                    ));
-                }
-            } else {
-                return msg;
-            }
-        }
-    }
+pub unsafe fn llmp_client_recv(client: &mut LlmpClient) -> Result<Option<*mut LlmpMsg>, AflError> {
+    llmp_recv(&mut client.llmp_in)
 }

 /// A client blocks/spins until the next message gets posted to the page,
 /// then returns that message.
-pub unsafe fn llmp_client_recv_blocking(client: *mut LlmpClient) -> *mut LlmpMsg {
-    let mut page: *mut LlmpPage = shmem2page((*client).current_broadcast_map);
-    loop {
-        compiler_fence(Ordering::SeqCst);
-        /* busy-wait for a new msg_id to show up in the page */
-        if (*page).current_msg_id
-            != (if !(*client).last_msg_recvd.is_null() {
-                (*(*client).last_msg_recvd).message_id
-            } else {
-                0 as c_uint
-            }) as c_ulong
-        {
-            let ret: *mut LlmpMsg = llmp_client_recv(client);
-            if !ret.is_null() {
-                return ret;
-            }
-            /* last msg will exist, even if EOP was handled internally */
-            page = shmem2page((*client).current_broadcast_map)
-        }
-    }
+pub unsafe fn llmp_client_recv_blocking(client: &mut LlmpClient) -> Result<*mut LlmpMsg, AflError> {
+    llmp_recv_blocking(&mut client.llmp_in)
 }

 /// The current page could have changed in recv (EOP)
 /// Alloc the next message, internally handling end of page by allocating a new one.
-pub unsafe fn llmp_client_alloc_next(client: *mut LlmpClient, size: usize) -> *mut LlmpMsg {
-    if client.is_null() {
-        panic!("Client is NULL");
-    }
-    let mut msg = llmp_alloc_next(
-        shmem2page(
-            &mut *(*client)
-                .out_maps
-                .offset((*client).out_map_count.wrapping_sub(1) as isize),
-        ),
-        (*client).last_msg_sent,
-        size as c_ulong,
-    );
-    if msg.is_null() {
-        let last_map_count: c_ulong = (*client).out_map_count;
-        /* Page is full -> Tell broker and start from the beginning.
-        Also, pray the broker got all messaes we're overwriting. :) */
-        if !llmp_client_handle_out_eop(client) {
-            return 0 as *mut LlmpMsg;
-        }
-        if (*client).out_map_count == last_map_count
-            || (*(*shmem2page(
-                &mut *(*client)
-                    .out_maps
-                    .offset((*client).out_map_count.wrapping_sub(1) as isize),
-            ))
-            .messages
-            .as_mut_ptr())
-            .tag != LLMP_TAG_UNSET as c_uint
-        {
-            panic!("Error in handle_out_eop");
-        }
-        /* The client_out_map will have been changed by llmp_handle_out_eop. Don't
-         * alias.
-         */
-        msg = llmp_alloc_next(
-            shmem2page(
-                &mut *(*client)
-                    .out_maps
-                    .offset((*client).out_map_count.wrapping_sub(1) as isize),
-            ),
-            0 as *mut LlmpMsg,
-            size as c_ulong,
-        );
-        if msg.is_null() {
-            return 0 as *mut LlmpMsg;
-        }
-    }
-    (*msg).sender = (*client).id;
-    (*msg).message_id = if !(*client).last_msg_sent.is_null() {
-        (*(*client).last_msg_sent).message_id.wrapping_add(1)
-    } else {
-        1 as c_uint
-    };
-    /* DBG("Allocated message at loc %p with buflen %ld", msg, msg->buf_len_padded); */
-    return msg;
+pub unsafe fn llmp_client_alloc_next(
+    client: &mut LlmpClient,
+    buf_len: usize,
+) -> Result<*mut LlmpMsg, AflError> {
+    llmp_alloc_next(&mut client.llmp_out, buf_len)
 }

 /// Cancel send of the next message, this allows us to allocate a new message without sending this one.
-pub unsafe fn llmp_client_cancel(client: *mut LlmpClient, mut msg: *mut LlmpMsg) {
+pub unsafe fn llmp_cancel_send(sender: &mut LlmpSender, msg: *mut LlmpMsg) {
     /* DBG("Client %d cancels send of msg at %p with tag 0x%X and size %ld", client->id, msg, msg->tag,
      * msg->buf_len_padded); */
-    let mut page: *mut LlmpPage = shmem2page(
-        &mut *(*client)
-            .out_maps
-            .offset((*client).out_map_count.wrapping_sub(1 as c_ulong) as isize),
-    );
-    (*msg).tag = LLMP_TAG_UNSET as c_uint;
-    (*page).size_used = ((*page).size_used as c_ulong).wrapping_sub(
-        (*msg)
-            .buf_len_padded
-            .wrapping_add(::std::mem::size_of::<LlmpMsg>() as c_ulong),
-    ) as c_ulong;
-}
-
-/* Commits a msg to the client's out ringbuf */
-pub unsafe fn llmp_client_send(
-    client_state: *mut LlmpClient,
-    msg: *mut LlmpMsg,
-) -> Result<(), AflError> {
-    let page: *mut LlmpPage = shmem2page(
-        &mut *(*client_state)
-            .out_maps
-            .offset((*client_state).out_map_count.wrapping_sub(1) as isize),
-    );
-    llmp_send(page, msg)?;
-    (*client_state).last_msg_sent = msg;
-    Ok(())
+    let page = sender.out_maps.last().unwrap().page();
+    (*msg).tag = LLMP_TAG_UNSET;
+    (*page).size_used -= (*msg).buf_len_padded as usize + size_of::<LlmpMsg>();
 }

-/// Creates a new, unconnected, client state
-pub unsafe fn llmp_client_new_unconnected() -> *mut LlmpClient {
-    let client_state: *mut LlmpClient =
-        calloc(1 as c_ulong, ::std::mem::size_of::<LlmpClient>() as c_ulong) as *mut LlmpClient;
-    (*client_state).current_broadcast_map =
-        calloc(1 as c_ulong, ::std::mem::size_of::<AflShmem>() as c_ulong) as *mut AflShmem;
-    if (*client_state).current_broadcast_map.is_null() {
-        return 0 as *mut LlmpClient;
-    }
-    (*client_state).out_maps = afl_realloc(
-        (*client_state).out_maps as *mut c_void,
-        (1 as c_ulong).wrapping_mul(::std::mem::size_of::<AflShmem>() as c_ulong),
-    ) as *mut AflShmem;
-    if (*client_state).out_maps.is_null() {
-        free((*client_state).current_broadcast_map as *mut c_void);
-        free(client_state as *mut c_void);
-        return 0 as *mut LlmpClient;
-    }
-    (*client_state).out_map_count = 1 as c_ulong;
-    if llmp_new_page_shmem(
-        &mut *(*client_state).out_maps.offset(0 as isize),
-        (*client_state).id as c_ulong,
-        LLMP_INITIAL_MAP_SIZE,
-    )
-    .is_null()
-    {
-        afl_free((*client_state).out_maps as *mut c_void);
-        free((*client_state).current_broadcast_map as *mut c_void);
-        free(client_state as *mut c_void);
-        return 0 as *mut LlmpClient;
-    }
-    (*client_state).new_out_page_hook_count = 0 as c_ulong;
-    (*client_state).new_out_page_hooks = 0 as *mut LlmpHookdataGeneric<LlmpClientNewPageHookFn>;
-    return client_state;
-}
-
-/// Destroys the given cient state
-pub unsafe fn llmp_client_delete(client_state: *mut LlmpClient) {
-    let mut i: c_ulong = 0;
-    while i < (*client_state).out_map_count {
-        afl_shmem_deinit(&mut *(*client_state).out_maps.offset(i as isize));
-        i = i.wrapping_add(1)
-    }
-    afl_free((*client_state).out_maps as *mut c_void);
-    (*client_state).out_maps = 0 as *mut AflShmem;
-    (*client_state).out_map_count = 0 as c_ulong;
-    afl_free((*client_state).new_out_page_hooks as *mut c_void);
-    (*client_state).new_out_page_hooks = 0 as *mut LlmpHookdataGeneric<LlmpClientNewPageHookFn>;
-    (*client_state).new_out_page_hook_count = 0 as c_ulong;
-    afl_shmem_deinit((*client_state).current_broadcast_map);
-    free((*client_state).current_broadcast_map as *mut c_void);
-    (*client_state).current_broadcast_map = 0 as *mut AflShmem;
-    free(client_state as *mut c_void);
-}
-
-impl Drop for LlmpClient {
-    fn drop(&mut self) {
-        unsafe { llmp_client_delete(self) };
-    }
-}
-
-/// Adds a hook that gets called in the client for each new outgoing page the client creates.
-pub unsafe fn llmp_client_add_new_out_page_hook(
-    client: *mut LlmpClient,
-    hook: Option<LlmpClientNewPageHookFn>,
-    data: *mut c_void,
-) -> AflRet {
-    return llmp_add_hook_generic(
-        &mut (*client).new_out_page_hooks,
-        &mut (*client).new_out_page_hook_count,
-        ::std::mem::transmute::<Option<LlmpClientNewPageHookFn>, *mut c_void>(hook),
-        data,
-    );
-}
-
-/// Clean up the broker instance
-unsafe fn llmp_broker_deinit(broker: *mut LlmpBroker) {
-    let mut i: c_ulong;
-    i = 0 as c_ulong;
-    while i < (*broker).broadcast_map_count {
-        afl_shmem_deinit(&mut *(*broker).broadcast_maps.offset(i as isize));
-        i = i.wrapping_add(1)
-    }
-    i = 0 as c_ulong;
-    while i < (*broker).llmp_client_count {
-        afl_shmem_deinit((*(*broker).llmp_clients.offset(i as isize)).cur_client_map);
-        free((*(*broker).llmp_clients.offset(i as isize)).cur_client_map as *mut c_void);
-        i = i.wrapping_add(1)
-        // TODO: Properly clean up the client
-    }
-    afl_free((*broker).broadcast_maps as *mut c_void);
-    (*broker).broadcast_map_count = 0 as c_ulong;
-    afl_free((*broker).llmp_clients as *mut c_void);
-    (*broker).llmp_client_count = 0 as c_ulong;
-}
-
-impl Drop for LlmpBroker {
-    fn drop(&mut self) {
-        unsafe { llmp_broker_deinit(self) };
-    }
+/// Commits a msg to the client's out map
+pub unsafe fn llmp_client_send(client: &mut LlmpClient, msg: *mut LlmpMsg) -> Result<(), AflError> {
+    llmp_send(&mut client.llmp_out, msg)
 }
diff --git a/afl/src/events/shmem_translated.rs b/afl/src/events/shmem_translated.rs
index cbf2866bde..0f1b901ef1 100644
--- a/afl/src/events/shmem_translated.rs
+++ b/afl/src/events/shmem_translated.rs
@@ -94,8 +94,8 @@ impl AflShmem {
             Ok(ret)
         } else {
             Err(AflError::Unknown(format!(
-                "Could not allocate map with id {:?}",
-                shm_str
+                "Could not allocate map with id {:?} and size {}",
+                shm_str, map_size
             )))
         }
     }
diff --git a/afl/src/lib.rs b/afl/src/lib.rs
index 1c1f204600..c19f048093 100644
--- a/afl/src/lib.rs
+++ b/afl/src/lib.rs
@@ -3,10 +3,6 @@
 #[macro_use]
 extern crate alloc;

-#[cfg_attr(feature = "std")]
-#[macro_use]
-extern crate memoffset; // for offset_of
-
 pub mod corpus;
 pub mod engines;
 pub mod events;
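
Reviewer note: below is a minimal, single-process smoke test sketching how the refactored pieces compose: `LlmpBroker` owning the broadcast `LlmpSender`, one `LlmpReceiver` per registered client, and `LlmpClient` bundling a sender/receiver pair. It is written as it might appear in a `#[cfg(test)]` module at the bottom of llmp_translated.rs (so it can reach the crate-private `shmem` field). The out-of-band exchange of map names (env vars, a TCP hello, or fork) is skipped, and two details are assumptions for illustration only, not part of this diff: `map_size` being a readable field on `AflShmem`, and the user tag `0x8888`.

```rust
#[cfg(test)]
mod tests {
    use super::*;

    /// Hook the broker runs for every client message it touches.
    unsafe fn log_hook(client_id: u32, msg: *mut LlmpMsg) -> LlmpMsgHookResult {
        println!("client {} sent msg with tag {:#x}", client_id, (*msg).tag);
        // `Handled` keeps the message off the broadcast map; the enum's
        // other variant (elided in this diff) would forward it to all clients.
        LlmpMsgHookResult::Handled
    }

    #[test]
    fn llmp_roundtrip_smoke() {
        unsafe {
            let mut broker = LlmpBroker::new().expect("could not create broker");
            broker.add_message_hook(log_hook);

            // In-process shortcut: read the broadcast map's name directly
            // (assumes `map_size` is accessible on AflShmem).
            let bc = &broker.llmp_out.out_maps.last().unwrap().shmem;
            let client_in = LlmpPageWrapper::from_name_slice(&bc.shm_str, bc.map_size)
                .expect("could not map broadcast page");
            let mut client = LlmpClient::new(client_in).expect("could not create client");

            // The broker attaches to the client's out map the same way;
            // normally an LLMP_TAG_NEW_SHM_CLIENT message carries this info.
            let co = &client.llmp_out.out_maps.last().unwrap().shmem;
            let client_out = LlmpPageWrapper::from_name_slice(&co.shm_str, co.map_size)
                .expect("could not map client page");
            broker.register_client(client_out);

            // Client side: allocate, tag, fill, send.
            let msg = llmp_client_alloc_next(&mut client, 4).expect("alloc failed");
            (*msg).tag = 0x8888; // hypothetical user tag
            (*msg).buf.as_mut_ptr().copy_from_nonoverlapping(b"ping".as_ptr(), 4);
            llmp_client_send(&mut client, msg).expect("send failed");

            // One brokering pass reads the client's page and runs the hooks.
            broker.once().expect("brokering failed");
        }
    }
}
```

Since `log_hook` returns `Handled`, nothing lands on the broadcast map here; returning the forwarding variant instead would let a subsequent `llmp_client_recv(&mut client)` observe the echoed message.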