groundwork for restarting clients

This commit is contained in:
Dominik Maier 2020-12-20 01:03:44 +01:00
parent 5c7810763d
commit d4b83ec85e
3 changed files with 154 additions and 76 deletions

View File

@ -192,14 +192,14 @@ impl LlmpMsg {
unsafe {
let map_size = map.shmem.map().len();
let buf_ptr = self.buf.as_ptr();
if buf_ptr > (map.page() as *const u8).offset(size_of::<LlmpPage>() as isize)
if buf_ptr > (map.page_mut() as *const u8).offset(size_of::<LlmpPage>() as isize)
&& buf_ptr
<= (map.page() as *const u8)
<= (map.page_mut() as *const u8)
.offset((map_size - size_of::<LlmpMsg>() as usize) as isize)
{
// The message header is in the page. Continue with checking the body.
let len = self.buf_len_padded as usize + size_of::<LlmpMsg>();
buf_ptr <= (map.page() as *const u8).offset((map_size - len) as isize)
buf_ptr <= (map.page_mut() as *const u8).offset((map_size - len) as isize)
} else {
false
}
@ -207,26 +207,22 @@ impl LlmpMsg {
}
}
#[cfg(feature = "std")]
/// An Llmp instance
pub enum LlmpConnection<SH>
where
SH: ShMem,
{
/// A broker and a thread using this tcp background thread
IsBroker {
broker: LlmpBroker<SH>,
listener_thread: thread::JoinHandle<()>,
},
IsBroker { broker: LlmpBroker<SH> },
/// A client, connected to the port
IsClient { client: LlmpClient<SH> },
}
#[cfg(feature = "std")]
impl<SH> LlmpConnection<SH>
where
SH: ShMem,
{
#[cfg(feature = "std")]
/// Creates either a broker, if the tcp port is not bound, or a client, connected to this port.
pub fn on_port(port: u16) -> Result<Self, AflError> {
match TcpListener::bind(format!("127.0.0.1:{}", port)) {
@ -234,11 +230,8 @@ where
// We got the port. We are the broker! :)
dbg!("We're the broker");
let mut broker = LlmpBroker::new()?;
let listener_thread = broker.launch_tcp_listener(listener)?;
Ok(LlmpConnection::IsBroker {
broker,
listener_thread,
})
let _listener_thread = broker.launch_tcp_listener(listener)?;
Ok(LlmpConnection::IsBroker { broker })
}
Err(e) => {
match e.kind() {
@ -258,10 +251,7 @@ where
/// Sends the given buffer over this connection, no matter if client or broker.
pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), AflError> {
match self {
LlmpConnection::IsBroker {
broker,
listener_thread: _,
} => broker.send_buf(tag, buf),
LlmpConnection::IsBroker { broker } => broker.send_buf(tag, buf),
LlmpConnection::IsClient { client } => client.send_buf(tag, buf),
}
}
@ -281,21 +271,7 @@ pub struct LlmpPage {
pub messages: [LlmpMsg; 0],
}
/// The broker (node 0)
#[derive(Clone, Debug)]
#[repr(C)]
pub struct LlmpBroker<SH>
where
SH: ShMem,
{
/// Broadcast map from broker to all clients
pub llmp_out: LlmpSender<SH>,
/// Users of Llmp can add message handlers in the broker.
/// This allows us to intercept messages right in the broker
/// This keeps the out map clean.
pub llmp_clients: Vec<LlmpReceiver<SH>>,
}
#[derive(Copy, Clone, Debug)]
/// Result of an LLMP Mesasge hook
pub enum LlmpMsgHookResult {
/// This has been handled in the broker. No need to forward.
@ -316,10 +292,16 @@ struct LlmpPayloadSharedMapInfo {
/// Get sharedmem from a page
#[inline]
unsafe fn shmem2page<SH: ShMem>(afl_shmem: &mut SH) -> *mut LlmpPage {
unsafe fn shmem2page_mut<SH: ShMem>(afl_shmem: &mut SH) -> *mut LlmpPage {
afl_shmem.map_mut().as_mut_ptr() as *mut LlmpPage
}
/// Get sharedmem from a page
#[inline]
unsafe fn shmem2page<SH: ShMem>(afl_shmem: &SH) -> *const LlmpPage {
afl_shmem.map().as_ptr() as *const LlmpPage
}
/// Return, if a msg is contained in the current page
#[inline]
unsafe fn llmp_msg_in_page(page: *mut LlmpPage, msg: *mut LlmpMsg) -> bool {
@ -359,7 +341,7 @@ fn new_map_size(max_alloc: usize) -> usize {
/// llmp_page->messages
unsafe fn _llmp_page_init<SH: ShMem>(shmem: &mut SH, sender: u32) {
let map_size = shmem.map().len();
let page = shmem2page(shmem);
let page = shmem2page_mut(shmem);
(*page).sender = sender;
ptr::write_volatile(&mut (*page).current_msg_id, 0);
(*page).max_alloc_size = 0;
@ -379,7 +361,7 @@ unsafe fn llmp_next_msg_ptr_checked<SH: ShMem>(
last_msg: *const LlmpMsg,
alloc_size: usize,
) -> Result<*mut LlmpMsg, AflError> {
let page = map.page();
let page = map.page_mut();
let map_size = map.shmem.map().len();
let msg_begin_min = (page as *const u8).offset(size_of::<LlmpPage>() as isize);
// We still need space for this msg (alloc_size).
@ -417,7 +399,7 @@ where
// Exclude the current page by splitting of the last element for this iter
let mut unmap_until_excl = 0;
for map in self.out_maps.split_last_mut().unwrap().1 {
if (*map.page()).save_to_unmap == 0 {
if (*map.page_mut()).save_to_unmap == 0 {
// The broker didn't read this page yet, no more pages to unmap.
break;
}
@ -434,7 +416,7 @@ where
/// place EOP, commit EOP, reset, alloc again on the new space.
unsafe fn alloc_eop(&mut self) -> Result<*mut LlmpMsg, AflError> {
let mut map = self.out_maps.last_mut().unwrap();
let page = map.page();
let page = map.page_mut();
let last_msg = self.last_msg_sent;
if (*page).size_used + EOP_MSG_SIZE > (*page).size_total {
panic!("PROGRAM ABORT : BUG: EOP does not fit in page! page {:?}, size_current {:?}, size_total {:?}", page,
@ -466,7 +448,7 @@ where
let buf_len_padded;
let mut complete_msg_size = llmp_align(size_of::<LlmpMsg>() + buf_len);
let map = self.out_maps.last_mut().unwrap();
let page = map.page();
let page = map.page_mut();
let last_msg = self.last_msg_sent;
/* DBG("XXX complete_msg_size %lu (h: %lu)\n", complete_msg_size, sizeof(llmp_message)); */
/* In case we don't have enough space, make sure the next page will be large
@ -559,7 +541,7 @@ where
if (*msg).tag == LLMP_TAG_UNSET {
panic!("No tag set on message with id {}", (*msg).message_id);
}
let page = self.out_maps.last_mut().unwrap().page();
let page = self.out_maps.last_mut().unwrap().page_mut();
if msg.is_null() || !llmp_msg_in_page(page, msg) {
return Err(AflError::Unknown(format!(
"Llmp Message {:?} is null or not in current page",
@ -576,14 +558,14 @@ where
/// listener about it using a EOP message.
unsafe fn handle_out_eop(&mut self) -> Result<(), AflError> {
let old_map = self.out_maps.last_mut().unwrap().page();
let old_map = self.out_maps.last_mut().unwrap().page_mut();
// Create a new shard page.
let mut new_map_shmem = LlmpSharedMap::new(
(*old_map).sender,
SH::new_map(new_map_size((*old_map).max_alloc_size))?,
);
let mut new_map = new_map_shmem.page();
let mut new_map = new_map_shmem.page_mut();
ptr::write_volatile(&mut (*new_map).current_msg_id, (*old_map).current_msg_id);
(*new_map).max_alloc_size = (*old_map).max_alloc_size;
@ -634,7 +616,7 @@ where
pub unsafe fn cancel_send(&mut self, msg: *mut LlmpMsg) {
/* DBG("Client %d cancels send of msg at %p with tag 0x%X and size %ld", client->id, msg, msg->tag,
* msg->buf_len_padded); */
let page = self.out_maps.last_mut().unwrap().page();
let page = self.out_maps.last_mut().unwrap().page_mut();
(*msg).tag = LLMP_TAG_UNSET;
(*page).size_used -= (*msg).buf_len_padded as usize + size_of::<LlmpMsg>();
}
@ -673,7 +655,7 @@ where
unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, AflError> {
/* DBG("recv %p %p\n", page, last_msg); */
compiler_fence(Ordering::SeqCst);
let page = self.current_recv_map.page();
let page = self.current_recv_map.page_mut();
let last_msg = self.last_msg_recvd;
let current_msg_id = ptr::read_volatile(&mut (*page).current_msg_id);
@ -754,7 +736,7 @@ where
/// then returns that message.
pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, AflError> {
let mut current_msg_id = 0;
let page = self.current_recv_map.page();
let page = self.current_recv_map.page_mut();
let last_msg = self.last_msg_recvd;
if !last_msg.is_null() {
if (*last_msg).tag == LLMP_TAG_END_OF_PAGE && !llmp_msg_in_page(page, last_msg) {
@ -825,9 +807,66 @@ where
}
/// Get the unsafe ptr to this page, situated on the shared map
pub unsafe fn page(&mut self) -> *mut LlmpPage {
shmem2page(&mut self.shmem)
pub unsafe fn page_mut(&mut self) -> *mut LlmpPage {
shmem2page_mut(&mut self.shmem)
}
/// Get the unsafe ptr to this page, situated on the shared map
pub unsafe fn page(&self) -> *const LlmpPage {
shmem2page(&self.shmem)
}
/// Gets the offset of a message on this here page.
/// Will return IllegalArgument error if msg is not on page.
pub fn msg_to_offset(&mut self, msg: *mut LlmpMsg) -> Result<u64, AflError> {
unsafe {
let page = self.page_mut();
if llmp_msg_in_page(page, msg) {
// Cast both sides to u8 arrays, get the offset, then cast the return isize to u64
Ok((msg as *const u8).offset_from((*page).messages.as_ptr() as *const u8) as u64)
} else {
Err(AflError::IllegalArgument(format!(
"Message (0x{:X}) not in page (0x{:X})",
page as u64, msg as u64
)))
}
}
}
/// Gets this message from this page, at the indicated offset.
/// Will return IllegalArgument error if the offset is out of bounds.
pub fn msg_from_offset(&mut self, offset: u64) -> Result<*mut LlmpMsg, AflError> {
unsafe {
let page = self.page_mut();
let page_size = self.shmem.map().len() - size_of::<LlmpPage>();
if offset as isize > page_size as isize {
Err(AflError::IllegalArgument(format!(
"Msg offset out of bounds (size: {}, requested offset: {})",
page_size, offset
)))
} else {
Ok(
((*page).messages.as_mut_ptr() as *mut u8).offset(offset as isize)
as *mut LlmpMsg,
)
}
}
}
}
/// The broker (node 0)
#[derive(Clone, Debug)]
#[repr(C)]
pub struct LlmpBroker<SH>
where
SH: ShMem,
{
/// Broadcast map from broker to all clients
pub llmp_out: LlmpSender<SH>,
/// Users of Llmp can add message handlers in the broker.
/// This allows us to intercept messages right in the broker
/// This keeps the out map clean.
pub llmp_clients: Vec<LlmpReceiver<SH>>,
}
/// The broker forwards all messages to its own bus-like broadcast map.
@ -1116,6 +1155,43 @@ impl<SH> LlmpClient<SH>
where
SH: ShMem,
{
/// Reattach to a vacant client map.
/// It is essential, that the broker (or someone else) kept a pointer to the out_map
/// else reattach will get a new, empty page, from the OS
pub fn on_existing_map(
current_out_map: SH,
last_msg_sent_offset: Option<u64>,
current_broker_map: SH,
last_msg_recvd_offset: Option<u64>,
) -> Result<Self, AflError> {
let mut out_map = LlmpSharedMap::new(0, current_out_map);
let last_msg_sent = match last_msg_sent_offset {
Some(offset) => out_map.msg_from_offset(offset)?,
None => 0 as *mut LlmpMsg,
};
let mut current_recv_map = LlmpSharedMap::new(0, current_broker_map);
let last_msg_recvd = match last_msg_recvd_offset {
Some(offset) => current_recv_map.msg_from_offset(offset)?,
None => 0 as *mut LlmpMsg,
};
Ok(Self {
llmp_out: LlmpSender {
id: 0,
last_msg_sent,
out_maps: vec![out_map],
// drop pages to the broker if it already read them
keep_pages_forever: false,
},
llmp_in: LlmpReceiver {
id: 0,
current_recv_map,
last_msg_recvd,
},
})
}
/// Creates a new LlmpClient
pub fn new(initial_broker_map: LlmpSharedMap<SH>) -> Result<Self, AflError> {
Ok(Self {
@ -1244,18 +1320,12 @@ mod tests {
pub fn llmp_connection() {
let mut broker = match LlmpConnection::<AflShmem>::on_port(1337).unwrap() {
IsClient { client: _ } => panic!("Could not bind to port as broker"),
IsBroker {
broker,
listener_thread: _,
} => broker,
IsBroker { broker } => broker,
};
// Add the first client (2nd, actually, because of the tcp listener client)
let mut client = match LlmpConnection::<AflShmem>::on_port(1337).unwrap() {
IsBroker {
broker: _,
listener_thread: _,
} => panic!("Second connect should be a client!"),
IsBroker { broker: _ } => panic!("Second connect should be a client!"),
IsClient { client } => client,
};

View File

@ -9,9 +9,10 @@ use core::{marker::PhantomData, time};
use serde::{Deserialize, Serialize};
#[cfg(feature = "std")]
use self::llmp::Tag;
#[cfg(feature = "std")]
use self::shmem::AflShmem;
use self::{
llmp::{LlmpClient, Tag},
shmem::ShMem,
};
use crate::corpus::Corpus;
use crate::executors::Executor;
use crate::feedbacks::FeedbacksTuple;
@ -707,8 +708,7 @@ const _LLMP_TAG_EVENT_TO_BROKER: llmp::Tag = 0x2B80438;
/// Handle in both
const LLMP_TAG_EVENT_TO_BOTH: llmp::Tag = 0x2B0741;
#[cfg(feature = "std")]
pub struct LlmpEventManager<C, E, OT, FT, I, R, ST>
pub struct LlmpEventManager<C, E, OT, FT, I, R, SH, ST>
where
C: Corpus<I, R>,
E: Executor<I>,
@ -716,16 +716,16 @@ where
FT: FeedbacksTuple<I>,
I: Input,
R: Rand,
SH: ShMem,
ST: Stats,
//CE: CustomEvent<I>,
{
llmp: llmp::LlmpConnection<AflShmem>,
llmp: llmp::LlmpConnection<SH>,
stats: ST,
phantom: PhantomData<(C, E, OT, FT, I, R)>,
}
#[cfg(feature = "std")]
impl<C, E, OT, FT, I, R, ST> LlmpEventManager<C, E, OT, FT, I, R, ST>
impl<C, E, OT, FT, I, R, SH, ST> LlmpEventManager<C, E, OT, FT, I, R, SH, ST>
where
C: Corpus<I, R>,
E: Executor<I>,
@ -733,27 +733,34 @@ where
FT: FeedbacksTuple<I>,
I: Input,
R: Rand,
SH: ShMem,
ST: Stats,
{
#[cfg(feature = "std")]
/// Create llmp on a port
/// If the port is not yet bound, it will act as broker
/// Else, it will act as client.
pub fn new_on_port(port: u16, stats: ST) -> Result<Self, AflError> {
let mgr = Self {
Ok(Self {
llmp: llmp::LlmpConnection::on_port(port)?,
stats: stats,
phantom: PhantomData,
};
Ok(mgr)
})
}
/// A client on an existing map
pub fn for_client(client: LlmpClient<SH>, stats: ST) -> Self {
Self {
llmp: llmp::LlmpConnection::IsClient { client },
stats,
phantom: PhantomData,
}
}
/// Returns if we are the broker
pub fn is_broker(&self) -> bool {
match self.llmp {
llmp::LlmpConnection::IsBroker {
broker: _,
listener_thread: _,
} => true,
llmp::LlmpConnection::IsBroker { broker: _ } => true,
_ => false,
}
}
@ -761,10 +768,7 @@ where
/// Run forever in the broker
pub fn broker_loop(&mut self) -> Result<(), AflError> {
match &mut self.llmp {
llmp::LlmpConnection::IsBroker {
broker,
listener_thread: _,
} => {
llmp::LlmpConnection::IsBroker { broker } => {
let stats = &mut self.stats;
broker.loop_forever(
&mut |sender_id: u32, tag: Tag, msg: &[u8]| {
@ -802,8 +806,8 @@ where
}
#[cfg(feature = "std")]
impl<C, E, OT, FT, I, R, ST> EventManager<C, E, OT, FT, I, R>
for LlmpEventManager<C, E, OT, FT, I, R, ST>
impl<C, E, OT, FT, I, R, SH, ST> EventManager<C, E, OT, FT, I, R>
for LlmpEventManager<C, E, OT, FT, I, R, SH, ST>
where
C: Corpus<I, R>,
E: Executor<I>,
@ -811,6 +815,7 @@ where
OT: ObserversTuple,
I: Input,
R: Rand,
SH: ShMem,
ST: Stats,
//CE: CustomEvent<I>,
{

View File

@ -49,6 +49,8 @@ pub enum AflError {
NotImplemented(String),
/// You're holding it wrong
IllegalState(String),
/// The argument passed to this method or function is not valid
IllegalArgument(String),
/// Something else happened
Unknown(String),
}
@ -67,6 +69,7 @@ impl fmt::Display for AflError {
}
Self::NotImplemented(s) => write!(f, "Not implemented: {0}", &s),
Self::IllegalState(s) => write!(f, "Illegal state: {0}", &s),
Self::IllegalArgument(s) => write!(f, "Illegal argument: {0}", &s),
Self::Unknown(s) => write!(f, "Unknown error: {0}", &s),
}
}