diff --git a/afl/Cargo.toml b/afl/Cargo.toml index 83accdea15..fcac574c2a 100644 --- a/afl/Cargo.toml +++ b/afl/Cargo.toml @@ -33,4 +33,4 @@ num = "*" xxhash-rust = { version = "0.8.0", features = ["xxh3"] } # xxh3 hashing for rust serde = { version = "1.0", default-features = false, features = ["alloc"] } # serialization lib erased-serde = "0.3.12" -postcard = "0.5.1" # no_std compatible serde serialization fromat +postcard = "0.5.1" # no_std compatible serde serialization fromat \ No newline at end of file diff --git a/afl/llmp_test/src/main.rs b/afl/llmp_test/src/main.rs index ed852b990f..84d5dd5b5e 100644 --- a/afl/llmp_test/src/main.rs +++ b/afl/llmp_test/src/main.rs @@ -1,60 +1,33 @@ -#[macro_use] extern crate alloc; use core::convert::TryInto; -use core::ffi::c_void; -use core::mem::size_of; -use core::ptr; +use core::time::Duration; use std::thread; use std::time; -use afl::events::llmp_translated::*; +use afl::events::llmp; const TAG_SIMPLE_U32_V1: u32 = 0x51300321; const TAG_MATH_RESULT_V1: u32 = 0x77474331; -unsafe fn llmp_test_clientloop(client: *mut LlmpClient, _data: *mut c_void) -> ! { - let mut counter: u32 = 0; - loop { - counter += 1; - - let msg = llmp_client_alloc_next(client, size_of::()); - core::ptr::copy( - counter.to_be_bytes().as_ptr(), - (*msg).buf.as_mut_ptr(), - size_of::(), - ); - (*msg).tag = TAG_SIMPLE_U32_V1; - llmp_client_send(client, msg).unwrap(); - - thread::sleep(time::Duration::from_millis(100)); - } -} - -unsafe fn u32_from_msg(msg: *const LlmpMsg) -> u32 { - u32::from_be_bytes( - alloc::slice::from_raw_parts((*msg).buf.as_ptr(), size_of::()) - .try_into() - .unwrap(), - ) -} - -unsafe fn test_adder_clientloop(client: *mut LlmpClient, _data: *mut c_void) -> ! { +fn adder_loop(port: u16) -> ! 
{ + let mut client = llmp::LlmpClient::create_attach_to_tcp(port).unwrap(); let mut last_result: u32 = 0; let mut current_result: u32 = 0; loop { let mut msg_counter = 0; loop { - let last_msg = llmp_client_recv(client); - if last_msg == 0 as *mut LlmpMsg { - break; - } + let (tag, buf) = match client.recv_buf().unwrap() { + None => break, + Some(msg) => msg, + }; msg_counter += 1; - match (*last_msg).tag { + match tag { TAG_SIMPLE_U32_V1 => { - current_result = current_result.wrapping_add(u32_from_msg(last_msg)); + current_result = + current_result.wrapping_add(u32::from_le_bytes(buf.try_into().unwrap())); } - _ => println!("Adder Client ignored unknown message {}", (*last_msg).tag), + _ => println!("Adder Client ignored unknown message {}", tag), }; } @@ -64,14 +37,9 @@ unsafe fn test_adder_clientloop(client: *mut LlmpClient, _data: *mut c_void) -> msg_counter, current_result ); - let msg = llmp_client_alloc_next(client, size_of::()); - core::ptr::copy( - current_result.to_be_bytes().as_ptr(), - (*msg).buf.as_mut_ptr(), - size_of::(), - ); - (*msg).tag = TAG_MATH_RESULT_V1; - llmp_client_send(client, msg).unwrap(); + client + .send_buf(TAG_MATH_RESULT_V1, ¤t_result.to_le_bytes()) + .unwrap(); last_result = current_result; } @@ -80,61 +48,69 @@ unsafe fn test_adder_clientloop(client: *mut LlmpClient, _data: *mut c_void) -> } unsafe fn broker_message_hook( - _broker: *mut LlmpBroker, - client_metadata: *mut LlmpBrokerClientMetadata, - message: *mut LlmpMsg, - _data: *mut c_void, -) -> LlmpMsgHookResult { + client_id: u32, + message: *mut llmp::LlmpMsg, +) -> llmp::LlmpMsgHookResult { match (*message).tag { TAG_SIMPLE_U32_V1 => { println!( "Client {:?} sent message: {:?}", - (*client_metadata).pid, - u32_from_msg(message) + client_id, + u32::from_le_bytes((*message).as_slice().try_into().unwrap()) ); - LlmpMsgHookResult::ForwardToClients + llmp::LlmpMsgHookResult::ForwardToClients } TAG_MATH_RESULT_V1 => { println!( "Adder Client has this current result: {:?}", - 
u32_from_msg(message) + u32::from_le_bytes((*message).as_slice().try_into().unwrap()) ); - LlmpMsgHookResult::Handled + llmp::LlmpMsgHookResult::Handled } _ => { println!("Unknwon message id received!"); - LlmpMsgHookResult::ForwardToClients + llmp::LlmpMsgHookResult::ForwardToClients } } } fn main() { /* The main node has a broker, and a few worker threads */ - let threads_total = num_cpus::get(); - let counter_thread_count = threads_total - 2; - println!( - "Running with 1 broker, 1 adder, and {} counter clients", - counter_thread_count - ); + let mode = std::env::args() + .nth(1) + .expect("no mode specified, chose 'broker', 'ctr', or 'adder'"); + let port: u16 = std::env::args() + .nth(2) + .unwrap_or("1337".into()) + .parse::() + .unwrap(); + println!("Launching in mode {} on port {}", mode, port); - unsafe { - let mut broker = LlmpBroker::new().expect("Failed to create llmp broker"); - for i in 0..counter_thread_count { - println!("Adding client {}", i); - broker - .register_childprocess_clientloop(llmp_test_clientloop, ptr::null_mut()) - .expect("could not add child clientloop"); + match mode.as_str() { + "broker" => { + let mut broker: llmp::LlmpBroker = llmp::LlmpBroker::new().unwrap(); + broker.launch_tcp_listener(port).unwrap(); + broker.add_message_hook(broker_message_hook); + broker.loop_forever(Some(Duration::from_millis(5))) + } + "ctr" => { + let mut client = llmp::LlmpClient::create_attach_to_tcp(port).unwrap(); + let mut counter: u32 = 0; + loop { + counter = counter.wrapping_add(1); + client + .send_buf(TAG_SIMPLE_U32_V1, &counter.to_le_bytes()) + .unwrap(); + println!("CTR Client writing {}", counter); + thread::sleep(Duration::from_secs(1)) + } + } + "adder" => { + adder_loop(port); + } + _ => { + println!("No valid mode supplied"); } - - broker - .register_childprocess_clientloop(test_adder_clientloop, ptr::null_mut()) - .expect("Error registering childprocess"); - - println!("Spawning broker"); - - broker.add_message_hook(broker_message_hook, 
ptr::null_mut()); - - broker.run(); } } diff --git a/afl/src/events/llmp.rs b/afl/src/events/llmp.rs index 4a869b17b8..f309afa6ab 100644 --- a/afl/src/events/llmp.rs +++ b/afl/src/events/llmp.rs @@ -48,129 +48,145 @@ Then register some clientloops using llmp_broker_register_threaded_clientloop */ -use ::libc; - -use core::ffi::c_void; -use core::ptr; -use core::sync::atomic::{compiler_fence, Ordering}; -use libc::{c_int, c_uint, c_ulong, c_ushort}; -use std::{ffi::CStr, os::raw::c_char}; +use core::{ + cmp::max, + mem::size_of, + ptr, slice, + sync::atomic::{compiler_fence, Ordering}, + time::Duration, +}; +use std::{ + io::{Read, Write}, + net::{TcpListener, TcpStream}, + thread, +}; use crate::utils::next_pow2; use crate::AflError; -use super::shmem_translated::{afl_shmem_by_str, afl_shmem_deinit, afl_shmem_init, AflShmem}; +use super::shmem_translated::AflShmem; -extern "C" { - #[no_mangle] - fn usleep(__useconds: c_uint) -> c_int; - #[no_mangle] - fn fork() -> c_int; - #[no_mangle] - fn calloc(_: c_ulong, _: c_ulong) -> *mut c_void; - #[no_mangle] - fn realloc(_: *mut c_void, _: c_ulong) -> *mut c_void; - #[no_mangle] - fn free(__ptr: *mut c_void); - #[no_mangle] - fn memcpy(_: *mut c_void, _: *const c_void, _: c_ulong) -> *mut c_void; - #[no_mangle] - fn memmove(_: *mut c_void, _: *const c_void, _: c_ulong) -> *mut c_void; - #[no_mangle] - fn memset(_: *mut c_void, _: c_int, _: c_ulong) -> *mut c_void; -} +/// We'll start off with 256 megabyte maps per fuzzer client +const LLMP_PREF_INITIAL_MAP_SIZE: usize = 1 << 28; +/// What byte count to align messages to +/// LlmpMsg sizes (including header) will always be rounded up to be a multiple of this value +const LLMP_PREF_ALIGNNMENT: usize = 64; -pub type AflRet = c_uint; -pub const AFL_RET_ALLOC: AflRet = 3; -pub const AFL_RET_SUCCESS: AflRet = 0; +/// A msg fresh from the press: No tag got sent by the user yet +const LLMP_TAG_UNSET: u32 = 0xDEADAF; +/// This message should not exist yet. Some bug in unsafe code! 
+const LLMP_TAG_UNINITIALIZED: u32 = 0xA143AF11; +/// The end of page message +/// When receiving this, a new sharedmap needs to be allocated. +const LLMP_TAG_END_OF_PAGE: u32 = 0xAF1E0F1; +/// A new client for this broker got added. +const LLMP_TAG_NEW_SHM_CLIENT: u32 = 0xC11E471; -/* AFL alloc buffer, the struct is here so we don't need to do fancy ptr - * arithmetics */ -#[derive(Copy, Clone)] -#[repr(C)] -pub struct AflAllocBuf { - pub complete_size: c_ulong, - pub magic: c_ulong, - pub buf: [u8; 0], -} +/// Size of a new page message, header, payload, and alignment +const EOP_MSG_SIZE: usize = + llmp_align(size_of::() + size_of::()); +/// The header length of a llmp page in a shared map (until messages start) +const LLMP_PAGE_HEADER_LEN: usize = size_of::(); +/// Message hook type +pub type LlmpMsgHookFn = unsafe fn(client_id: u32, msg: *mut LlmpMsg) -> LlmpMsgHookResult; + +/// Sending end on a (unidirectional) sharedmap channel #[derive(Clone)] -#[repr(C)] -pub struct LlmpClient { +pub struct LlmpSender { + /// ID of this sender. Only used in the broker. pub id: u32, - pub last_msg_recvd: *mut LlmpMsg, - pub current_broadcast_map: *mut AflShmem, + /// Ref to the last message this sender sent on the last page. + /// If null, a new page (just) started. pub last_msg_sent: *mut LlmpMsg, - pub out_map_count: c_ulong, - pub out_maps: *mut AflShmem, - pub new_out_page_hook_count: c_ulong, - pub new_out_page_hooks: *mut LlmpHookdataGeneric, + /// A vec of page wrappers, each containing an initialized AflShmem + pub out_maps: Vec, + /// If true, pages will never be pruned. + /// The broker uses this feature. + /// By keeping the message history around, + /// new clients may join at any time in the future. 
+ pub keep_pages_forever: bool, } -#[derive(Copy, Clone)] -#[repr(C)] -pub struct LlmpHookdataGeneric { - pub func: *mut c_void, - pub data: *mut c_void, +/// Receiving end on a (unidirectional) sharedmap channel +#[derive(Clone)] +pub struct LlmpReceiver { + pub id: u32, + /// Pointer to the last msg this received + pub last_msg_recvd: *mut LlmpMsg, + /// current page. After EOP, this gets replaced with the new one + pub current_recv_map: LlmpSharedMap, } +/// Client side of LLMP +#[derive(Clone)] +pub struct LlmpClient { + /// Outgoing channel to the broker + pub llmp_out: LlmpSender, + /// Incoming (broker) broadcast map + pub llmp_in: LlmpReceiver, +} + +/// A page wrapper +#[derive(Clone)] +pub struct LlmpSharedMap { + /// Shmem containing the actual (unsafe) page, + /// shared between one LlmpSender and one LlmpReceiver + shmem: AflShmem, +} +/// Message sent over the "wire" #[derive(Copy, Clone)] #[repr(C, packed)] pub struct LlmpMsg { - pub tag: c_uint, - pub sender: c_uint, - pub message_id: c_uint, - pub buf_len: c_ulong, - pub buf_len_padded: c_ulong, + /// A tag + pub tag: u32, + /// Sender of this message + pub sender: u32, + /// The message ID, unique per page + pub message_id: u64, + /// Buffer length as specified by the user + pub buf_len: u64, + /// (Actual) buffer length after padding + pub buf_len_padded: u64, + /// The buf pub buf: [u8; 0], } -#[derive(Clone)] -#[repr(C)] -pub struct LlmpBroker { - pub last_msg_sent: *mut LlmpMsg, - pub broadcast_map_count: c_ulong, - pub broadcast_maps: *mut AflShmem, - pub msg_hook_count: c_ulong, - pub msg_hooks: *mut LlmpHookdataGeneric, - pub llmp_client_count: c_ulong, - pub llmp_clients: *mut LlmpBrokerClientMetadata, +/// The message we receive +impl LlmpMsg { + /// Gets the buffer from this message as slice, with the correct length. 
+ pub fn as_slice(&self) -> &[u8] { + unsafe { slice::from_raw_parts(self.buf.as_ptr(), self.buf_len as usize) } + } } -#[derive(Copy, Clone)] -#[repr(C)] -pub struct LlmpBrokerClientMetadata { - pub client_type: LlmpClientType, - pub client_state: *mut LlmpClient, - pub cur_client_map: *mut AflShmem, - pub last_msg_broker_read: *mut LlmpMsg, - pub pid: c_int, - pub clientloop: Option, - pub data: *mut c_void, -} - -/// The client loop, running for each spawned client -pub type LlmpClientloopFn = unsafe fn(client: *mut LlmpClient, data: *mut c_void) -> !; - -/// Client type enum (TODO: Enumize) -type LlmpClientType = c_uint; -const LLMP_CLIENT_TYPE_FOREIGN_PROCESS: LlmpClientType = 3; -const LLMP_CLIENT_TYPE_CHILD_PROCESS: LlmpClientType = 2; - -/// A share mem page, as used by llmp internally +/// Contents of the share mem pages, used by llmp internally #[derive(Copy, Clone)] #[repr(C, packed)] pub struct LlmpPage { pub sender: u32, - pub save_to_unmap: c_ushort, - pub sender_dead: c_ushort, - pub current_msg_id: c_ulong, - pub c_ulongotal: c_ulong, - pub size_used: c_ulong, - pub max_alloc_size: c_ulong, + pub save_to_unmap: u16, + pub sender_dead: u16, + pub current_msg_id: u64, + pub size_total: usize, + pub size_used: usize, + pub max_alloc_size: usize, pub messages: [LlmpMsg; 0], } +/// The broker (node 0) +#[derive(Clone)] +#[repr(C)] +pub struct LlmpBroker { + /// Broadcast map from broker to all clients + pub llmp_out: LlmpSender, + /// Users of Llmp can add message handlers in the broker. + /// This allows us to intercept messages right in the broker + /// This keeps the out map clean. + pub msg_hooks: Vec, + pub llmp_clients: Vec, +} + /// Result of an LLMP Mesasge hook pub enum LlmpMsgHookResult { /// This has been handled in the broker. No need to forward. 
@@ -179,307 +195,427 @@ pub enum LlmpMsgHookResult { ForwardToClients, } -/// Message Hook -pub type LlmpMsgHookFn = unsafe fn( - _: *mut LlmpBroker, - _: *mut LlmpBrokerClientMetadata, - _: *mut LlmpMsg, - _: *mut c_void, -) -> LlmpMsgHookResult; - -/// Hook that gets called for each new page, created by LLMP -pub type LlmpClientNewPageHookFn = - unsafe fn(_: *mut LlmpClient, _: *mut LlmpPage, _: *mut c_void) -> (); - /// Message payload when a client got added LLMP_TAG_CLIENT_ADDED_V1 */ /// This is an internal message! -/// LLMP_TAG_NEW_PAGE_V1 +/// LLMP_TAG_END_OF_PAGE_V1 #[derive(Copy, Clone)] #[repr(C, packed)] -struct LlmpPayloadNewPage { - pub map_size: c_ulong, +struct LlmpPayloadSharedMapInfo { + pub map_size: usize, pub shm_str: [u8; 20], } -/// Returs the container element to this ptr +/// Get sharedmem from a page #[inline] -unsafe fn afl_alloc_bufptr(buf: *mut c_void) -> *mut AflAllocBuf { - return (buf as *mut u8).offset(-(16 as c_ulong as isize)) as *mut AflAllocBuf; +unsafe fn shmem2page(afl_shmem: &AflShmem) -> *mut LlmpPage { + afl_shmem.map as *mut LlmpPage } -/// Optimized realloc wrapper, taken over from AFL. -/// This function makes sure *size is > size_needed after call. -/// It will realloc *buf otherwise. -/// *size will grow exponentially as per: -/// https://blog.mozilla.org/nnethercote/2014/11/04/please-grow-your-buffers-exponentially/ -/// @return For convenience, this function returns *buf. -/// Will return NULL and free *buf if size_needed is <1 or realloc failed. 
-unsafe fn afl_realloc(buf: *mut c_void, mut size_needed: c_ulong) -> *mut c_void { - let mut new_buf: *mut AflAllocBuf = 0 as *mut AflAllocBuf; - let mut current_size: c_ulong = 0 as c_ulong; - let mut next_size: c_ulong; - if !buf.is_null() { - /* the size is always stored at buf - 1*c_ulong */ - new_buf = afl_alloc_bufptr(buf); - if (*new_buf).magic != 0xaf1a110c as c_ulong { - panic!(format!( - "Illegal, non-null pointer passed to afl_realloc (buf {:?}, magic {:?})", - new_buf, - (*new_buf).magic as c_uint - )); - } - current_size = (*new_buf).complete_size - } - size_needed = (size_needed as c_ulong).wrapping_add(16 as c_ulong) as c_ulong; - /* No need to realloc */ - if current_size >= size_needed { - return buf; - } - /* No initial size was set */ - if size_needed < 64 as c_ulong { - next_size = 64 as c_ulong - } else { - /* grow exponentially */ - next_size = next_pow2(size_needed); - /* handle overflow: fall back to the original size_needed */ - if next_size == 0 { - next_size = size_needed - } - } - /* alloc */ - new_buf = realloc(new_buf as *mut c_void, next_size) as *mut AflAllocBuf; - if new_buf.is_null() { - return 0 as *mut c_void; - } - (*new_buf).complete_size = next_size; - (*new_buf).magic = 0xaf1a110c as c_ulong; - return (*new_buf).buf.as_mut_ptr() as *mut c_void; -} - -/// Call alf_free on all afl_realloc buffers. 
-#[inline] -unsafe fn afl_free(buf: *mut c_void) { - if !buf.is_null() { - free(afl_alloc_bufptr(buf) as *mut c_void); - }; -} -#[inline] -unsafe fn shmem2page(afl_shmem: *mut AflShmem) -> *mut LlmpPage { - return (*afl_shmem).map as *mut LlmpPage; -} -/* If a msg is contained in the current page */ +/// Return, if a msg is contained in the current page unsafe fn llmp_msg_in_page(page: *mut LlmpPage, msg: *mut LlmpMsg) -> bool { - /* DBG("llmp_msg_in_page %p within %p-%p\n", msg, page, page + page->c_ulongotal); */ + /* DBG("llmp_msg_in_page %p within %p-%p\n", msg, page, page + page->size_total); */ return (page as *mut u8) < msg as *mut u8 - && (page as *mut u8).offset((*page).c_ulongotal as isize) > msg as *mut u8; + && (page as *mut u8).offset((*page).size_total as isize) > msg as *mut u8; } -/* allign to LLMP_ALIGNNMENT bytes */ + +/// allign to LLMP_PREF_ALIGNNMENT=64 bytes #[inline] -unsafe fn llmp_align(to_align: c_ulong) -> c_ulong { - if 64 as c_int == 0 as c_int || to_align.wrapping_rem(64 as c_ulong) == 0 as c_int as c_ulong { +const fn llmp_align(to_align: usize) -> usize { + // check if we need to align first + if LLMP_PREF_ALIGNNMENT == 0 { return to_align; } - return to_align - .wrapping_add((64 as c_ulong).wrapping_sub(to_align.wrapping_rem(64 as c_int as c_ulong))); + // Then do the alignment + let modulo = to_align % LLMP_PREF_ALIGNNMENT; + if modulo == 0 { + to_align + } else { + to_align + LLMP_PREF_ALIGNNMENT - modulo + } } -/* In case we don't have enough space, make sure the next page will be large -enough. For now, we want to have at least enough space to store 2 of the -largest messages we encountered. */ + +/// In case we don't have enough space, make sure the next page will be large +/// enough. For now, we want to have at least enough space to store 2 of the +/// largest messages we encountered (plus message one new_page message). 
#[inline] -unsafe fn new_map_size(max_alloc: c_ulong) -> c_ulong { - return next_pow2({ - let mut _a: c_ulong = max_alloc - .wrapping_mul(2 as c_ulong) - .wrapping_add(llmp_align( - (::std::mem::size_of::() as c_ulong) - .wrapping_add(::std::mem::size_of::() as c_ulong), - )); - let mut _b: c_ulong = ((1 as c_int) << 28 as c_int) as c_ulong; - if _a > _b { - _a - } else { - _b - } - }); +fn new_map_size(max_alloc: usize) -> usize { + next_pow2(max( + max_alloc * 2 + EOP_MSG_SIZE + LLMP_PAGE_HEADER_LEN, + LLMP_PREF_INITIAL_MAP_SIZE, + ) as u64) as usize } -/* Initialize a new llmp_page. size should be relative to - * llmp_page->messages */ -unsafe fn _llmp_page_init(mut page: *mut LlmpPage, sender: u32, size: c_ulong) { + +/// Initialize a new llmp_page. size should be relative to +/// llmp_page->messages +unsafe fn _llmp_page_init(shmem: &mut AflShmem, sender: u32) { + let page = shmem2page(&shmem); (*page).sender = sender; - ::std::ptr::write_volatile(&mut (*page).current_msg_id as *mut c_ulong, 0 as c_ulong); - (*page).max_alloc_size = 0 as c_ulong; - (*page).c_ulongotal = size; - (*page).size_used = 0 as c_ulong; - (*(*page).messages.as_mut_ptr()).message_id = 0 as c_uint; - (*(*page).messages.as_mut_ptr()).tag = 0xdeadaf as c_uint; - ::std::ptr::write_volatile(&mut (*page).save_to_unmap as *mut u16, 0 as c_int as u16); - ::std::ptr::write_volatile(&mut (*page).sender_dead as *mut u16, 0 as c_int as u16); + ptr::write_volatile(&mut (*page).current_msg_id, 0); + (*page).max_alloc_size = 0; + // Don't forget to subtract our own header size + (*page).size_total = shmem.map_size - LLMP_PAGE_HEADER_LEN; + (*page).size_used = 0; + (*(*page).messages.as_mut_ptr()).message_id = 0; + (*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET; + ptr::write_volatile(&mut (*page).save_to_unmap, 0); + ptr::write_volatile(&mut (*page).sender_dead, 0); } -/* Pointer to the message behind the last message */ + +/// Pointer to the message behind the last message #[inline] -unsafe fn 
_llmp_next_msg_ptr(last_msg: *mut LlmpMsg) -> *mut LlmpMsg { +unsafe fn _llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg { /* DBG("_llmp_next_msg_ptr %p %lu + %lu\n", last_msg, last_msg->buf_len_padded, sizeof(llmp_message)); */ return (last_msg as *mut u8) - .offset(::std::mem::size_of::() as isize) + .offset(size_of::() as isize) .offset((*last_msg).buf_len_padded as isize) as *mut LlmpMsg; } -/* Read next message. */ -unsafe fn llmp_recv(page: *mut LlmpPage, last_msg: *mut LlmpMsg) -> *mut LlmpMsg { - /* DBG("llmp_recv %p %p\n", page, last_msg); */ - compiler_fence(Ordering::SeqCst); - if (*page).current_msg_id == 0 { - /* No messages yet */ - return 0 as *mut LlmpMsg; - } else if last_msg.is_null() { - /* We never read a message from this queue. Return first. */ - return (*page).messages.as_mut_ptr(); - } else if (*last_msg).message_id as c_ulong == (*page).current_msg_id { - /* Oops! No new message! */ - return 0 as *mut LlmpMsg; - } else { - return _llmp_next_msg_ptr(last_msg); - }; -} -/* Blocks/spins until the next message gets posted to the page, -then returns that message. */ -pub unsafe fn llmp_recv_blocking(page: *mut LlmpPage, last_msg: *mut LlmpMsg) -> *mut LlmpMsg { - let mut current_msg_id: u32 = 0 as u32; - if !last_msg.is_null() { - if (*last_msg).tag == 0xaf1e0f1 as c_uint && llmp_msg_in_page(page, last_msg) as c_int != 0 - { - panic!("BUG: full page passed to await_message_blocking or reset failed"); - } - current_msg_id = (*last_msg).message_id - } - loop { - compiler_fence(Ordering::SeqCst); - if (*page).current_msg_id != current_msg_id as c_ulong { - let ret: *mut LlmpMsg = llmp_recv(page, last_msg); - if ret.is_null() { - panic!("BUG: blocking llmp message should never be NULL"); + +/// An actor on the sendin part of the shared map +impl LlmpSender { + /// For non zero-copy, we want to get rid of old pages with duplicate messages in the client + /// eventually. This function This funtion sees if we can unallocate older pages. 
+ /// The broker would have informed us by setting the save_to_unmap-flag. + unsafe fn prune_old_pages(&mut self) { + // Exclude the current page by splitting of the last element for this iter + let mut unmap_until_excl = 0; + for map in self.out_maps.split_last().unwrap().1 { + if (*map.page()).save_to_unmap == 0 { + // The broker didn't read this page yet, no more pages to unmap. + break; } - return ret; + unmap_until_excl += 1; } + // Remove all maps that the broker already mapped + // simply removing them from the vec should then call drop and unmap them. + self.out_maps.drain(0..unmap_until_excl); } -} -/* Special allocation function for EOP messages (and nothing else!) - The normal alloc will fail if there is not enough space for buf_len_padded + EOP - So if llmp_alloc_next fails, create new page if necessary, use this function, - place EOP, commit EOP, reset, alloc again on the new space. -*/ -unsafe fn llmp_alloc_eop(mut page: *mut LlmpPage, mut last_msg: *mut LlmpMsg) -> *mut LlmpMsg { - if (*page).size_used.wrapping_add(llmp_align( - (::std::mem::size_of::() as c_ulong) - .wrapping_add(::std::mem::size_of::() as c_ulong), - )) > (*page).c_ulongotal - { - panic!(format!("PROGRAM ABORT : BUG: EOP does not fit in page! 
page {:?}, size_current {:?}, c_ulongotal {:?}", page, - (*page).size_used, (*page).c_ulongotal)); - } - let mut ret: *mut LlmpMsg = if !last_msg.is_null() { - _llmp_next_msg_ptr(last_msg) - } else { - (*page).messages.as_mut_ptr() - }; - if (*ret).tag == 0xa143af11 as c_uint { - panic!("Did not call send() on last message!"); - } - (*ret).buf_len_padded = ::std::mem::size_of::() as c_ulong; - (*ret).message_id = if !last_msg.is_null() { - (*last_msg).message_id = - ((*last_msg).message_id as c_uint).wrapping_add(1 as c_int as c_uint) as u32 as u32; - (*last_msg).message_id - } else { - 1 as c_uint - }; - (*ret).tag = 0xaf1e0f1 as u32; - (*page).size_used = ((*page).size_used as c_ulong).wrapping_add(llmp_align( - (::std::mem::size_of::() as c_ulong) - .wrapping_add(::std::mem::size_of::() as c_ulong), - )) as c_ulong; - return ret; -} -/* Will return a ptr to the next msg buf, or NULL if map is full. -Never call alloc_next without either sending or cancelling the last allocated message for this page! -There can only ever be up to one message allocated per page at each given time. -*/ -unsafe fn llmp_alloc_next( - mut page: *mut LlmpPage, - last_msg: *mut LlmpMsg, - buf_len: c_ulong, -) -> *mut LlmpMsg { - let mut buf_len_padded: c_ulong = buf_len; - let mut complete_msg_size: c_ulong = - llmp_align((::std::mem::size_of::() as c_ulong).wrapping_add(buf_len_padded)); - /* DBG("XXX complete_msg_size %lu (h: %lu)\n", complete_msg_size, sizeof(llmp_message)); */ - /* In case we don't have enough space, make sure the next page will be large - * enough */ - (*page).max_alloc_size = { - let mut _a: c_ulong = (*page).max_alloc_size; - let mut _b: c_ulong = complete_msg_size; - if _a > _b { - _a + + /// Intern: Special allocation function for EOP messages (and nothing else!) 
+ /// The normal alloc will fail if there is not enough space for buf_len_padded + EOP + /// So if alloc_next fails, create new page if necessary, use this function, + /// place EOP, commit EOP, reset, alloc again on the new space. + unsafe fn alloc_eop(&mut self) -> *mut LlmpMsg { + let page = self.out_maps.last().unwrap().page(); + let last_msg = self.last_msg_sent; + if (*page).size_used + EOP_MSG_SIZE > (*page).size_total { + panic!(format!("PROGRAM ABORT : BUG: EOP does not fit in page! page {:?}, size_current {:?}, size_total {:?}", page, + (*page).size_used, (*page).size_total)); + } + let mut ret: *mut LlmpMsg = if !last_msg.is_null() { + _llmp_next_msg_ptr(last_msg) } else { - _b + (*page).messages.as_mut_ptr() + }; + if (*ret).tag == LLMP_TAG_UNINITIALIZED { + panic!("Did not call send() on last message!"); } - }; - let mut ret: *mut LlmpMsg; - /* DBG("last_msg %p %d (%d)\n", last_msg, last_msg ? (int)last_msg->tag : -1, (int)LLMP_TAG_END_OF_PAGE_V1); */ - if last_msg.is_null() || (*last_msg).tag == 0xaf1e0f1 as c_uint { - /* We start fresh */ - ret = (*page).messages.as_mut_ptr(); - /* The initial message may not be alligned, so we at least align the end of - it. Technically, c_ulong can be smaller than a pointer, then who knows what - happens */ - let base_addr: c_ulong = ret as c_ulong; - buf_len_padded = llmp_align(base_addr.wrapping_add(complete_msg_size)) - .wrapping_sub(base_addr) - .wrapping_sub(::std::mem::size_of::() as c_ulong); - complete_msg_size = - buf_len_padded.wrapping_add(::std::mem::size_of::() as c_ulong); - /* DBG("XXX complete_msg_size NEW %lu\n", complete_msg_size); */ - /* Still space for the new message plus the additional "we're full" message? - */ - if (*page) - .size_used - .wrapping_add(complete_msg_size) - .wrapping_add(llmp_align( - (::std::mem::size_of::() as c_ulong) - .wrapping_add(::std::mem::size_of::() as c_ulong), - )) - > (*page).c_ulongotal - { - /* We're full. 
*/ - return 0 as *mut LlmpMsg; - } - /* We need to start with 1 for ids, as current message id is initialized - * with 0... */ + (*ret).buf_len_padded = size_of::() as u64; (*ret).message_id = if !last_msg.is_null() { - (*last_msg).message_id.wrapping_add(1 as c_uint) + (*last_msg).message_id + 1 } else { - 1 as c_uint - } - } else if (*page).current_msg_id != (*last_msg).message_id as c_ulong { - /* Oops, wrong usage! */ - panic!(format!("BUG: The current message never got commited using llmp_send! (page->current_msg_id {:?}, last_msg->message_id: {})", (*page).current_msg_id, (*last_msg).message_id)); - } else { - buf_len_padded = - complete_msg_size.wrapping_sub(::std::mem::size_of::() as c_ulong); - /* DBG("XXX ret %p id %u buf_len_padded %lu complete_msg_size %lu\n", ret, ret->message_id, buf_len_padded, - * complete_msg_size); */ - if (*page) - .size_used - .wrapping_add(complete_msg_size) - .wrapping_add(llmp_align( - (::std::mem::size_of::() as c_ulong) - .wrapping_add(::std::mem::size_of::() as c_ulong), - )) - > (*page).c_ulongotal - { + 1 + }; + (*ret).tag = LLMP_TAG_END_OF_PAGE; + (*page).size_used += EOP_MSG_SIZE; + ret + } + + /// Intern: Will return a ptr to the next msg buf, or None if map is full. + /// Never call alloc_next without either sending or cancelling the last allocated message for this page! + /// There can only ever be up to one message allocated per page at each given time. 
+ unsafe fn alloc_next_if_space(&mut self, buf_len: usize) -> Option<*mut LlmpMsg> { + let mut buf_len_padded = buf_len; + let mut complete_msg_size = llmp_align(size_of::() + buf_len_padded); + let page = self.out_maps.last().unwrap().page(); + let last_msg = self.last_msg_sent; + /* DBG("XXX complete_msg_size %lu (h: %lu)\n", complete_msg_size, sizeof(llmp_message)); */ + /* In case we don't have enough space, make sure the next page will be large + * enough */ + // For future allocs, keep track of the maximum (aligned) alloc size we used + (*page).max_alloc_size = max((*page).max_alloc_size, complete_msg_size); + + let mut ret: *mut LlmpMsg; + /* DBG("last_msg %p %d (%d)\n", last_msg, last_msg ? (int)last_msg->tag : -1, (int)LLMP_TAG_END_OF_PAGE_V1); */ + if last_msg.is_null() || (*last_msg).tag == LLMP_TAG_END_OF_PAGE { + /* We start fresh, on a new page */ + ret = (*page).messages.as_mut_ptr(); + /* The initial message may not be alligned, so we at least align the end of + it. Technically, c_ulong can be smaller than a pointer, then who knows what + happens */ + let base_addr = ret as usize; + buf_len_padded = + llmp_align(base_addr + complete_msg_size) - base_addr - size_of::(); + complete_msg_size = buf_len_padded + size_of::(); + /* DBG("XXX complete_msg_size NEW %lu\n", complete_msg_size); */ /* Still space for the new message plus the additional "we're full" message? */ - /* We're full. */ - return 0 as *mut LlmpMsg; + if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total { + /* We're full. */ + return None; + } + /* We need to start with 1 for ids, as current message id is initialized + * with 0... */ + (*ret).message_id = if !last_msg.is_null() { + (*last_msg).message_id + 1 + } else { + 1 + } + } else if (*page).current_msg_id != (*last_msg).message_id { + /* Oops, wrong usage! */ + panic!(format!("BUG: The current message never got commited using send! 
(page->current_msg_id {:?}, last_msg->message_id: {})", (*page).current_msg_id, (*last_msg).message_id)); + } else { + buf_len_padded = complete_msg_size - size_of::(); + /* DBG("XXX ret %p id %u buf_len_padded %lu complete_msg_size %lu\n", ret, ret->message_id, buf_len_padded, + * complete_msg_size); */ + + /* Still space for the new message plus the additional "we're full" message? */ + if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total { + /* We're full. */ + return None; + } + ret = _llmp_next_msg_ptr(last_msg); + (*ret).message_id = (*last_msg).message_id + 1 + } + + /* The beginning of our message should be messages + size_used, else nobody + * sent the last msg! */ + /* DBG("XXX ret %p - page->messages %p = %lu != %lu, will add %lu -> %p\n", ret, page->messages, + (c_ulong)((u8 *)ret - (u8 *)page->messages), page->size_used, complete_msg_size, ((u8 *)ret) + complete_msg_size); + */ + if last_msg.is_null() && (*page).size_used != 0 + || ((ret as usize) - (*page).messages.as_mut_ptr() as usize) != (*page).size_used + { + panic!(format!("Allocated new message without calling send() inbetween. ret: {:?}, page: {:?}, complete_msg_size: {:?}, size_used: {:?}, last_msg: {:?}", ret, page, + buf_len_padded, (*page).size_used, last_msg)); + } + (*page).size_used = (*page).size_used + complete_msg_size; + (*ret).buf_len_padded = buf_len_padded as u64; + (*ret).buf_len = buf_len as u64; + /* DBG("Returning new message at %p with len %ld, TAG was %x", ret, ret->buf_len_padded, ret->tag); */ + /* Maybe catch some bugs... */ + (*_llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET; + (*ret).tag = LLMP_TAG_UNINITIALIZED; + Some(ret) + } + + /// Commit the message last allocated by alloc_next to the queue. + /// After commiting, the msg shall no longer be altered! 
 + /// It will be read by the consuming threads (broker->clients or client->broker) + unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> { + if self.last_msg_sent == msg { + panic!("Message sent twice!"); + } + if (*msg).tag == LLMP_TAG_UNSET { + panic!(format!( + "No tag set on message with id {}", + (*msg).message_id + )); + } + let page = self.out_maps.last().unwrap().page(); + if msg.is_null() || !llmp_msg_in_page(page, msg) { + return Err(AflError::Unknown(format!( + "Llmp Message {:?} is null or not in current page", + msg + ))); + } + (*msg).message_id = (*page).current_msg_id + 1; + compiler_fence(Ordering::SeqCst); + ptr::write_volatile(&mut (*page).current_msg_id, (*msg).message_id); + compiler_fence(Ordering::SeqCst); + self.last_msg_sent = msg; + Ok(()) + } + + /// listener about it using an EOP message. + unsafe fn handle_out_eop(&mut self) -> Result<(), AflError> { + let old_map = self.out_maps.last_mut().unwrap().page(); + + // Create a new shared page. + let new_map_shmem = LlmpSharedMap::new((*old_map).sender, (*old_map).max_alloc_size)?; + let mut new_map = new_map_shmem.page(); + + ptr::write_volatile(&mut (*new_map).current_msg_id, (*old_map).current_msg_id); + (*new_map).max_alloc_size = (*old_map).max_alloc_size; + /* On the old map, place a last message linking to the new map for the clients + * to consume */ + let mut out: *mut LlmpMsg = self.alloc_eop(); + (*out).sender = (*old_map).sender; + + let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; + (*end_of_page_msg).map_size = new_map_shmem.shmem.map_size; + (*end_of_page_msg).shm_str = new_map_shmem.shmem.shm_str; + + // We never sent a msg on the new buf + self.last_msg_sent = 0 as *mut LlmpMsg; + + /* Send the last msg on the old buf */ + self.send(out)?; + + if !self.keep_pages_forever { + self.prune_old_pages(); + } + + self.out_maps.push(new_map_shmem); + + Ok(()) + } + + /// Allocates the next space on this sender page + pub unsafe fn 
alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> { + match self.alloc_next_if_space(buf_len) { + Some(msg) => return Ok(msg), + _ => (), + }; + + /* no more space left! We'll have to start a new page */ + self.handle_out_eop()?; + + match self.alloc_next_if_space(buf_len) { + Some(msg) => Ok(msg), + None => Err(AflError::Unknown(format!( + "Error allocating {} bytes in shmap", + buf_len + ))), + } + } + + /// Cancel send of the next message, this allows us to allocate a new message without sending this one. + pub unsafe fn cancel_send(&mut self, msg: *mut LlmpMsg) { + /* DBG("Client %d cancels send of msg at %p with tag 0x%X and size %ld", client->id, msg, msg->tag, + * msg->buf_len_padded); */ + let page = self.out_maps.last().unwrap().page(); + (*msg).tag = LLMP_TAG_UNSET; + (*page).size_used -= (*msg).buf_len_padded as usize + size_of::(); + } + + /// Allocates a message of the given size, tags it, and sends it off. + pub fn send_buf(&mut self, tag: u32, buf: &[u8]) -> Result<(), AflError> { + // Make sure we don't reuse already allocated tags + if tag == LLMP_TAG_NEW_SHM_CLIENT + || tag == LLMP_TAG_END_OF_PAGE + || tag == LLMP_TAG_UNINITIALIZED + || tag == LLMP_TAG_UNSET + { + return Err(AflError::Unknown(format!( + "Reserved tag supplied to send_buf ({:#X})", + tag + ))); + } + unsafe { + let msg = self.alloc_next(buf.len())?; + (*msg).tag = tag; + buf.as_ptr() + .copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len()); + self.send(msg) + } + } +} + +/// Receiving end of an llmp channel +impl LlmpReceiver { + /// Read next message. 
 + unsafe fn recv(&mut self) -> Result, AflError> { + /* DBG("recv %p %p\n", page, last_msg); */ + compiler_fence(Ordering::SeqCst); + let page = self.current_recv_map.page(); + let last_msg = self.last_msg_recvd; + let current_msg_id = ptr::read_volatile(&mut (*page).current_msg_id); + + // Read the message from the page + let ret = if current_msg_id == 0 { + /* No messages yet */ + None + } else if last_msg.is_null() { + /* We never read a message from this queue. Return first. */ + Some((*page).messages.as_mut_ptr()) + } else if (*last_msg).message_id == current_msg_id { + /* Oops! No new message! */ + None + } else { + Some(_llmp_next_msg_ptr(last_msg)) + }; + + // Let's see what we got here. + match ret { + Some(msg) => { + // Handle special, LLMP internal, messages. + match (*msg).tag { + LLMP_TAG_UNSET => panic!("BUG: Read unallocated msg"), + LLMP_TAG_END_OF_PAGE => { + dbg!("Got end of page, allocing next"); + // Handle end of page + if (*msg).buf_len < size_of::() as u64 { + panic!(format!( + "Illegal message length for EOP (is {}, expected {})", + (*msg).buf_len_padded, + size_of::() + )); + } + let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; + + /* We can reuse the map mem space, no need to free and calloc. + However, the pageinfo points to the map we're about to unmap. + Clone the contents first to be safe (probably fine in rust either way). 
 */ + let pageinfo_cpy = (*pageinfo).clone(); + + ptr::write_volatile(&mut (*page).save_to_unmap, 1); + self.current_recv_map = LlmpSharedMap::from_name_slice( + &pageinfo_cpy.shm_str, + pageinfo_cpy.map_size, + )?; + dbg!("Got a new recv map", self.current_recv_map.shmem.shm_str); + // After we mapped the new page, return the next message, if available + return self.recv(); + } + _ => (), + } + + // Store the last msg for next time + self.last_msg_recvd = msg; + } + _ => (), + }; + Ok(ret) + } + + /// Blocks/spins until the next message gets posted to the page, + /// then returns that message. + pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, AflError> { + let mut current_msg_id = 0; + let page = self.current_recv_map.page(); + let last_msg = self.last_msg_recvd; + if !last_msg.is_null() { + if (*last_msg).tag == LLMP_TAG_END_OF_PAGE && !llmp_msg_in_page(page, last_msg) { + panic!("BUG: full page passed to await_message_blocking or reset failed"); + } + current_msg_id = (*last_msg).message_id + } + loop { + compiler_fence(Ordering::SeqCst); + if ptr::read_volatile(&mut (*page).current_msg_id) != current_msg_id { + return match self.recv()? { + Some(msg) => Ok(msg), + None => panic!("BUG: blocking llmp message should never be NULL"), + }; + } + } + } + + /// Returns the next message, tag, buf, if available, else None + pub fn recv_buf(&mut self) -> Result, AflError> { + unsafe { + Ok(match self.recv()? 
{ + Some(msg) => Some(((*msg).tag, (*msg).as_slice())), + None => None, + }) + } + } + + /// Returns the next message, tag, buf, looping until it becomes available + pub fn recv_buf_blocking(&mut self) -> Result<(u32, &[u8]), AflError> { + unsafe { + let msg = self.recv_blocking()?; + Ok(((*msg).tag, (*msg).as_slice())) } ret = _llmp_next_msg_ptr(last_msg); (*ret).message_id = (*last_msg).message_id.wrapping_add(1 as c_uint) @@ -529,36 +665,51 @@ unsafe fn llmp_send(page: *mut LlmpPage, msg: *mut LlmpMsg) -> Result<(), AflErr (*msg).message_id as c_ulong, ); - compiler_fence(Ordering::SeqCst); - return Ok(()); +/// The page struct, placed on a shared mem instance. +impl LlmpSharedMap { + /// Creates a new page with minimum prev_max_alloc_size or LLMP_PREF_INITIAL_MAP_SIZE + /// returning the initialized shared mem struct + pub fn new(sender: u32, min_size: usize) -> Result { + // Create a new shard page. + let mut shmem = AflShmem::new(new_map_size(min_size))?; + unsafe { + _llmp_page_init(&mut shmem, sender); + } + Ok(Self { shmem }) + } + + /// Initialize from a shm_str with fixed len of 20 + pub fn from_name_slice(shm_str: &[u8; 20], map_size: usize) -> Result { + let shmem = AflShmem::from_name_slice(shm_str, map_size)?; + // Not initializing the page here - the other side should have done it already! + Ok(Self { shmem }) + } + + /// Get the unsafe ptr to this page, situated on the shared map + pub unsafe fn page(&self) -> *mut LlmpPage { + shmem2page(&self.shmem) + } } -#[inline] -unsafe fn _llmp_broker_current_broadcast_map(broker_state: *mut LlmpBroker) -> *mut AflShmem { - return &mut *(*broker_state).broadcast_maps.offset( - (*broker_state) - .broadcast_map_count - .wrapping_sub(1 as c_ulong) as isize, - ) as *mut AflShmem; -} -/* create a new shard page. Size_requested will be the min size, you may get a - * larger map. Retruns NULL on error. 
*/ -unsafe fn llmp_new_page_shmem( - uninited_shmem: *mut AflShmem, - sender: c_ulong, - size_requested: c_ulong, -) -> *mut LlmpPage { - let size: c_ulong = next_pow2({ - let mut _a: c_ulong = size_requested.wrapping_add(40 as c_ulong); - let mut _b: c_ulong = ((1 as c_int) << 28 as c_int) as c_ulong; - if _a > _b { - _a - } else { - _b - } - }); - if afl_shmem_init(uninited_shmem, size).is_null() { - return 0 as *mut LlmpPage; +/// The broker forwards all messages to its own bus-like broadcast map. +/// It may intercept messages passing through. +impl LlmpBroker { + /// Create and initialize a new llmp_broker + pub fn new() -> Result { + let broker = LlmpBroker { + llmp_out: LlmpSender { + id: 0, + last_msg_sent: ptr::null_mut(), + out_maps: vec![LlmpSharedMap::new(0, 0)?], + // Broker never cleans up the pages so that new + // clients may join at any time + keep_pages_forever: true, + }, + msg_hooks: vec![], + llmp_clients: vec![], + }; + + Ok(broker) } _llmp_page_init(shmem2page(uninited_shmem), sender as u32, size_requested); return shmem2page(uninited_shmem); @@ -664,815 +815,297 @@ pub unsafe fn llmp_broker_alloc_next(broker: *mut LlmpBroker, len: c_ulong) -> * return out; } -impl LlmpBroker { - /// Create and initialize a new llmp_broker - pub unsafe fn new() -> Result { - let mut broker = LlmpBroker { - last_msg_sent: ptr::null_mut(), - broadcast_map_count: 0, - broadcast_maps: ptr::null_mut(), - msg_hook_count: 0, - msg_hooks: ptr::null_mut(), - llmp_client_count: 0, - llmp_clients: ptr::null_mut(), - }; - llmp_broker_init(&mut broker)?; - Ok(broker) + /// Allocate the next message on the outgoing map + unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> { + self.llmp_out.alloc_next(buf_len) } /// Registers a new client for the given sharedmap str and size. 
- /// Be careful: Intenral realloc may change the location of the client map - unsafe fn register_client( - &mut self, - shm_str: &CStr, - map_size: c_ulong, - ) -> *mut LlmpBrokerClientMetadata { - /* make space for a new client and calculate its id */ - self.llmp_clients = afl_realloc( - self.llmp_clients as *mut c_void, - self.llmp_client_count - .wrapping_add(1 as c_ulong) - .wrapping_mul(::std::mem::size_of::() as c_ulong), - ) as *mut LlmpBrokerClientMetadata; - if self.llmp_clients.is_null() { - return 0 as *mut LlmpBrokerClientMetadata; - } - let mut client: *mut LlmpBrokerClientMetadata = - self.llmp_clients.offset(self.llmp_client_count as isize) - as *mut LlmpBrokerClientMetadata; - memset( - client as *mut c_void, - 0 as c_int, - ::std::mem::size_of::() as c_ulong, - ); - (*client).client_state = - calloc(1 as c_ulong, ::std::mem::size_of::() as c_ulong) as *mut LlmpClient; - if (*client).client_state.is_null() { - return 0 as *mut LlmpBrokerClientMetadata; - } - (*(*client).client_state).id = (*self).llmp_client_count as u32; - (*client).cur_client_map = - calloc(1 as c_ulong, ::std::mem::size_of::() as c_ulong) as *mut AflShmem; - if (*client).cur_client_map.is_null() { - return 0 as *mut LlmpBrokerClientMetadata; - } - if afl_shmem_by_str((*client).cur_client_map, shm_str, map_size).is_null() { - return 0 as *mut LlmpBrokerClientMetadata; - } - self.llmp_client_count = self.llmp_client_count.wrapping_add(1); - // TODO: Add client map - return client; + /// Returns the id of the new client in broker.client_map + pub fn register_client(&mut self, client_page: LlmpSharedMap) { + let id = self.llmp_clients.len() as u32; + self.llmp_clients.push(LlmpReceiver { + id, + current_recv_map: client_page, + last_msg_recvd: 0 as *mut LlmpMsg, + }); } /// Adds a hook that gets called in the broker for each new message the broker touches. /// if the callback returns false, the message is not forwarded to the clients. 
*/ - pub unsafe fn add_message_hook(&mut self, hook: LlmpMsgHookFn, data: *mut c_void) -> AflRet { - return llmp_add_hook_generic( - &mut self.msg_hooks, - &mut self.msg_hook_count, - ::std::mem::transmute::, *mut c_void>(Some(hook)), - data, - ); + pub fn add_message_hook(&mut self, hook: LlmpMsgHookFn) { + self.msg_hooks.push(hook); + } + + /// For internal use: Forward the current message to the out map. + unsafe fn forward_msg(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> { + let mut out: *mut LlmpMsg = self.alloc_next((*msg).buf_len_padded as usize)?; + + /* Copy over the whole message. + If we should need zero copy, we could instead post a link to the + original msg with the map_id and offset. */ + let actual_size = (*out).buf_len_padded; + msg.copy_to_nonoverlapping(out, size_of::() + (*msg).buf_len_padded as usize); + (*out).buf_len_padded = actual_size; + /* We need to replace the message ID with our own */ + match self.llmp_out.send(out) { + Err(e) => panic!(format!("Error sending msg: {:?}", e)), + _ => (), + }; + self.llmp_out.last_msg_sent = out; + Ok(()) } /// broker broadcast to its own page for all others to read */ - #[inline] - unsafe fn handle_new_msgs(&mut self, mut client: *mut LlmpBrokerClientMetadata) { + unsafe fn handle_new_msgs(&mut self, client_id: u32) -> Result<(), AflError> { + let mut next_id = self.llmp_clients.len() as u32; + // TODO: We could memcpy a range of pending messages, instead of one by one. 
- /* DBG("llmp_broker_handle_new_msgs %p %p->%u\n", broker, client, client->client_state->id); */ - let incoming: *mut LlmpPage = shmem2page((*client).cur_client_map); - let mut current_message_id: u32 = if !(*client).last_msg_broker_read.is_null() { - (*(*client).last_msg_broker_read).message_id - } else { - 0 as c_uint - }; - while current_message_id as c_ulong != (*incoming).current_msg_id { - let msg: *mut LlmpMsg = llmp_recv(incoming, (*client).last_msg_broker_read); - if msg.is_null() { - panic!("No message received but not all message ids receved! Data out of sync?"); - } - if (*msg).tag == 0xaf1e0f1 as c_uint { - let pageinfo: *mut LlmpPayloadNewPage = { - let mut _msg: *mut LlmpMsg = msg; - (if (*_msg).buf_len >= ::std::mem::size_of::() as c_ulong { - (*_msg).buf.as_mut_ptr() - } else { - 0 as *mut u8 - }) as *mut LlmpPayloadNewPage - }; - if pageinfo.is_null() { - panic!(format!( - "Illegal message length for EOP (is {}, expected {})", - (*msg).buf_len_padded, - ::std::mem::size_of::() as c_ulong - )); + loop { + let msg = { + let client = &mut self.llmp_clients[client_id as usize]; + match client.recv()? { + None => { + // We're done handling this client + return Ok(()); + } + Some(msg) => msg, } - /* We can reuse the map mem space, no need to free and calloc. - However, the pageinfo points to the map we're about to unmap. - Copy the contents first. 
*/ - let mut pageinfo_cpy: LlmpPayloadNewPage = LlmpPayloadNewPage { - map_size: 0, - shm_str: [0; 20], - }; - memcpy( - &mut pageinfo_cpy as *mut LlmpPayloadNewPage as *mut c_void, - pageinfo as *const c_void, - ::std::mem::size_of::() as c_ulong, - ); - let client_map: *mut AflShmem = (*client).cur_client_map; - ::std::ptr::write_volatile( - &mut (*shmem2page(client_map)).save_to_unmap as *mut u16, - 1 as u16, - ); - afl_shmem_deinit(client_map); - if afl_shmem_by_str( - client_map, - CStr::from_bytes_with_nul(&(*pageinfo).shm_str).expect("Illegal shm_str"), - (*pageinfo).map_size, - ) - .is_null() - { - panic!(format!( - "Could not get shmem by str for map {:?} of size {:?}", - (*pageinfo).shm_str.as_mut_ptr(), - (*pageinfo).map_size - )); - } - } else if (*msg).tag == 0xc11e471 as c_uint { + }; + if (*msg).tag == LLMP_TAG_NEW_SHM_CLIENT { /* This client informs us about yet another new client add it to the list! Also, no need to forward this msg. */ - let pageinfo: *mut LlmpPayloadNewPage = { - let mut _msg: *mut LlmpMsg = msg; - (if (*_msg).buf_len >= ::std::mem::size_of::() as c_ulong { - (*_msg).buf.as_mut_ptr() - } else { - 0 as *mut u8 - }) as *mut LlmpPayloadNewPage - }; - if pageinfo.is_null() { - println!("Ignoring broken CLIENT_ADDED msg due to incorrect size. 
Expected {:?} but got {:?}", - ::std::mem::size_of::() as - c_ulong, (*msg).buf_len_padded); - } - /* register_client may realloc the clients, we need to find ours again */ - let client_id: u32 = (*(*client).client_state).id; - if self - .register_client( - CStr::from_bytes_with_nul(&(*pageinfo).shm_str).expect("Illegal shm_str"), - (*pageinfo).map_size, - ) - .is_null() - { - panic!(format!( - "Could not register clientprocess with shm_str {:?}", - (*pageinfo).shm_str.as_mut_ptr() - )); - } - (*client).client_type = LLMP_CLIENT_TYPE_FOREIGN_PROCESS; - /* find client again */ - client = - self.llmp_clients.offset(client_id as isize) as *mut LlmpBrokerClientMetadata - } else { - let mut forward_msg: bool = 1 as c_int != 0; - let mut i: c_ulong = 0; - while i < self.msg_hook_count { - let msg_hook: *mut LlmpHookdataGeneric = - self.msg_hooks.offset(i as isize) as *mut LlmpHookdataGeneric; - forward_msg = forward_msg as c_int != 0 - && ::std::mem::transmute::<*mut c_void, Option>( - (*msg_hook).func, - ) - .expect("non-null function pointer")( - self, client, msg, (*msg_hook).data - ) as c_int - != 0; - if !llmp_msg_in_page(shmem2page((*client).cur_client_map), msg) { - /* Special handling in case the client got exchanged inside the message_hook, for example after a crash. */ - return; - } - i = i.wrapping_add(1) - } - if forward_msg { - let mut out: *mut LlmpMsg = llmp_broker_alloc_next(self, (*msg).buf_len_padded); - if out.is_null() { - panic!(format!( - "Error allocating {} bytes in shmap {:?}", - (*msg).buf_len_padded, - (*_llmp_broker_current_broadcast_map(self)) - .shm_str - .as_mut_ptr(), - )); - } - /* Copy over the whole message. - If we should need zero copy, we could instead post a link to the - original msg with the map_id and offset. 
*/ - let actual_size: c_ulong = (*out).buf_len_padded; - memcpy( - out as *mut c_void, - msg as *const c_void, - (::std::mem::size_of::() as c_ulong) - .wrapping_add((*msg).buf_len_padded), + if (*msg).buf_len < size_of::() as u64 { + println!("Ignoring broken CLIENT_ADDED msg due to incorrect size. Expected {} but got {}", + (*msg).buf_len_padded, + size_of::() ); - (*out).buf_len_padded = actual_size; - /* We need to replace the message ID with our own */ - let out_page: *mut LlmpPage = - shmem2page(_llmp_broker_current_broadcast_map(self)); - (*out).message_id = - (*out_page).current_msg_id.wrapping_add(1 as c_ulong) as u32; - match llmp_send(out_page, out) { - Err(e) => panic!(format!("Error sending msg: {:?}", e)), - _ => (), + } else { + let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; + + match LlmpSharedMap::from_name_slice(&(*pageinfo).shm_str, (*pageinfo).map_size) + { + Ok(new_page) => { + let id = next_id; + next_id += 1; + self.llmp_clients.push(LlmpReceiver { + id, + current_recv_map: new_page, + last_msg_recvd: 0 as *mut LlmpMsg, + }); + } + Err(e) => println!("Error adding client! {:?}", e), }; - self.last_msg_sent = out + } + } else { + // The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary. + let mut should_forward_msg = true; + for hook in &self.msg_hooks { + match (hook)(client_id, msg) { + LlmpMsgHookResult::Handled => should_forward_msg = false, + _ => (), + } + } + if should_forward_msg { + self.forward_msg(msg)?; } } - (*client).last_msg_broker_read = msg; - current_message_id = (*msg).message_id } } /// The broker walks all pages and looks for changes, then broadcasts them on /// its own shared page, once. 
- pub unsafe fn once(&mut self) { + pub fn once(&mut self) -> Result<(), AflError> { compiler_fence(Ordering::SeqCst); - let mut i: u32 = 0; - while (i as c_ulong) < self.llmp_client_count { - let client: *mut LlmpBrokerClientMetadata = - self.llmp_clients.offset(i as isize) as *mut LlmpBrokerClientMetadata; - self.handle_new_msgs(client); - i = i.wrapping_add(1) - } - } - - /// Loops infinitely, forwarding and handling all incoming messages from clients. - /// Never returns. - pub unsafe fn broker_loop(&mut self) -> ! { - loop { - compiler_fence(Ordering::SeqCst); - self.once(); - /* 5 milis of sleep for now to not busywait at 100% */ - usleep((5 as c_int * 1000 as c_int) as c_uint); - } - } - - /// launch a specific client. This function doesn't need to be called externally - all registered clients will get launched at broker_run - unsafe fn launch_client(&mut self, mut clientdata: *mut LlmpBrokerClientMetadata) -> bool { - if clientdata < self.llmp_clients - || clientdata - > self - .llmp_clients - .offset(self.llmp_client_count.wrapping_sub(1 as c_ulong) as isize) - as *mut LlmpBrokerClientMetadata - { - println!( - "[!] WARNING: Illegal client specified at ptr {:?} (instead of {:?} to {:?})", - clientdata, - self.llmp_clients, - self.llmp_clients - .offset(self.llmp_client_count.wrapping_sub(1 as c_ulong) as isize,) - as *mut LlmpBrokerClientMetadata, - ); - return 0 as c_int != 0; - } - if (*clientdata).client_type as c_uint == LLMP_CLIENT_TYPE_CHILD_PROCESS as c_int as c_uint - { - if (*clientdata).pid != 0 { - println!("[!] WARNING: Tried to relaunch already running client. Set ->pid to 0 if this is what you want."); - return 0 as c_int != 0; + for i in 0..self.llmp_clients.len() { + unsafe { + self.handle_new_msgs(i as u32)?; } - let child_id: c_int = fork(); - if child_id < 0 as c_int { - println!("[!] 
WARNING: Could not fork"); - return 0 as c_int != 0; - } else { - if child_id == 0 as c_int { - /* child */ - _llmp_client_wrapped_loop(clientdata as *mut c_void); - } - } - /* parent */ - (*clientdata).pid = child_id; - return 1 as c_int != 0; - } else { - println!("[!] WARNING: Tried to spawn llmp child with unknown thread type."); - return 0 as c_int != 0; - } - //return 1 as c_int != 0; - } - - /// Launches all clientloops registered with this broker - pub unsafe fn launch_clientloops(&mut self) -> Result<(), AflError> { - let mut i: c_ulong = 0; - while i < self.llmp_client_count { - if (*self.llmp_clients.offset(i as isize)).client_type as c_uint - == LLMP_CLIENT_TYPE_CHILD_PROCESS as c_uint - { - if !self.launch_client(self.llmp_clients.offset(i as isize)) { - println!("[!] WARNING: Could not launch all clients"); - return Err(AflError::Unknown("Failed to launch clients".into())); - } - } - i = i.wrapping_add(1) } Ok(()) } - /// Start all threads and the main broker. - /// Same as llmp_broker_launch_threaded clients(); - /// Never returns. - pub unsafe fn run(&mut self) -> ! { - self.launch_clientloops().expect("Failed to launch clients"); - self.broker_loop(); + /// Loops infinitely, forwarding and handling all incoming messages from clients. + /// Never returns. Panics on error. + /// 5 millis of sleep can't hurt to keep busywait not at 100% + pub fn loop_forever(&mut self, sleep_time: Option) -> ! { + loop { + compiler_fence(Ordering::SeqCst); + self.once() + .expect("An error occurred when brokering. Exiting."); + match sleep_time { + Some(time) => thread::sleep(time), + None => (), + } + } } - /// Register a new forked/child client. - /// Client thread will be called with llmp_client client, containing - /// the data in ->data. This will register a client to be spawned up as soon as - /// broker_loop() starts. Clients can also be added later via - /// llmp_broker_register_remote(..) 
or the local_tcp_client - /// TODO: TCP remote client not yet supported in rust llmp - pub unsafe fn register_childprocess_clientloop( - &mut self, - clientloop: LlmpClientloopFn, - data: *mut c_void, - ) -> Result<(), AflError> { - let mut client_map: AflShmem = { - let init = AflShmem { - shm_str: [0; 20], - shm_id: 0, - map: 0 as *mut u8, - map_size: 0, + pub fn launch_tcp_listener(&mut self, port: u16) -> Result, AflError> { + // Later in the execution, after the initial map filled up, + // the current broacast map will will point to a different map. + // However, the original map is (as of now) never freed, new clients will start + // to read from the initial map id. + + let listener = TcpListener::bind(format!("127.0.0.1:{}", port))?; + // accept connections and process them, spawning a new thread for each one + println!("Server listening on port {}", port); + + let client_out_map_mem = &self.llmp_out.out_maps.first().unwrap().shmem; + let broadcast_str_initial = client_out_map_mem.shm_str.clone(); + + let llmp_tcp_id = self.llmp_clients.len() as u32; + + // Tcp out map sends messages from background thread tcp server to foreground client + let tcp_out_map = LlmpSharedMap::new(llmp_tcp_id, LLMP_PREF_INITIAL_MAP_SIZE)?; + let tcp_out_map_str = tcp_out_map.shmem.shm_str; + let tcp_out_map_size = tcp_out_map.shmem.map_size; + self.register_client(tcp_out_map); + + Ok(thread::spawn(move || { + let mut new_client_sender = LlmpSender { + id: 0, + last_msg_sent: 0 as *mut LlmpMsg, + out_maps: vec![ + LlmpSharedMap::from_name_slice(&tcp_out_map_str, tcp_out_map_size).unwrap(), + ], + // drop pages to the broker if it already read them + keep_pages_forever: false, }; - init - }; - if llmp_new_page_shmem( - &mut client_map, - self.llmp_client_count, - ((1 as c_int) << 28 as c_int) as c_ulong, - ) - .is_null() - { - return Err(AflError::Unknown("Alloc".into())); - } - let mut client: *mut LlmpBrokerClientMetadata = self.register_client( - 
CStr::from_ptr(&client_map.shm_str as *const u8 as *const c_char), - client_map.map_size, - ); - if client.is_null() { - afl_shmem_deinit(&mut client_map); - return Err(AflError::Unknown("Something in clients failed".into())); - } - (*client).clientloop = Some(clientloop); - (*client).data = data; - (*client).client_type = LLMP_CLIENT_TYPE_CHILD_PROCESS; - /* Copy the already allocated shmem to the client state */ - (*(*client).client_state).out_maps = afl_realloc( - (*(*client).client_state).out_maps as *mut c_void, - ::std::mem::size_of::() as c_ulong, - ) as *mut AflShmem; - if (*(*client).client_state).out_maps.is_null() { - afl_shmem_deinit(&mut client_map); - afl_shmem_deinit((*client).cur_client_map); - /* "Unregister" by subtracting the client from count */ - self.llmp_client_count = self.llmp_client_count.wrapping_sub(1); - return Err(AflError::Unknown("Something in clients failed".into())); - } - memcpy( - (*(*client).client_state).out_maps as *mut c_void, - &mut client_map as *mut AflShmem as *const c_void, - ::std::mem::size_of::() as c_ulong, - ); - (*(*client).client_state).out_map_count = 1 as c_ulong; - /* Each client starts with the very first map. - They should then iterate through all maps once and work on all old messages. - */ - (*(*client).client_state).current_broadcast_map = - self.broadcast_maps.offset(0 as isize) as *mut AflShmem; - (*(*client).client_state).out_map_count = 1 as c_ulong; - return Ok(()); - } -} -/// A new page will be used. Notify each registered hook in the client about this fact. 
-unsafe fn llmp_clientrigger_new_out_page_hooks(client: *mut LlmpClient) { - let mut i: c_ulong = 0; - while i < (*client).new_out_page_hook_count { - ::std::mem::transmute::<*mut c_void, Option>( - (*(*client).new_out_page_hooks.offset(i as isize)).func, - ) - .expect("non-null function pointer")( - client, - shmem2page( - &mut *(*client) - .out_maps - .offset((*client).out_map_count.wrapping_sub(1 as c_ulong) as isize), - ), - (*(*client).new_out_page_hooks.offset(i as isize)).data, - ); - i = i.wrapping_add(1) - } -} - -/// A wrapper around unpacking the data, calling through to the loop -unsafe fn _llmp_client_wrapped_loop(llmp_client_broker_metadata_ptr: *mut c_void) -> ! { - let metadata: *mut LlmpBrokerClientMetadata = - llmp_client_broker_metadata_ptr as *mut LlmpBrokerClientMetadata; - /* Before doing anything else:, notify registered hooks about the new page we're about to use */ - llmp_clientrigger_new_out_page_hooks((*metadata).client_state); - - (*metadata).clientloop.expect("non-null function pointer")( - (*metadata).client_state, - (*metadata).data, - ); -} - -/// For non zero-copy, we want to get rid of old pages with duplicate messages -/// eventually. This function This funtion sees if we can unallocate older pages. -/// The broker would have informed us by setting the save_to_unmap-flag. -unsafe fn llmp_client_prune_old_pages(client: *mut LlmpClient) { - let current_map: *mut u8 = (*(*client) - .out_maps - .offset((*client).out_map_count.wrapping_sub(1 as c_ulong) as isize)) - .map; - /* look for pages that are save_to_unmap, then unmap them. */ - while (*(*client).out_maps.offset(0 as isize)).map != current_map - && (*shmem2page(&mut *(*client).out_maps.offset(0 as isize))).save_to_unmap as c_int != 0 - { - /* This page is save to unmap. The broker already reads or read it. */ - afl_shmem_deinit(&mut *(*client).out_maps.offset(0 as isize)); - /* We remove at the start, move the other pages back. 
*/ - memmove( - (*client).out_maps as *mut c_void, - (*client).out_maps.offset(1 as isize) as *const c_void, - (*client) - .out_map_count - .wrapping_sub(1 as c_ulong) - .wrapping_mul(::std::mem::size_of::() as c_ulong), - ); - (*client).out_map_count = (*client).out_map_count.wrapping_sub(1) - } -} - -/// We don't have any space. Send eop, then continue on a new page. -unsafe fn llmp_client_handle_out_eop(client: *mut LlmpClient) -> bool { - (*client).out_maps = llmp_handle_out_eop( - (*client).out_maps, - &mut (*client).out_map_count, - &mut (*client).last_msg_sent, - ); - if (*client).out_maps.is_null() { - return 0 as c_int != 0; - } - /* Prune old pages! - This is a good time to see if we can unallocate older pages. - The broker would have informed us by setting the flag - */ - llmp_client_prune_old_pages(client); - /* So we got a new page. Inform potential hooks */ - llmp_clientrigger_new_out_page_hooks(client); - return 1 as c_int != 0; -} - -/// A client receives a broadcast message. -/// Returns null if no message is availiable -pub unsafe fn llmp_client_recv(client: *mut LlmpClient) -> *mut LlmpMsg { - loop { - let msg = llmp_recv( - shmem2page((*client).current_broadcast_map), - (*client).last_msg_recvd, - ); - if msg.is_null() { - return 0 as *mut LlmpMsg; - } - (*client).last_msg_recvd = msg; - if (*msg).tag == 0xdeadaf as c_uint { - panic!("BUG: Read unallocated msg"); - } else { - if (*msg).tag == 0xaf1e0f1 as c_uint { - /* we reached the end of the current page. - We'll init a new page but can reuse the mem are of the current map. 
- However, we cannot use the message if we deinit its page, so let's copy */ - let mut pageinfo_cpy: LlmpPayloadNewPage = LlmpPayloadNewPage { - map_size: 0, - shm_str: [0; 20], + loop { + let (mut stream, addr) = match listener.accept() { + Ok(res) => res, + Err(e) => { + dbg!("Ignoring failed accept", e); + continue; + } }; - let broadcast_map: *mut AflShmem = (*client).current_broadcast_map; - let pageinfo: *mut LlmpPayloadNewPage = { - let mut _msg: *mut LlmpMsg = msg; - (if (*_msg).buf_len >= ::std::mem::size_of::() as c_ulong { - (*_msg).buf.as_mut_ptr() - } else { - 0 as *mut u8 - }) as *mut LlmpPayloadNewPage + dbg!("New connection", addr, stream.peer_addr().unwrap()); + match stream.write(&broadcast_str_initial) { + Ok(_) => {} // fire & forget + Err(e) => { + dbg!("Could not send to shmap to client", e); + continue; + } }; - if pageinfo.is_null() { - panic!(format!( - "Illegal message length for EOP (is {}, expected {})", - (*msg).buf_len_padded, - ::std::mem::size_of::() as c_ulong - )); + let mut new_client_map_str: [u8; 20] = Default::default(); + match stream.read_exact(&mut new_client_map_str) { + Ok(()) => (), + Err(e) => { + dbg!("Ignoring failed read from client", e); + continue; + } + }; + + unsafe { + let msg = new_client_sender + .alloc_next(size_of::()) + .expect("Could not allocate a new message in shared map."); + (*msg).tag = LLMP_TAG_NEW_SHM_CLIENT; + let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; + (*pageinfo).shm_str = new_client_map_str; + (*pageinfo).map_size = LLMP_PREF_INITIAL_MAP_SIZE; + match new_client_sender.send(msg) { + Ok(()) => (), + Err(e) => println!("Error forwarding client on map: {:?}", e), + }; } - memcpy( - &mut pageinfo_cpy as *mut LlmpPayloadNewPage as *mut c_void, - pageinfo as *const c_void, - ::std::mem::size_of::() as c_ulong, - ); - /* Never read by broker broker: shmem2page(map)->save_to_unmap = true; */ - afl_shmem_deinit(broadcast_map); - if afl_shmem_by_str( - 
(*client).current_broadcast_map, - CStr::from_bytes_with_nul(&(*pageinfo).shm_str).expect("Illegal shm_str"), - (*pageinfo).map_size, - ) - .is_null() - { - panic!(format!( - "Could not get shmem by str for map {:?} of size {}", - (*pageinfo).shm_str.as_mut_ptr(), - (*pageinfo).map_size - )); - } - } else { - return msg; } + })) + } +} + +/// `n` clients connect to a broker. They share an outgoing map with the broker, +/// and get incoming messages from the shared broker bus +impl LlmpClient { + /// Creates a new LlmpClient + pub fn new(initial_broker_map: LlmpSharedMap) -> Result { + Ok(Self { + llmp_out: LlmpSender { + id: 0, + last_msg_sent: 0 as *mut LlmpMsg, + out_maps: vec![LlmpSharedMap::new(0, LLMP_PREF_INITIAL_MAP_SIZE)?], + // drop pages to the broker if it already read them + keep_pages_forever: false, + }, + llmp_in: LlmpReceiver { + id: 0, + current_recv_map: initial_broker_map, + last_msg_recvd: 0 as *mut LlmpMsg, + }, + }) + } + + pub fn create_attach_to_tcp(port: u16) -> Result { + let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port))?; + println!("Connected to port {}", port); + + let mut new_broker_map_str: [u8; 20] = Default::default(); + stream.read_exact(&mut new_broker_map_str)?; + + let ret = Self::new(LlmpSharedMap::from_name_slice( + &new_broker_map_str, + LLMP_PREF_INITIAL_MAP_SIZE, + )?)?; + + stream.write(&ret.llmp_out.out_maps.first().unwrap().shmem.shm_str)?; + Ok(ret) + } + + /// Commits a msg to the client's out map + pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> { + self.llmp_out.send(msg) + } + + /// Allocates a message of the given size, tags it, and sends it off. 
+ pub fn send_buf(&mut self, tag: u32, buf: &[u8]) -> Result<(), AflError> { + self.llmp_out.send_buf(tag, buf) + } + + /// Informs the broker about a new client in town, with the given map id + pub fn send_client_added_msg( + &mut self, + shm_str: &[u8; 20], + shm_id: usize, + ) -> Result<(), AflError> { + // We write this by hand to get around checks in send_buf + unsafe { + let msg = self + .alloc_next(size_of::()) + .expect("Could not allocate a new message in shared map."); + (*msg).tag = LLMP_TAG_NEW_SHM_CLIENT; + let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo; + (*pageinfo).shm_str = *shm_str; + (*pageinfo).map_size = shm_id; + self.send(msg) } } -} -/// A client blocks/spins until the next message gets posted to the page, -/// then returns that message. -pub unsafe fn llmp_client_recv_blocking(client: *mut LlmpClient) -> *mut LlmpMsg { - let mut page: *mut LlmpPage = shmem2page((*client).current_broadcast_map); - loop { - compiler_fence(Ordering::SeqCst); - /* busy-wait for a new msg_id to show up in the page */ - if (*page).current_msg_id - != (if !(*client).last_msg_recvd.is_null() { - (*(*client).last_msg_recvd).message_id - } else { - 0 as c_uint - }) as c_ulong - { - let ret: *mut LlmpMsg = llmp_client_recv(client); - if !ret.is_null() { - return ret; - } - /* last msg will exist, even if EOP was handled internally */ - page = shmem2page((*client).current_broadcast_map) - } + /// A client receives a broadcast message. + /// Returns null if no message is availiable + pub unsafe fn recv(&mut self) -> Result, AflError> { + self.llmp_in.recv() } -} -/// The current page could have changed in recv (EOP) -/// Alloc the next message, internally handling end of page by allocating a new one. 
-pub unsafe fn llmp_client_alloc_next(client: *mut LlmpClient, size: usize) -> *mut LlmpMsg { - if client.is_null() { - panic!("Client is NULL"); + /// A client blocks/spins until the next message gets posted to the page, + /// then returns that message. + pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, AflError> { + self.llmp_in.recv_blocking() } - let mut msg = llmp_alloc_next( - shmem2page( - &mut *(*client) - .out_maps - .offset((*client).out_map_count.wrapping_sub(1) as isize), - ), - (*client).last_msg_sent, - size as c_ulong, - ); - if msg.is_null() { - let last_map_count: c_ulong = (*client).out_map_count; - /* Page is full -> Tell broker and start from the beginning. - Also, pray the broker got all messaes we're overwriting. :) */ - if !llmp_client_handle_out_eop(client) { - return 0 as *mut LlmpMsg; - } - if (*client).out_map_count == last_map_count - || (*(*shmem2page( - &mut *(*client) - .out_maps - .offset((*client).out_map_count.wrapping_sub(1) as isize), - )) - .messages - .as_mut_ptr()) - .tag != 0xdeadaf as c_uint - { - panic!("Error in handle_out_eop"); - } - /* The client_out_map will have been changed by llmp_handle_out_eop. Don't - * alias. - */ - msg = llmp_alloc_next( - shmem2page( - &mut *(*client) - .out_maps - .offset((*client).out_map_count.wrapping_sub(1) as isize), - ), - 0 as *mut LlmpMsg, - size as c_ulong, - ); - if msg.is_null() { - return 0 as *mut LlmpMsg; - } - } - (*msg).sender = (*client).id; - (*msg).message_id = if !(*client).last_msg_sent.is_null() { - (*(*client).last_msg_sent).message_id.wrapping_add(1) - } else { - 1 as c_uint - }; - /* DBG("Allocated message at loc %p with buflen %ld", msg, msg->buf_len_padded); */ - return msg; -} -/// Cancel send of the next message, this allows us to allocate a new message without sending this one. 
-pub unsafe fn llmp_client_cancel(client: *mut LlmpClient, mut msg: *mut LlmpMsg) { - /* DBG("Client %d cancels send of msg at %p with tag 0x%X and size %ld", client->id, msg, msg->tag, - * msg->buf_len_padded); */ - let mut page: *mut LlmpPage = shmem2page( - &mut *(*client) - .out_maps - .offset((*client).out_map_count.wrapping_sub(1 as c_ulong) as isize), - ); - (*msg).tag = 0xdeadaf as c_uint; - (*page).size_used = ((*page).size_used as c_ulong).wrapping_sub( - (*msg) - .buf_len_padded - .wrapping_add(::std::mem::size_of::() as c_ulong), - ) as c_ulong; -} -/* Commits a msg to the client's out ringbuf */ -pub unsafe fn llmp_client_send( - client_state: *mut LlmpClient, - msg: *mut LlmpMsg, -) -> Result<(), AflError> { - let page: *mut LlmpPage = shmem2page( - &mut *(*client_state) - .out_maps - .offset((*client_state).out_map_count.wrapping_sub(1) as isize), - ); - llmp_send(page, msg)?; - (*client_state).last_msg_sent = msg; - Ok(()) -} + /// The current page could have changed in recv (EOP) + /// Alloc the next message, internally handling end of page by allocating a new one. 
+ pub unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> { + self.llmp_out.alloc_next(buf_len) + } -/// Creates a new, unconnected, client state -pub unsafe fn llmp_client_new_unconnected() -> *mut LlmpClient { - let client_state: *mut LlmpClient = - calloc(1 as c_ulong, ::std::mem::size_of::() as c_ulong) as *mut LlmpClient; - (*client_state).current_broadcast_map = - calloc(1 as c_ulong, ::std::mem::size_of::() as c_ulong) as *mut AflShmem; - if (*client_state).current_broadcast_map.is_null() { - return 0 as *mut LlmpClient; + /// Returns the next message, tag, buf, if avaliable, else None + pub fn recv_buf(&mut self) -> Result, AflError> { + self.llmp_in.recv_buf() } - (*client_state).out_maps = afl_realloc( - (*client_state).out_maps as *mut c_void, - (1 as c_ulong).wrapping_mul(::std::mem::size_of::() as c_ulong), - ) as *mut AflShmem; - if (*client_state).out_maps.is_null() { - free((*client_state).current_broadcast_map as *mut c_void); - free(client_state as *mut c_void); - return 0 as *mut LlmpClient; - } - (*client_state).out_map_count = 1 as c_ulong; - if llmp_new_page_shmem( - &mut *(*client_state).out_maps.offset(0 as isize), - (*client_state).id as c_ulong, - ((1 as c_int) << 28 as c_int) as c_ulong, - ) - .is_null() - { - afl_free((*client_state).out_maps as *mut c_void); - free((*client_state).current_broadcast_map as *mut c_void); - free(client_state as *mut c_void); - return 0 as *mut LlmpClient; - } - (*client_state).new_out_page_hook_count = 0 as c_ulong; - (*client_state).new_out_page_hooks = 0 as *mut LlmpHookdataGeneric; - return client_state; -} -/// Destroys the given cient state -pub unsafe fn llmp_client_delete(client_state: *mut LlmpClient) { - let mut i: c_ulong = 0; - while i < (*client_state).out_map_count { - afl_shmem_deinit(&mut *(*client_state).out_maps.offset(i as isize)); - i = i.wrapping_add(1) - } - afl_free((*client_state).out_maps as *mut c_void); - (*client_state).out_maps = 0 as *mut AflShmem; 
- (*client_state).out_map_count = 0 as c_ulong; - afl_free((*client_state).new_out_page_hooks as *mut c_void); - (*client_state).new_out_page_hooks = 0 as *mut LlmpHookdataGeneric; - (*client_state).new_out_page_hook_count = 0 as c_ulong; - afl_shmem_deinit((*client_state).current_broadcast_map); - free((*client_state).current_broadcast_map as *mut c_void); - (*client_state).current_broadcast_map = 0 as *mut AflShmem; - free(client_state as *mut c_void); -} - -impl Drop for LlmpClient { - fn drop(&mut self) { - unsafe { llmp_client_delete(self) }; + /// Receives a buf from the broker, looping until a messages becomes avaliable + pub fn recv_buf_blocking(&mut self) -> Result<(u32, &[u8]), AflError> { + self.llmp_in.recv_buf_blocking() } } - -/// Generic function to add a hook to the mem pointed to by hooks_p, using afl_realloc on the mem area, and increasing -/// hooks_count_p -pub unsafe fn llmp_add_hook_generic( - hooks_p: *mut *mut LlmpHookdataGeneric, - hooks_count_p: *mut c_ulong, - new_hook_func: *mut c_void, - new_hook_data: *mut c_void, -) -> AflRet { - let hooks_count: c_ulong = *hooks_count_p; - let hooks: *mut LlmpHookdataGeneric = afl_realloc( - *hooks_p as *mut c_void, - hooks_count - .wrapping_add(1 as c_ulong) - .wrapping_mul(::std::mem::size_of::() as c_ulong), - ) as *mut LlmpHookdataGeneric; - if hooks.is_null() { - *hooks_p = 0 as *mut LlmpHookdataGeneric; - *hooks_count_p = 0 as c_ulong; - return AFL_RET_ALLOC; - } - let ref mut fresh9 = (*hooks.offset(hooks_count as isize)).func; - *fresh9 = new_hook_func; - let ref mut fresh10 = (*hooks.offset(hooks_count as isize)).data; - *fresh10 = new_hook_data; - *hooks_p = hooks; - *hooks_count_p = hooks_count.wrapping_add(1 as c_ulong); - return AFL_RET_SUCCESS; -} - -/// Adds a hook that gets called in the client for each new outgoing page the client creates. 
-pub unsafe fn llmp_client_add_new_out_page_hook( - client: *mut LlmpClient, - hook: Option, - data: *mut c_void, -) -> AflRet { - return llmp_add_hook_generic( - &mut (*client).new_out_page_hooks, - &mut (*client).new_out_page_hook_count, - ::std::mem::transmute::, *mut c_void>(hook), - data, - ); -} - -/// Clean up the broker instance -unsafe fn llmp_broker_deinit(broker: *mut LlmpBroker) { - let mut i: c_ulong; - i = 0 as c_ulong; - while i < (*broker).broadcast_map_count { - afl_shmem_deinit(&mut *(*broker).broadcast_maps.offset(i as isize)); - i = i.wrapping_add(1) - } - i = 0 as c_ulong; - while i < (*broker).llmp_client_count { - afl_shmem_deinit((*(*broker).llmp_clients.offset(i as isize)).cur_client_map); - free((*(*broker).llmp_clients.offset(i as isize)).cur_client_map as *mut c_void); - i = i.wrapping_add(1) - // TODO: Properly clean up the client - } - afl_free((*broker).broadcast_maps as *mut c_void); - (*broker).broadcast_map_count = 0 as c_ulong; - afl_free((*broker).llmp_clients as *mut c_void); - (*broker).llmp_client_count = 0 as c_ulong; -} - -impl Drop for LlmpBroker { - fn drop(&mut self) { - unsafe { llmp_broker_deinit(self) }; - } -} - -/// Allocate and set up the new broker instance. Afterwards, run with broker_run. -/// Use llmp_broker::new instead. 
-unsafe fn llmp_broker_init(broker: *mut LlmpBroker) -> Result<(), AflError> { - memset( - broker as *mut c_void, - 0 as c_int, - ::std::mem::size_of::() as c_ulong, - ); - /* let's create some space for outgoing maps */ - (*broker).broadcast_maps = afl_realloc( - 0 as *mut c_void, - (1 as c_ulong).wrapping_mul(::std::mem::size_of::() as c_ulong), - ) as *mut AflShmem; - if (*broker).broadcast_maps.is_null() { - return Err(AflError::Unknown("Alloc".into())); - } - (*broker).broadcast_map_count = 1 as c_ulong; - (*broker).llmp_client_count = 0 as c_ulong; - (*broker).llmp_clients = 0 as *mut LlmpBrokerClientMetadata; - if llmp_new_page_shmem( - _llmp_broker_current_broadcast_map(broker), - -(1 as c_int) as c_ulong, - ((1 as c_int) << 28 as c_int) as c_ulong, - ) - .is_null() - { - afl_free((*broker).broadcast_maps as *mut c_void); - return Err(AflError::Unknown("Alloc".into())); - } - return Ok(()); -} diff --git a/afl/src/events/mod.rs b/afl/src/events/mod.rs index 94e9e57955..9fedea0904 100644 --- a/afl/src/events/mod.rs +++ b/afl/src/events/mod.rs @@ -6,13 +6,11 @@ use core::marker::PhantomData; use serde::{Deserialize, Serialize}; -#[cfg(feature = "std")] -pub mod llmp_translated; // TODO: Abstract away. #[cfg(feature = "std")] pub mod shmem_translated; -#[cfg(feature = "std")] -pub use crate::events::llmp::LLMPEventManager; +/*#[cfg(feature = "std")] +pub use crate::events::llmp::LLMPEventManager;*/ #[cfg(feature = "std")] use std::io::Write; diff --git a/afl/src/events/shmem_translated.rs b/afl/src/events/shmem_translated.rs index 30552e9dad..dbe195dcdd 100644 --- a/afl/src/events/shmem_translated.rs +++ b/afl/src/events/shmem_translated.rs @@ -1,5 +1,5 @@ use libc::{c_char, c_int, c_long, c_uchar, c_uint, c_ulong, c_ushort, c_void}; -use std::ffi::CStr; +use std::{ffi::CStr, mem::size_of}; use crate::AflError; @@ -59,12 +59,11 @@ const AFL_RET_SUCCESS: c_uint = 0; // too.) 
#[derive(Clone)] -#[repr(C)] pub struct AflShmem { pub shm_str: [u8; 20], pub shm_id: c_int, pub map: *mut c_uchar, - pub map_size: c_ulong, + pub map_size: usize, } /// Deinit on drop @@ -87,20 +86,28 @@ const fn afl_shmem_unitialized() -> AflShmem { } impl AflShmem { - fn from_str(shm_str: &CStr, map_size: c_ulong) -> Result { + pub fn from_str(shm_str: &CStr, map_size: usize) -> Result { let mut ret = afl_shmem_unitialized(); - let map = unsafe { afl_shmem_init(&mut ret, map_size) }; + let map = unsafe { afl_shmem_by_str(&mut ret, shm_str, map_size) }; if map != 0 as *mut u8 { Ok(ret) } else { Err(AflError::Unknown(format!( - "Could not allocate map with id {:?}", - shm_str + "Could not allocate map with id {:?} and size {}", + shm_str, map_size ))) } } - fn new(map_size: c_ulong) -> Result { + /// Generate a shared map with a fixed byte array of 20 + pub fn from_name_slice(shm_str: &[u8; 20], map_size: usize) -> Result { + unsafe { + let str_bytes = shm_str as *const [u8; 20] as *const libc::c_char; + Self::from_str(CStr::from_ptr(str_bytes), map_size) + } + } + + pub fn new(map_size: usize) -> Result { let mut ret = afl_shmem_unitialized(); let map = unsafe { afl_shmem_init(&mut ret, map_size) }; if map != 0 as *mut u8 { @@ -115,7 +122,7 @@ impl AflShmem { /// Sets this shm id as env variable with the given name /// Also write the map size as name#_SIZE env - fn to_env_var(&self, env_name: &CStr) -> Result<(), AflError> { + pub fn to_env_var(&self, env_name: &CStr) -> Result<(), AflError> { if unsafe { afl_shmem_to_env_var(&self, env_name) } == AFL_RET_SUCCESS { Ok(()) } else { @@ -141,12 +148,12 @@ pub unsafe fn afl_shmem_deinit(shm: *mut AflShmem) { /// Functions to create Shared memory region, for observation channels and /// opening inputs and stuff. 
-pub unsafe fn afl_shmem_init(shm: *mut AflShmem, map_size: c_ulong) -> *mut c_uchar { +pub unsafe fn afl_shmem_init(shm: *mut AflShmem, map_size: usize) -> *mut c_uchar { (*shm).map_size = map_size; (*shm).map = 0 as *mut c_uchar; (*shm).shm_id = shmget( 0 as c_int, - map_size, + map_size as c_ulong, 0o1000 as c_int | 0o2000 as c_int | 0o600 as c_int, ); if (*shm).shm_id < 0 as c_int { @@ -155,12 +162,13 @@ pub unsafe fn afl_shmem_init(shm: *mut AflShmem, map_size: c_ulong) -> *mut c_uc } snprintf( (*shm).shm_str.as_mut_ptr() as *mut i8, - ::std::mem::size_of::<[c_char; 20]>() as c_ulong, + size_of::<[c_char; 20]>() as c_ulong, b"%d\x00" as *const u8 as *const c_char, (*shm).shm_id, ); - (*shm).shm_str[(::std::mem::size_of::<[c_char; 20]>() as c_ulong) - .wrapping_sub(1 as c_int as c_ulong) as usize] = '\u{0}' as u8; + (*shm).shm_str + [(size_of::<[c_char; 20]>() as c_ulong).wrapping_sub(1 as c_int as c_ulong) as usize] = + '\u{0}' as u8; (*shm).map = shmat((*shm).shm_id, 0 as *const c_void, 0 as c_int) as *mut c_uchar; if (*shm).map == -(1 as c_int) as *mut c_void as *mut c_uchar || (*shm).map.is_null() { shmctl((*shm).shm_id, 0 as c_int, 0 as *mut shmid_ds); @@ -175,7 +183,7 @@ pub unsafe fn afl_shmem_init(shm: *mut AflShmem, map_size: c_ulong) -> *mut c_uc pub unsafe fn afl_shmem_by_str( shm: *mut AflShmem, shm_str: &CStr, - map_size: c_ulong, + map_size: usize, ) -> *mut c_uchar { if shm.is_null() || shm_str.to_bytes().len() == 0 || map_size == 0 { return 0 as *mut c_uchar; @@ -185,7 +193,7 @@ pub unsafe fn afl_shmem_by_str( strncpy( (*shm).shm_str.as_mut_ptr() as *mut c_char, shm_str.as_ptr() as *const c_char, - (::std::mem::size_of::<[c_char; 20]>() as c_ulong).wrapping_sub(1 as c_int as c_ulong), + (size_of::<[c_char; 20]>() as c_ulong).wrapping_sub(1 as c_int as c_ulong), ); (*shm).shm_id = shm_str .to_str() @@ -195,8 +203,8 @@ pub unsafe fn afl_shmem_by_str( (*shm).map = shmat((*shm).shm_id, 0 as *const c_void, 0 as c_int) as *mut c_uchar; if (*shm).map 
== -(1 as c_int) as *mut c_void as *mut c_uchar { (*shm).map = 0 as *mut c_uchar; - (*shm).map_size = 0 as c_int as c_ulong; - (*shm).shm_str[0 as c_int as usize] = '\u{0}' as u8; + (*shm).map_size = 0; + (*shm).shm_str[0] = '\u{0}' as u8; return 0 as *mut c_uchar; } return (*shm).map; @@ -211,7 +219,7 @@ pub unsafe fn afl_shmem_to_env_var(shmem: &AflShmem, env_name: &CStr) -> c_uint let mut shm_str: [c_char; 256] = [0; 256]; snprintf( shm_str.as_mut_ptr(), - ::std::mem::size_of::<[c_char; 256]>() as c_ulong, + size_of::<[c_char; 256]>() as c_ulong, b"%d\x00" as *const u8 as *const c_char, (*shmem).shm_id, ); @@ -227,13 +235,13 @@ pub unsafe fn afl_shmem_to_env_var(shmem: &AflShmem, env_name: &CStr) -> c_uint let mut size_env_name: [c_char; 256] = [0; 256]; snprintf( size_env_name.as_mut_ptr(), - ::std::mem::size_of::<[c_char; 256]>() as c_ulong, + size_of::<[c_char; 256]>() as c_ulong, b"%s_SIZE\x00" as *const u8 as *const c_char, env_name, ); snprintf( shm_str.as_mut_ptr(), - ::std::mem::size_of::<[c_char; 256]>() as c_ulong, + size_of::<[c_char; 256]>() as c_ulong, b"%d\x00" as *const u8 as *const c_char, (*shmem).shm_id, );