adapted llmp test

This commit is contained in:
Dominik Maier 2020-12-08 23:59:24 +01:00
parent ff8a89f0c1
commit 960154a3de
2 changed files with 128 additions and 107 deletions

View File

@ -1,60 +1,33 @@
#[macro_use]
extern crate alloc; extern crate alloc;
use core::convert::TryInto; use core::convert::TryInto;
use core::ffi::c_void; use core::time::Duration;
use core::mem::size_of;
use core::ptr;
use std::thread; use std::thread;
use std::time; use std::time;
use afl::events::llmp_translated::*; use afl::events::llmp;
const TAG_SIMPLE_U32_V1: u32 = 0x51300321; const TAG_SIMPLE_U32_V1: u32 = 0x51300321;
const TAG_MATH_RESULT_V1: u32 = 0x77474331; const TAG_MATH_RESULT_V1: u32 = 0x77474331;
unsafe fn llmp_test_clientloop(client: *mut LlmpClient, _data: *mut c_void) -> ! { fn adder_loop(port: u16) -> ! {
let mut counter: u32 = 0; let mut client = llmp::LlmpClient::create_attach_to_tcp(port).unwrap();
loop {
counter += 1;
let msg = llmp_client_alloc_next(client, size_of::<u32>());
core::ptr::copy(
counter.to_be_bytes().as_ptr(),
(*msg).buf.as_mut_ptr(),
size_of::<u32>(),
);
(*msg).tag = TAG_SIMPLE_U32_V1;
llmp_client_send(client, msg).unwrap();
thread::sleep(time::Duration::from_millis(100));
}
}
unsafe fn u32_from_msg(msg: *const LlmpMsg) -> u32 {
u32::from_be_bytes(
alloc::slice::from_raw_parts((*msg).buf.as_ptr(), size_of::<u32>())
.try_into()
.unwrap(),
)
}
unsafe fn test_adder_clientloop(client: *mut LlmpClient, _data: *mut c_void) -> ! {
let mut last_result: u32 = 0; let mut last_result: u32 = 0;
let mut current_result: u32 = 0; let mut current_result: u32 = 0;
loop { loop {
let mut msg_counter = 0; let mut msg_counter = 0;
loop { loop {
let last_msg = llmp_client_recv(client); let (tag, buf) = match client.recv_buf().unwrap() {
if last_msg == 0 as *mut LlmpMsg { None => break,
break; Some(msg) => msg,
} };
msg_counter += 1; msg_counter += 1;
match (*last_msg).tag { match tag {
TAG_SIMPLE_U32_V1 => { TAG_SIMPLE_U32_V1 => {
current_result = current_result.wrapping_add(u32_from_msg(last_msg)); current_result =
current_result.wrapping_add(u32::from_le_bytes(buf.try_into().unwrap()));
} }
_ => println!("Adder Client ignored unknown message {}", (*last_msg).tag), _ => println!("Adder Client ignored unknown message {}", tag),
}; };
} }
@ -64,14 +37,9 @@ unsafe fn test_adder_clientloop(client: *mut LlmpClient, _data: *mut c_void) ->
msg_counter, current_result msg_counter, current_result
); );
let msg = llmp_client_alloc_next(client, size_of::<u32>()); client
core::ptr::copy( .send_buf(TAG_MATH_RESULT_V1, &current_result.to_le_bytes())
current_result.to_be_bytes().as_ptr(), .unwrap();
(*msg).buf.as_mut_ptr(),
size_of::<u32>(),
);
(*msg).tag = TAG_MATH_RESULT_V1;
llmp_client_send(client, msg).unwrap();
last_result = current_result; last_result = current_result;
} }
@ -80,61 +48,67 @@ unsafe fn test_adder_clientloop(client: *mut LlmpClient, _data: *mut c_void) ->
} }
unsafe fn broker_message_hook( unsafe fn broker_message_hook(
_broker: *mut LlmpBroker, client_id: u32,
client_metadata: *mut LlmpBrokerClientMetadata, message: *mut llmp::LlmpMsg,
message: *mut LlmpMsg, ) -> llmp::LlmpMsgHookResult {
_data: *mut c_void,
) -> LlmpMsgHookResult {
match (*message).tag { match (*message).tag {
TAG_SIMPLE_U32_V1 => { TAG_SIMPLE_U32_V1 => {
println!( println!(
"Client {:?} sent message: {:?}", "Client {:?} sent message: {:?}",
(*client_metadata).pid, client_id,
u32_from_msg(message) u32::from_le_bytes((*message).as_slice().try_into().unwrap())
); );
LlmpMsgHookResult::ForwardToClients llmp::LlmpMsgHookResult::ForwardToClients
} }
TAG_MATH_RESULT_V1 => { TAG_MATH_RESULT_V1 => {
println!( println!(
"Adder Client has this current result: {:?}", "Adder Client has this current result: {:?}",
u32_from_msg(message) u32::from_le_bytes((*message).as_slice().try_into().unwrap())
); );
LlmpMsgHookResult::Handled llmp::LlmpMsgHookResult::Handled
} }
_ => { _ => {
println!("Unknwon message id received!"); println!("Unknwon message id received!");
LlmpMsgHookResult::ForwardToClients llmp::LlmpMsgHookResult::ForwardToClients
} }
} }
} }
fn main() { fn main() {
/* The main node has a broker, and a few worker threads */ /* The main node has a broker, and a few worker threads */
let threads_total = num_cpus::get();
let counter_thread_count = threads_total - 2; let mode = std::env::args()
println!( .nth(1)
"Running with 1 broker, 1 adder, and {} counter clients", .expect("no mode specified, chose 'broker', 'adder', or 'printer'");
counter_thread_count let port: u16 = std::env::args()
); .nth(2)
.unwrap_or("1337".into())
.parse::<u16>()
.unwrap();
println!("Launching in mode {} on port {}", mode, port);
unsafe { match mode.as_str() {
let mut broker = LlmpBroker::new().expect("Failed to create llmp broker"); "broker" => {
for i in 0..counter_thread_count { let mut broker: llmp::LlmpBroker = llmp::LlmpBroker::new().unwrap();
println!("Adding client {}", i); broker.launch_tcp_listener(port).unwrap();
broker broker.add_message_hook(broker_message_hook);
.register_childprocess_clientloop(llmp_test_clientloop, ptr::null_mut()) broker.loop_forever(Some(Duration::from_millis(5)))
.expect("could not add child clientloop"); }
"adder" => {
let mut client = llmp::LlmpClient::create_attach_to_tcp(port).unwrap();
let mut counter: u32 = 0;
loop {
counter = counter.wrapping_add(1);
client
.send_buf(TAG_SIMPLE_U32_V1, &counter.to_le_bytes())
.unwrap();
}
}
"printer" => {
adder_loop(port);
}
_ => {
println!("No valid mode supplied");
} }
broker
.register_childprocess_clientloop(test_adder_clientloop, ptr::null_mut())
.expect("Error registering childprocess");
println!("Spawning broker");
broker.add_message_hook(broker_message_hook, ptr::null_mut());
broker.run();
} }
} }

View File

@ -48,16 +48,16 @@ Then register some clientloops using llmp_broker_register_threaded_clientloop
*/ */
use core::ptr; use core::{
use core::sync::atomic::{compiler_fence, Ordering};
use core::time::Duration;
use libc::{c_uint, c_ulong, c_ushort};
use std::{
cmp::max, cmp::max,
ffi::CStr,
io::{Read, Write},
mem::size_of, mem::size_of,
net::TcpListener, ptr, slice,
sync::atomic::{compiler_fence, Ordering},
time::Duration,
};
use std::{
io::{Read, Write},
net::{TcpListener, TcpStream},
thread, thread,
}; };
@ -66,7 +66,7 @@ use crate::AflError;
use super::shmem_translated::AflShmem; use super::shmem_translated::AflShmem;
/// We'll start off with 256 megabyte maps per fuzzer /// We'll start off with 256 megabyte maps per fuzzer client
const LLMP_PREF_INITIAL_MAP_SIZE: usize = 1 << 28; const LLMP_PREF_INITIAL_MAP_SIZE: usize = 1 << 28;
/// What byte count to align messages to /// What byte count to align messages to
/// LlmpMsg sizes (including header) will always be rounded up to be a multiple of this value /// LlmpMsg sizes (including header) will always be rounded up to be a multiple of this value
@ -151,13 +151,21 @@ pub struct LlmpMsg {
pub buf: [u8; 0], pub buf: [u8; 0],
} }
/// The message we receive
impl LlmpMsg {
/// Gets the buffer from this message as slice, with the correct length.
pub fn as_slice(&self) -> &[u8] {
unsafe { slice::from_raw_parts(self.buf.as_ptr(), self.buf_len as usize) }
}
}
/// Contents of the share mem pages, used by llmp internally /// Contents of the share mem pages, used by llmp internally
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
#[repr(C, packed)] #[repr(C, packed)]
pub struct LlmpPage { pub struct LlmpPage {
pub sender: u32, pub sender: u32,
pub save_to_unmap: c_ushort, pub save_to_unmap: u16,
pub sender_dead: c_ushort, pub sender_dead: u16,
pub current_msg_id: u64, pub current_msg_id: u64,
pub size_total: usize, pub size_total: usize,
pub size_used: usize, pub size_used: usize,
@ -300,7 +308,7 @@ impl LlmpSender {
if (*ret).tag == LLMP_TAG_UNINITIALIZED { if (*ret).tag == LLMP_TAG_UNINITIALIZED {
panic!("Did not call send() on last message!"); panic!("Did not call send() on last message!");
} }
(*ret).buf_len_padded = size_of::<LlmpPayloadSharedMap>() as c_ulong; (*ret).buf_len_padded = size_of::<LlmpPayloadSharedMap>() as u64;
(*ret).message_id = if !last_msg.is_null() { (*ret).message_id = if !last_msg.is_null() {
(*last_msg).message_id + 1 (*last_msg).message_id + 1
} else { } else {
@ -380,8 +388,8 @@ impl LlmpSender {
buf_len_padded, (*page).size_used, last_msg)); buf_len_padded, (*page).size_used, last_msg));
} }
(*page).size_used = (*page).size_used + complete_msg_size; (*page).size_used = (*page).size_used + complete_msg_size;
(*ret).buf_len_padded = buf_len_padded as c_ulong; (*ret).buf_len_padded = buf_len_padded as u64;
(*ret).buf_len = buf_len as c_ulong; (*ret).buf_len = buf_len as u64;
/* DBG("Returning new message at %p with len %ld, TAG was %x", ret, ret->buf_len_padded, ret->tag); */ /* DBG("Returning new message at %p with len %ld, TAG was %x", ret, ret->buf_len_padded, ret->tag); */
/* Maybe catch some bugs... */ /* Maybe catch some bugs... */
(*_llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET; (*_llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET;
@ -396,7 +404,7 @@ impl LlmpSender {
if self.last_msg_sent == msg { if self.last_msg_sent == msg {
panic!("Message sent twice!"); panic!("Message sent twice!");
} }
if (*msg).tag == LLMP_TAG_UNSET as c_uint { if (*msg).tag == LLMP_TAG_UNSET {
panic!(format!( panic!(format!(
"No tag set on message with id {}", "No tag set on message with id {}",
(*msg).message_id (*msg).message_id
@ -591,6 +599,24 @@ impl LlmpReceiver {
} }
} }
} }
/// Returns the next message, tag, buf, if available, else None
pub fn recv_buf(&mut self) -> Result<Option<(u32, &[u8])>, AflError> {
unsafe {
Ok(match self.recv()? {
Some(msg) => Some(((*msg).tag, (*msg).as_slice())),
None => None,
})
}
}
/// Returns the next message, tag, buf, looping until it becomes available
pub fn recv_buf_blocking(&mut self) -> Result<(u32, &[u8]), AflError> {
unsafe {
let msg = self.recv_blocking()?;
Ok(((*msg).tag, (*msg).as_slice()))
}
}
} }
/// The page struct, placed on a shared mem instance. /// The page struct, placed on a shared mem instance.
@ -606,13 +632,6 @@ impl LlmpSharedMap {
Ok(Self { shmem }) Ok(Self { shmem })
} }
/// Initialize from a 0-terminated sharedmap id string and its size
pub fn from_str(shm_str: &CStr, map_size: usize) -> Result<Self, AflError> {
let shmem = AflShmem::from_str(shm_str, map_size)?;
// Not initializing the page here - the other side should have done it already!
Ok(Self { shmem })
}
/// Initialize from a shm_str with fixed len of 20 /// Initialize from a shm_str with fixed len of 20
pub fn from_name_slice(shm_str: &[u8; 20], map_size: usize) -> Result<Self, AflError> { pub fn from_name_slice(shm_str: &[u8; 20], map_size: usize) -> Result<Self, AflError> {
let shmem = AflShmem::from_name_slice(shm_str, map_size)?; let shmem = AflShmem::from_name_slice(shm_str, map_size)?;
@ -630,7 +649,7 @@ impl LlmpSharedMap {
/// It may intercept messages passing through. /// It may intercept messages passing through.
impl LlmpBroker { impl LlmpBroker {
/// Create and initialize a new llmp_broker /// Create and initialize a new llmp_broker
pub unsafe fn new() -> Result<Self, AflError> { pub fn new() -> Result<Self, AflError> {
let broker = LlmpBroker { let broker = LlmpBroker {
llmp_out: LlmpSender { llmp_out: LlmpSender {
id: 0, id: 0,
@ -747,10 +766,12 @@ impl LlmpBroker {
/// The broker walks all pages and looks for changes, then broadcasts them on /// The broker walks all pages and looks for changes, then broadcasts them on
/// its own shared page, once. /// its own shared page, once.
pub unsafe fn once(&mut self) -> Result<(), AflError> { pub fn once(&mut self) -> Result<(), AflError> {
compiler_fence(Ordering::SeqCst); compiler_fence(Ordering::SeqCst);
for i in 0..self.llmp_clients.len() { for i in 0..self.llmp_clients.len() {
self.handle_new_msgs(i as u32)?; unsafe {
self.handle_new_msgs(i as u32)?;
}
} }
Ok(()) Ok(())
} }
@ -758,7 +779,7 @@ impl LlmpBroker {
/// Loops infinitely, forwarding and handling all incoming messages from clients. /// Loops infinitely, forwarding and handling all incoming messages from clients.
/// Never returns. Panics on error. /// Never returns. Panics on error.
/// 5 millis of sleep can't hurt to keep busywait not at 100% /// 5 millis of sleep can't hurt to keep busywait not at 100%
pub unsafe fn loop_forever(&mut self, sleep_time: Option<Duration>) -> ! { pub fn loop_forever(&mut self, sleep_time: Option<Duration>) -> ! {
loop { loop {
compiler_fence(Ordering::SeqCst); compiler_fence(Ordering::SeqCst);
self.once() self.once()
@ -861,6 +882,22 @@ impl LlmpClient {
}) })
} }
pub fn create_attach_to_tcp(port: u16) -> Result<Self, AflError> {
let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port))?;
println!("Connected to port {}", port);
let mut new_broker_map_str: [u8; 20] = Default::default();
stream.read_exact(&mut new_broker_map_str)?;
let ret = Self::new(LlmpSharedMap::from_name_slice(
&new_broker_map_str,
LLMP_PREF_INITIAL_MAP_SIZE,
)?)?;
stream.write(&ret.llmp_out.out_maps.first().unwrap().shmem.shm_str)?;
Ok(ret)
}
/// Commits a msg to the client's out map /// Commits a msg to the client's out map
pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> { pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), AflError> {
self.llmp_out.send(msg) self.llmp_out.send(msg)
@ -888,4 +925,14 @@ impl LlmpClient {
pub unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> { pub unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> {
self.llmp_out.alloc_next(buf_len) self.llmp_out.alloc_next(buf_len)
} }
/// Returns the next message, tag, buf, if available, else None
pub fn recv_buf(&mut self) -> Result<Option<(u32, &[u8])>, AflError> {
self.llmp_in.recv_buf()
}
/// Receives a buf from the broker, looping until a message becomes available
pub fn recv_buf_blocking(&mut self) -> Result<(u32, &[u8]), AflError> {
self.llmp_in.recv_buf_blocking()
}
} }