From 960154a3de297081c19b6317b6c10f865c38c126 Mon Sep 17 00:00:00 2001 From: Dominik Maier Date: Tue, 8 Dec 2020 23:59:24 +0100 Subject: [PATCH] adapted llmp test --- afl/llmp_test/src/main.rs | 136 +++++++++++++++----------------------- afl/src/events/llmp.rs | 99 +++++++++++++++++++-------- 2 files changed, 128 insertions(+), 107 deletions(-) diff --git a/afl/llmp_test/src/main.rs b/afl/llmp_test/src/main.rs index ed852b990f..8742bde4c2 100644 --- a/afl/llmp_test/src/main.rs +++ b/afl/llmp_test/src/main.rs @@ -1,60 +1,33 @@ -#[macro_use] extern crate alloc; use core::convert::TryInto; -use core::ffi::c_void; -use core::mem::size_of; -use core::ptr; +use core::time::Duration; use std::thread; use std::time; -use afl::events::llmp_translated::*; +use afl::events::llmp; const TAG_SIMPLE_U32_V1: u32 = 0x51300321; const TAG_MATH_RESULT_V1: u32 = 0x77474331; -unsafe fn llmp_test_clientloop(client: *mut LlmpClient, _data: *mut c_void) -> ! { - let mut counter: u32 = 0; - loop { - counter += 1; - - let msg = llmp_client_alloc_next(client, size_of::()); - core::ptr::copy( - counter.to_be_bytes().as_ptr(), - (*msg).buf.as_mut_ptr(), - size_of::(), - ); - (*msg).tag = TAG_SIMPLE_U32_V1; - llmp_client_send(client, msg).unwrap(); - - thread::sleep(time::Duration::from_millis(100)); - } -} - -unsafe fn u32_from_msg(msg: *const LlmpMsg) -> u32 { - u32::from_be_bytes( - alloc::slice::from_raw_parts((*msg).buf.as_ptr(), size_of::()) - .try_into() - .unwrap(), - ) -} - -unsafe fn test_adder_clientloop(client: *mut LlmpClient, _data: *mut c_void) -> ! { +fn adder_loop(port: u16) -> ! { + let mut client = llmp::LlmpClient::create_attach_to_tcp(port).unwrap(); let mut last_result: u32 = 0; let mut current_result: u32 = 0; loop { let mut msg_counter = 0; loop { - let last_msg = llmp_client_recv(client); - if last_msg == 0 as *mut LlmpMsg { - break; - } + let (tag, buf) = match client.recv_buf().unwrap() { + None => break, + Some(msg) => msg, + }; msg_counter += 1; - match (*last_msg).tag { + match tag { TAG_SIMPLE_U32_V1 => { - current_result = current_result.wrapping_add(u32_from_msg(last_msg)); + current_result = + current_result.wrapping_add(u32::from_le_bytes(buf.try_into().unwrap())); } - _ => println!("Adder Client ignored unknown message {}", (*last_msg).tag), + _ => println!("Adder Client ignored unknown message {}", tag), }; } @@ -64,14 +37,9 @@ unsafe fn test_adder_clientloop(client: *mut LlmpClient, _data: *mut c_void) -> msg_counter, current_result ); - let msg = llmp_client_alloc_next(client, size_of::()); - core::ptr::copy( - current_result.to_be_bytes().as_ptr(), - (*msg).buf.as_mut_ptr(), - size_of::(), - ); - (*msg).tag = TAG_MATH_RESULT_V1; - llmp_client_send(client, msg).unwrap(); + client + .send_buf(TAG_MATH_RESULT_V1, ¤t_result.to_le_bytes()) + .unwrap(); last_result = current_result; } @@ -80,61 +48,67 @@ unsafe fn test_adder_clientloop(client: *mut LlmpClient, _data: *mut c_void) -> } unsafe fn broker_message_hook( - _broker: *mut LlmpBroker, - client_metadata: *mut LlmpBrokerClientMetadata, - message: *mut LlmpMsg, - _data: *mut c_void, -) -> LlmpMsgHookResult { + client_id: u32, + message: *mut llmp::LlmpMsg, +) -> llmp::LlmpMsgHookResult { match (*message).tag { TAG_SIMPLE_U32_V1 => { println!( "Client {:?} sent message: {:?}", - (*client_metadata).pid, - u32_from_msg(message) + client_id, + u32::from_le_bytes((*message).as_slice().try_into().unwrap()) ); - LlmpMsgHookResult::ForwardToClients + llmp::LlmpMsgHookResult::ForwardToClients } TAG_MATH_RESULT_V1 => { println!( "Adder Client has this current result: {:?}", - u32_from_msg(message) + u32::from_le_bytes((*message).as_slice().try_into().unwrap()) ); - LlmpMsgHookResult::Handled + llmp::LlmpMsgHookResult::Handled } _ => { println!("Unknwon message id received!"); - LlmpMsgHookResult::ForwardToClients + llmp::LlmpMsgHookResult::ForwardToClients } } } fn main() { /* The main node has a broker, and a few worker threads */ - let threads_total = num_cpus::get(); - let counter_thread_count = threads_total - 2; - println!( - "Running with 1 broker, 1 adder, and {} counter clients", - counter_thread_count - ); + let mode = std::env::args() + .nth(1) + .expect("no mode specified, chose 'broker', 'adder', or 'printer'"); + let port: u16 = std::env::args() + .nth(2) + .unwrap_or("1337".into()) + .parse::() + .unwrap(); + println!("Launching in mode {} on port {}", mode, port); - unsafe { - let mut broker = LlmpBroker::new().expect("Failed to create llmp broker"); - for i in 0..counter_thread_count { - println!("Adding client {}", i); - broker - .register_childprocess_clientloop(llmp_test_clientloop, ptr::null_mut()) - .expect("could not add child clientloop"); + match mode.as_str() { + "broker" => { + let mut broker: llmp::LlmpBroker = llmp::LlmpBroker::new().unwrap(); + broker.launch_tcp_listener(port).unwrap(); + broker.add_message_hook(broker_message_hook); + broker.loop_forever(Some(Duration::from_millis(5))) + } + "adder" => { + let mut client = llmp::LlmpClient::create_attach_to_tcp(port).unwrap(); + let mut counter: u32 = 0; + loop { + counter = counter.wrapping_add(1); + client + .send_buf(TAG_SIMPLE_U32_V1, &counter.to_le_bytes()) + .unwrap(); + } + } + "printer" => { + adder_loop(port); + } + _ => { + println!("No valid mode supplied"); } - - broker - .register_childprocess_clientloop(test_adder_clientloop, ptr::null_mut()) - .expect("Error registering childprocess"); - - println!("Spawning broker"); - - broker.add_message_hook(broker_message_hook, ptr::null_mut()); - - broker.run(); } } diff --git a/afl/src/events/llmp.rs b/afl/src/events/llmp.rs index 18cfcfc1f1..fbf4fe0bf9 100644 --- a/afl/src/events/llmp.rs +++ b/afl/src/events/llmp.rs @@ -48,16 +48,16 @@ Then register some clientloops using llmp_broker_register_threaded_clientloop */ -use core::ptr; -use core::sync::atomic::{compiler_fence, Ordering}; -use core::time::Duration; -use libc::{c_uint, c_ulong, c_ushort}; -use std::{ +use core::{ cmp::max, - ffi::CStr, - io::{Read, Write}, mem::size_of, - net::TcpListener, + ptr, slice, + sync::atomic::{compiler_fence, Ordering}, + time::Duration, +}; +use std::{ + io::{Read, Write}, + net::{TcpListener, TcpStream}, thread, }; @@ -66,7 +66,7 @@ use crate::AflError; use super::shmem_translated::AflShmem; -/// We'll start off with 256 megabyte maps per fuzzer +/// We'll start off with 256 megabyte maps per fuzzer client const LLMP_PREF_INITIAL_MAP_SIZE: usize = 1 << 28; /// What byte count to align messages to /// LlmpMsg sizes (including header) will always be rounded up to be a multiple of this value @@ -151,13 +151,21 @@ pub struct LlmpMsg { pub buf: [u8; 0], } +/// The message we receive +impl LlmpMsg { + /// Gets the buffer from this message as slice, with the corrent length. + pub fn as_slice(&self) -> &[u8] { + unsafe { slice::from_raw_parts(self.buf.as_ptr(), self.buf_len as usize) } + } +} + /// Contents of the share mem pages, used by llmp internally #[derive(Copy, Clone)] #[repr(C, packed)] pub struct LlmpPage { pub sender: u32, - pub save_to_unmap: c_ushort, - pub sender_dead: c_ushort, + pub save_to_unmap: u16, + pub sender_dead: u16, pub current_msg_id: u64, pub size_total: usize, pub size_used: usize, @@ -300,7 +308,7 @@ impl LlmpSender { if (*ret).tag == LLMP_TAG_UNINITIALIZED { panic!("Did not call send() on last message!"); } - (*ret).buf_len_padded = size_of::() as c_ulong; + (*ret).buf_len_padded = size_of::() as u64; (*ret).message_id = if !last_msg.is_null() { (*last_msg).message_id + 1 } else { @@ -380,8 +388,8 @@ impl LlmpSender { buf_len_padded, (*page).size_used, last_msg)); } (*page).size_used = (*page).size_used + complete_msg_size; - (*ret).buf_len_padded = buf_len_padded as c_ulong; - (*ret).buf_len = buf_len as c_ulong; + (*ret).buf_len_padded = buf_len_padded as u64; + (*ret).buf_len = buf_len as u64; /* DBG("Returning new message at %p with len %ld, TAG was %x", ret, ret->buf_len_padded, ret->tag); */ /* Maybe catch some bugs... */ (*_llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET; @@ -396,7 +404,7 @@ impl LlmpSender { if self.last_msg_sent == msg { panic!("Message sent twice!"); } - if (*msg).tag == LLMP_TAG_UNSET as c_uint { + if (*msg).tag == LLMP_TAG_UNSET { panic!(format!( "No tag set on message with id {}", (*msg).message_id @@ -591,6 +599,24 @@ impl LlmpReceiver { } } } + + /// Returns the next message, tag, buf, if avaliable, else None + pub fn recv_buf(&mut self) -> Result, AflError> { + unsafe { + Ok(match self.recv()? { + Some(msg) => Some(((*msg).tag, (*msg).as_slice())), + None => None, + }) + } + } + + /// Returns the next message, tag, buf, looping until it becomes available + pub fn recv_buf_blocking(&mut self) -> Result<(u32, &[u8]), AflError> { + unsafe { + let msg = self.recv_blocking()?; + Ok(((*msg).tag, (*msg).as_slice())) + } + } } /// The page struct, placed on a shared mem instance. @@ -606,13 +632,6 @@ impl LlmpSharedMap { Ok(Self { shmem }) } - /// Initialize from a 0-terminated sharedmap id string and its size - pub fn from_str(shm_str: &CStr, map_size: usize) -> Result { - let shmem = AflShmem::from_str(shm_str, map_size)?; - // Not initializing the page here - the other side should have done it already! - Ok(Self { shmem }) - } - /// Initialize from a shm_str with fixed len of 20 pub fn from_name_slice(shm_str: &[u8; 20], map_size: usize) -> Result { let shmem = AflShmem::from_name_slice(shm_str, map_size)?; @@ -630,7 +649,7 @@ impl LlmpSharedMap { /// It may intercept messages passing through. impl LlmpBroker { /// Create and initialize a new llmp_broker - pub unsafe fn new() -> Result { + pub fn new() -> Result { let broker = LlmpBroker { llmp_out: LlmpSender { id: 0, @@ -747,10 +766,12 @@ impl LlmpBroker { /// The broker walks all pages and looks for changes, then broadcasts them on /// its own shared page, once. - pub unsafe fn once(&mut self) -> Result<(), AflError> { + pub fn once(&mut self) -> Result<(), AflError> { compiler_fence(Ordering::SeqCst); for i in 0..self.llmp_clients.len() { - self.handle_new_msgs(i as u32)?; + unsafe { + self.handle_new_msgs(i as u32)?; + } } Ok(()) } @@ -758,7 +779,7 @@ impl LlmpBroker { /// Loops infinitely, forwarding and handling all incoming messages from clients. /// Never returns. Panics on error. /// 5 millis of sleep can't hurt to keep busywait not at 100% - pub unsafe fn loop_forever(&mut self, sleep_time: Option) -> ! { + pub fn loop_forever(&mut self, sleep_time: Option) -> ! { loop { compiler_fence(Ordering::SeqCst); self.once() @@ -861,6 +882,22 @@ impl LlmpClient { }) } + pub fn create_attach_to_tcp(port: u16) -> Result { + let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port))?; + println!("Connected to port {}", port); + + let mut new_broker_map_str: [u8; 20] = Default::default(); + stream.read_exact(&mut new_broker_map_str)?; + + let ret = Self::new(LlmpSharedMap::from_name_slice( + &new_broker_map_str, + LLMP_PREF_INITIAL_MAP_SIZE, + )?)?; + + stream.write(&ret.llmp_out.out_maps.first().unwrap().shmem.shm_str)?; + Ok(ret) + } + /// Commits a msg to the client's out map pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> { self.llmp_out.send(msg) @@ -888,4 +925,14 @@ impl LlmpClient { pub unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> { self.llmp_out.alloc_next(buf_len) } + + /// Returns the next message, tag, buf, if avaliable, else None + pub fn recv_buf(&mut self) -> Result, AflError> { + self.llmp_in.recv_buf() + } + + /// Receives a buf from the broker, looping until a messages becomes avaliable + pub fn recv_buf_blocking(&mut self) -> Result<(u32, &[u8]), AflError> { + self.llmp_in.recv_buf_blocking() + } }