added (more) crash restart

This commit is contained in:
Dominik Maier 2021-02-11 05:33:04 +01:00
parent 9d2635a597
commit fd2de83e1c
9 changed files with 172 additions and 108 deletions

View File

@ -26,7 +26,7 @@ use crate::{
observers::ObserversTuple,
shmem::ShMem,
state::State,
utils::Rand,
utils::{serialize_state_mgr, Rand},
AflError,
};
@ -173,7 +173,7 @@ where
/// Lookup for incoming events and process them.
/// Return the number of processes events or an error
fn process<C, FT, R>(&mut self, state: &mut State<C, I, R, FT>) -> Result<usize, AflError>
fn process<C, FT, R>(&mut self, state: &mut State<C, FT, I, R>) -> Result<usize, AflError>
where
C: Corpus<I, R>,
FT: FeedbacksTuple<I>,
@ -195,10 +195,25 @@ where
Ok(postcard::from_bytes(observers_buf)?)
}
/// For restarting event managers, implement a way to forward state to their next peers.
#[inline]
fn on_restart<C, FT, R>(&mut self, _state: &mut State<C, FT, I, R>) -> Result<(), AflError>
where
C: Corpus<I, R>,
FT: FeedbacksTuple<I>,
R: Rand,
{
Ok(())
}
/// Block until we are safe to exit.
#[inline]
fn await_restart_safe(&mut self) {}
/// Send off an event to the broker
fn fire<C, FT, R>(
&mut self,
_state: &mut State<C, I, R, FT>,
_state: &mut State<C, FT, I, R>,
event: Event<I>,
) -> Result<(), AflError>
where
@ -216,7 +231,7 @@ impl<I> EventManager<I> for NopEventManager<I>
where
I: Input,
{
fn process<C, FT, R>(&mut self, _state: &mut State<C, I, R, FT>) -> Result<usize, AflError>
fn process<C, FT, R>(&mut self, _state: &mut State<C, FT, I, R>) -> Result<usize, AflError>
where
C: Corpus<I, R>,
FT: FeedbacksTuple<I>,
@ -227,7 +242,7 @@ where
fn fire<C, FT, R>(
&mut self,
_state: &mut State<C, I, R, FT>,
_state: &mut State<C, FT, I, R>,
_event: Event<I>,
) -> Result<(), AflError>
where
@ -257,7 +272,7 @@ where
I: Input,
ST: Stats, //CE: CustomEvent<I, OT>,
{
fn process<C, FT, R>(&mut self, state: &mut State<C, I, R, FT>) -> Result<usize, AflError>
fn process<C, FT, R>(&mut self, state: &mut State<C, FT, I, R>) -> Result<usize, AflError>
where
C: Corpus<I, R>,
FT: FeedbacksTuple<I>,
@ -273,7 +288,7 @@ where
fn fire<C, FT, R>(
&mut self,
_state: &mut State<C, I, R, FT>,
_state: &mut State<C, FT, I, R>,
event: Event<I>,
) -> Result<(), AflError>
where
@ -350,7 +365,7 @@ where
// Handle arriving events in the client
fn handle_in_client<C, FT, R>(
&mut self,
_state: &mut State<C, I, R, FT>,
_state: &mut State<C, FT, I, R>,
_sender_id: u32,
event: Event<I>,
) -> Result<(), AflError>
@ -419,6 +434,18 @@ where
}
}
impl<I, SH, ST> Drop for LlmpEventManager<I, SH, ST>
where
I: Input,
SH: ShMem,
ST: Stats,
{
/// LLMP clients will have to wait until their pages are mapped by somebody.
fn drop(&mut self) {
self.await_restart_safe()
}
}
impl<I, SH, ST> LlmpEventManager<I, SH, ST>
where
I: Input,
@ -580,7 +607,7 @@ where
// Handle arriving events in the client
fn handle_in_client<C, FT, R>(
&mut self,
state: &mut State<C, I, R, FT>,
state: &mut State<C, FT, I, R>,
_sender_id: u32,
event: Event<I>,
) -> Result<(), AflError>
@ -619,7 +646,19 @@ where
SH: ShMem,
ST: Stats, //CE: CustomEvent<I>,
{
fn process<C, FT, R>(&mut self, state: &mut State<C, I, R, FT>) -> Result<usize, AflError>
/// The llmp client needs to wait until a broker mapped all pages, before shutting down.
/// Otherwise, the OS may already have removed the shared maps,
fn await_restart_safe(&mut self) {
match &self.llmp {
llmp::LlmpConnection::IsClient { client } => {
// wait until we can drop the message safely.
client.await_save_to_unmap_blocking();
}
_ => (),
}
}
fn process<C, FT, R>(&mut self, state: &mut State<C, FT, I, R>) -> Result<usize, AflError>
where
C: Corpus<I, R>,
FT: FeedbacksTuple<I>,
@ -654,7 +693,7 @@ where
fn fire<C, FT, R>(
&mut self,
_state: &mut State<C, I, R, FT>,
_state: &mut State<C, FT, I, R>,
event: Event<I>,
) -> Result<(), AflError>
where
@ -688,7 +727,7 @@ where
*/
/// A manager that can restart on the fly
/// A manager that can restart on the fly, storing states in-between (in `on_resatrt`)
#[derive(Clone, Debug)]
pub struct LlmpRestartingEventManager<I, SH, ST>
where
@ -709,7 +748,27 @@ where
SH: ShMem,
ST: Stats, //CE: CustomEvent<I>,
{
fn process<C, FT, R>(&mut self, state: &mut State<C, I, R, FT>) -> Result<usize, AflError>
/// Reset the single page (we reuse it over and over from pos 0), then send the current state to the next runner.
fn on_restart<C, FT, R>(&mut self, state: &mut State<C, FT, I, R>) -> Result<(), AflError>
where
C: Corpus<I, R>,
FT: FeedbacksTuple<I>,
R: Rand,
{
unsafe { self.sender.reset_last_page() };
let state_corpus_serialized = serialize_state_mgr(state, &self.llmp_mgr)?;
self.sender
.send_buf(_LLMP_TAG_RESTART, &state_corpus_serialized)
}
/// The llmp client needs to wait until a broker mapped all pages, before shutting down.
/// Otherwise, the OS may already have removed the shared maps,
#[inline]
fn await_restart_safe(&mut self) {
self.llmp_mgr.await_restart_safe();
}
fn process<C, FT, R>(&mut self, state: &mut State<C, FT, I, R>) -> Result<usize, AflError>
where
C: Corpus<I, R>,
FT: FeedbacksTuple<I>,
@ -720,7 +779,7 @@ where
fn fire<C, FT, R>(
&mut self,
state: &mut State<C, I, R, FT>,
state: &mut State<C, FT, I, R>,
event: Event<I>,
) -> Result<(), AflError>
where
@ -773,7 +832,7 @@ where
pub fn temp<C, FT, R>(
stats: ST,
broker_port: u16,
) -> Result<(Self, Option<State<C, I, R, FT>>), AflError>
) -> Result<(Self, Option<State<C, FT, I, R>>), AflError>
where
C: Corpus<I, R>,
FT: FeedbacksTuple<I>,
@ -860,7 +919,7 @@ pub fn setup_restarting_state<I, C, FT, R, SH, ST>(
mgr: &mut LlmpEventManager<I, SH, ST>,
) -> Result<
(
Option<State<C, I, R, FT>>,
Option<State<C, FT, I, R>>,
LlmpRestartingEventManager<I, SH, ST>,
),
AflError,
@ -923,7 +982,7 @@ where
// Restoring from a previous run, deserialize state and corpus.
Some((_sender, _tag, msg)) => {
println!("Subsequent run. Let's load all data from shmem (received {} bytes from previous instance)", msg.len());
let (state, mgr): (State<C, I, R, FT>, LlmpEventManager<I, SH, ST>) =
let (state, mgr): (State<C, FT, I, R>, LlmpEventManager<I, SH, ST>) =
deserialize_state_mgr(&msg)?;
(Some(state), LlmpRestartingEventManager::new(mgr, sender))

View File

@ -54,9 +54,9 @@ where
OT: ObserversTuple,
{
#[inline]
fn pre_exec<R, FT, C, EM>(
fn pre_exec<C, EM, FT, R>(
&mut self,
_state: &State<C, I, R, FT>,
_state: &mut State<C, FT, I, R>,
_event_mgr: &mut EM,
_input: &I,
) -> Result<(), AflError>
@ -75,9 +75,9 @@ where
}
#[inline]
fn post_exec<R, FT, C, EM>(
fn post_exec<C, EM, FT, R>(
&mut self,
_state: &State<C, I, R, FT>,
_state: &State<C, FT, I, R>,
_event_mgr: &mut EM,
_input: &I,
) -> Result<(), AflError>
@ -142,11 +142,11 @@ where
/// * `on_crash_fn` - When an in-mem harness crashes, it may safe some state to continue fuzzing later.
/// Do that that in this function. The program will crash afterwards.
/// * `observers` - the observers observing the target during execution
pub fn new<R, FT, C, EM>(
pub fn new<C, EM, FT, R>(
name: &'static str,
harness_fn: HarnessFunction<Self>,
observers: OT,
_state: &mut State<C, I, R, FT>,
_state: &mut State<C, FT, I, R>,
_event_mgr: &mut EM,
) -> Self
where
@ -195,6 +195,9 @@ pub mod unix_signals {
extern crate libc;
#[cfg(target_os = "linux")]
use fs::read_to_string;
// Unhandled signals: SIGALRM, SIGHUP, SIGINT, SIGKILL, SIGQUIT, SIGTERM
use libc::{
c_int, c_void, sigaction, siginfo_t, SA_NODEFER, SA_SIGINFO, SIGABRT, SIGBUS, SIGFPE,
@ -202,20 +205,24 @@ pub mod unix_signals {
};
use std::{
io::{stdout, Write}, // Write brings flush() into scope
mem,
process,
ptr,
fs,
io::{stdout, Write},
mem, ptr,
};
use crate::{
corpus::Corpus, events::EventManager, feedbacks::FeedbacksTuple, inputs::Input,
observers::ObserversTuple, state::State, utils::Rand,
corpus::Corpus,
events::{Event, EventManager},
feedbacks::FeedbacksTuple,
inputs::Input,
observers::ObserversTuple,
state::State,
utils::Rand,
};
/// Pointers to values only needed on crash. As the program will not continue after a crash,
/// we should (tm) be okay with raw pointers here,
static mut STATE_PTR: *const c_void = ptr::null_mut();
static mut STATE_PTR: *mut c_void = ptr::null_mut();
static mut EVENT_MGR_PTR: *mut c_void = ptr::null_mut();
/// The (unsafe) pointer to the current inmem input, for the current run.
/// This is neede for certain non-rust side effects, as well as unix signal handling.
@ -235,9 +242,15 @@ pub mod unix_signals {
{
if CURRENT_INPUT_PTR == ptr::null() {
println!(
"We died accessing addr {}, but are not in client... Exiting.",
"We crashed at addr 0x{:x}, but are not in the target... Bug in the fuzzer? Exiting.",
info.si_addr() as usize
);
// let's yolo-cat the maps for debugging, if possible.
#[cfg(target_os = "linux")]
match fs::read_to_string("/proc/self/maps") {
Ok(maps) => println!("maps:\n{}", maps),
Err(e) => println!("Couldn't load mappings: {:?}", e),
};
return;
//exit(1);
}
@ -247,21 +260,23 @@ pub mod unix_signals {
#[cfg(feature = "std")]
let _ = stdout().flush();
/*let input = (CURRENT_INPUT_PTR as *const I).as_ref().unwrap();
let state = (EVENT_MGR_PTR as *const State<I, R, FT>).as_ref().unwrap();
let manager = (EVENT_MGR_PTR as *mut EM).as_mut().unwrap();
let input = (CURRENT_INPUT_PTR as *const I).as_ref().unwrap();
// Make sure we don't crash in the crash handler forever.
CURRENT_INPUT_PTR = ptr::null();
let state = (STATE_PTR as *mut State<C, FT, I, R>).as_mut().unwrap();
let mgr = (EVENT_MGR_PTR as *mut EM).as_mut().unwrap();
mgr.fire(
state,
Event::Crash {
input: input.to_owned(),
},
)
.expect(&format!("Could not send crashing input {:?}", input));
if !on_crash_fn_ptr.is_null() {
(*(on_crash_fn_ptr as *mut Box<OnCrashFunction<I, C, EM, FT, R>>))(
ExitKind::Crash,
input,
state,
corpus,
manager,
);
}*/
std::process::exit(139);
// Send our current state to the next execution
mgr.on_restart(state).unwrap();
mgr.await_restart_safe();
//std::process::exit(139);
}
pub unsafe extern "C" fn libaflrs_executor_inmem_handle_timeout<C, EM, FT, I, OT, R>(
@ -282,41 +297,31 @@ pub mod unix_signals {
return;
}
/*let input = (CURRENT_INPUT_PTR as *const I).as_ref().unwrap();
let state = (EVENT_MGR_PTR as *const State<I, R, FT>).as_ref().unwrap();
let manager = (EVENT_MGR_PTR as *mut EM).as_mut().unwrap();
if !on_crash_fn_ptr.is_null() {
(*(on_crash_fn_ptr as *mut Box<OnCrashFunction<I, C, EM, FT, R>>))(
ExitKind::Timeout,
input,
state,
corpus,
manager,
);
}*/
/* TODO: If we want to be on the safe side, we really need to do this:
match manager.llmp {
IsClient { client } => {
let map = client.out_maps.last().unwrap();
/// wait until we can drop the message safely.
map.await_save_to_unmap_blocking();
/// Make sure all pages are unmapped.
drop(manager);
}
_ => (),
}
*/
println!("Timeout in fuzz run.");
let _ = stdout().flush();
process::abort();
let input = (CURRENT_INPUT_PTR as *const I).as_ref().unwrap();
// Make sure we don't crash in the crash handler forever.
CURRENT_INPUT_PTR = ptr::null();
let state = (STATE_PTR as *mut State<C, FT, I, R>).as_mut().unwrap();
let mgr = (EVENT_MGR_PTR as *mut EM).as_mut().unwrap();
mgr.fire(
state,
Event::Timeout {
input: input.to_owned(),
},
)
.expect(&format!("Could not send timeouting input {:?}", input));
// Send our current state to the next execution
mgr.on_restart(state).unwrap();
mgr.await_restart_safe();
//process::abort();
}
#[inline]
pub unsafe fn set_oncrash_ptrs<C, EM, FT, I, OT, R>(
state: &State<C, I, R, FT>,
state: &mut State<C, FT, I, R>,
event_mgr: &mut EM,
input: &I,
) where
@ -327,15 +332,17 @@ pub mod unix_signals {
I: Input,
R: Rand,
{
println!("Setting oncrash");
CURRENT_INPUT_PTR = input as *const _ as *const c_void;
STATE_PTR = state as *const _ as *const c_void;
STATE_PTR = state as *mut _ as *mut c_void;
EVENT_MGR_PTR = event_mgr as *mut _ as *mut c_void;
}
#[inline]
pub unsafe fn reset_oncrash_ptrs<C, EM, FT, I, OT, R>() {
println!("Resetting oncrash");
CURRENT_INPUT_PTR = ptr::null();
STATE_PTR = ptr::null();
STATE_PTR = ptr::null_mut();
EVENT_MGR_PTR = ptr::null_mut();
}

View File

@ -82,9 +82,9 @@ where
{
#[inline]
/// Called right before exexution starts
fn pre_exec<R, FT, C, EM>(
fn pre_exec<C, EM, FT, R>(
&mut self,
_state: &State<C, I, R, FT>,
_state: &mut State<C, FT, I, R>,
_event_mgr: &mut EM,
_input: &I,
) -> Result<(), AflError>
@ -99,9 +99,9 @@ where
#[inline]
/// Called right after execution finished.
fn post_exec<R, FT, C, EM>(
fn post_exec<C, EM, FT, R>(
&mut self,
_state: &State<C, I, R, FT>,
_state: &State<C, FT, I, R>,
_event_mgr: &mut EM,
_input: &I,
) -> Result<(), AflError>

View File

@ -61,7 +61,7 @@ where
&mut self,
rand: &mut R,
executor: &mut E,
state: &mut State<C, I, R, FT>,
state: &mut State<C, FT, I, R>,
manager: &mut EM,
) -> Result<usize, AflError> {
let (_, idx) = state.corpus_mut().next(rand)?;
@ -77,7 +77,7 @@ where
&mut self,
rand: &mut R,
executor: &mut E,
state: &mut State<C, I, R, FT>,
state: &mut State<C, FT, I, R>,
manager: &mut EM,
) -> Result<(), AflError> {
let mut last = current_milliseconds();

View File

@ -31,7 +31,7 @@ where
&mut self,
rand: &mut R,
executor: &mut E,
state: &mut State<C, I, R, FT>,
state: &mut State<C, FT, I, R>,
manager: &mut EM,
corpus_idx: usize,
) -> Result<(), AflError>;
@ -51,7 +51,7 @@ where
&mut self,
rand: &mut R,
executor: &mut E,
state: &mut State<C, I, R, FT>,
state: &mut State<C, FT, I, R>,
manager: &mut EM,
corpus_idx: usize,
) -> Result<(), AflError>;
@ -73,7 +73,7 @@ where
&mut self,
_rand: &mut R,
_executor: &mut E,
_state: &mut State<C, I, R, FT>,
_state: &mut State<C, FT, I, R>,
_manager: &mut EM,
_corpus_idx: usize,
) -> Result<(), AflError> {
@ -99,7 +99,7 @@ where
&mut self,
rand: &mut R,
executor: &mut E,
state: &mut State<C, I, R, FT>,
state: &mut State<C, FT, I, R>,
manager: &mut EM,
corpus_idx: usize,
) -> Result<(), AflError> {

View File

@ -21,7 +21,7 @@ use crate::{
/// being applied to the input one by one, between executions.
pub trait MutationalStage<C, E, EM, FT, I, M, OT, R>: Stage<C, E, EM, FT, I, OT, R>
where
M: Mutator<C, I, R, State<C, I, R, FT>>,
M: Mutator<C, I, R, State<C, FT, I, R>>,
EM: EventManager<I>,
E: Executor<I> + HasObservers<OT>,
OT: ObserversTuple,
@ -48,7 +48,7 @@ where
&mut self,
rand: &mut R,
executor: &mut E,
state: &mut State<C, I, R, FT>,
state: &mut State<C, FT, I, R>,
manager: &mut EM,
corpus_idx: usize,
) -> Result<(), AflError> {
@ -107,7 +107,7 @@ where
EM: EventManager<I>,
FT: FeedbacksTuple<I>,
I: Input,
M: Mutator<C, I, R, State<C, I, R, FT>>,
M: Mutator<C, I, R, State<C, FT, I, R>>,
OT: ObserversTuple,
R: Rand,
{
@ -123,7 +123,7 @@ where
EM: EventManager<I>,
FT: FeedbacksTuple<I>,
I: Input,
M: Mutator<C, I, R, State<C, I, R, FT>>,
M: Mutator<C, I, R, State<C, FT, I, R>>,
OT: ObserversTuple,
R: Rand,
{
@ -143,7 +143,7 @@ where
impl<C, E, EM, FT, I, M, OT, R> Stage<C, E, EM, FT, I, OT, R>
for StdMutationalStage<C, E, EM, FT, I, M, OT, R>
where
M: Mutator<C, I, R, State<C, I, R, FT>>,
M: Mutator<C, I, R, State<C, FT, I, R>>,
EM: EventManager<I>,
E: Executor<I> + HasObservers<OT>,
OT: ObserversTuple,
@ -157,7 +157,7 @@ where
&mut self,
rand: &mut R,
executor: &mut E,
state: &mut State<C, I, R, FT>,
state: &mut State<C, FT, I, R>,
manager: &mut EM,
corpus_idx: usize,
) -> Result<(), AflError> {
@ -167,7 +167,7 @@ where
impl<C, E, EM, FT, I, M, OT, R> StdMutationalStage<C, E, EM, FT, I, M, OT, R>
where
M: Mutator<C, I, R, State<C, I, R, FT>>,
M: Mutator<C, I, R, State<C, FT, I, R>>,
EM: EventManager<I>,
E: Executor<I> + HasObservers<OT>,
OT: ObserversTuple,

View File

@ -52,7 +52,7 @@ pub trait HasMetadata {
/// The state a fuzz run.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(bound = "FT: serde::de::DeserializeOwned")]
pub struct State<C, I, R, FT>
pub struct State<C, FT, I, R>
where
C: Corpus<I, R>,
I: Input,
@ -74,7 +74,7 @@ where
}
#[cfg(feature = "std")]
impl<C, R, FT> State<C, BytesInput, R, FT>
impl<C, FT, R> State<C, FT, BytesInput, R>
where
C: Corpus<BytesInput, R>,
R: Rand,
@ -147,7 +147,7 @@ where
}
}
impl<C, I, R, FT> HasCorpus<C> for State<C, I, R, FT>
impl<C, FT, I, R> HasCorpus<C> for State<C, FT, I, R>
where
C: Corpus<I, R>,
I: Input,
@ -166,7 +166,7 @@ where
}
/// Trait for elements offering metadata
impl<C, I, R, FT> HasMetadata for State<C, I, R, FT>
impl<C, FT, I, R> HasMetadata for State<C, FT, I, R>
where
C: Corpus<I, R>,
I: Input,
@ -186,7 +186,7 @@ where
}
}
impl<C, I, R, FT> State<C, I, R, FT>
impl<C, FT, I, R> State<C, FT, I, R>
where
C: Corpus<I, R>,
I: Input,
@ -264,7 +264,7 @@ where
{
executor.pre_exec_observers()?;
executor.pre_exec(&self, event_mgr, input)?;
executor.pre_exec(self, event_mgr, input)?;
executor.run_target(input)?;
executor.post_exec(&self, event_mgr, input)?;

View File

@ -25,7 +25,7 @@ pub type StdRand = RomuTrioRand;
/// On top, add the current llmp event manager instance to be restored
/// This method is needed when the fuzzer run crashes and has to restart.
pub fn serialize_state_mgr<C, FT, I, R, SH, ST>(
state: &State<C, I, R, FT>,
state: &State<C, FT, I, R>,
mgr: &LlmpEventManager<I, SH, ST>,
) -> Result<Vec<u8>, AflError>
where
@ -56,7 +56,7 @@ where
/// Deserialize the state and corpus tuple, previously serialized with `serialize_state_corpus(...)`
pub fn deserialize_state_mgr<C, FT, I, R, SH, ST>(
state_corpus_serialized: &[u8],
) -> Result<(State<C, I, R, FT>, LlmpEventManager<I, SH, ST>), AflError>
) -> Result<(State<C, FT, I, R>, LlmpEventManager<I, SH, ST>), AflError>
where
C: Corpus<I, R>,
FT: FeedbacksTuple<I>,
@ -76,7 +76,7 @@ where
/// Serialize the current state and corpus during an executiont to bytes.
/// This method is needed when the fuzzer run crashes and has to restart.
pub fn serialize_state_corpus<C, FT, I, R>(
state: &State<C, I, R, FT>,
state: &State<C, FT, I, R>,
corpus: &C,
) -> Result<Vec<u8>, AflError>
where
@ -93,7 +93,7 @@ where
/// Deserialize the state and corpus tuple, previously serialized with `serialize_state_corpus(...)`
pub fn deserialize_state_corpus<C, FT, I, R>(
state_corpus_serialized: &[u8],
) -> Result<(State<C, I, R, FT>, C), AflError>
) -> Result<(State<C, FT, I, R>, C), AflError>
where
C: Corpus<I, R>,
FT: FeedbacksTuple<I>,

View File

@ -5,7 +5,7 @@
extern crate clap;
use clap::{App, Arg};
use std::{env, path::PathBuf, process::Command};
use std::{env, path::PathBuf};
use afl::{
corpus::{Corpus, InMemoryCorpus},
@ -13,11 +13,9 @@ use afl::{
events::{LlmpEventManager, SimpleStats},
executors::{inprocess::InProcessExecutor, Executor, ExitKind},
feedbacks::MaxMapFeedback,
generators::RandPrintablesGenerator,
inputs::{BytesInput, Input},
inputs::Input,
mutators::{scheduled::HavocBytesMutator, HasMaxSize},
observers::StdMapObserver,
shmem::{AflShmem, ShMem},
stages::mutational::StdMutationalStage,
state::{HasCorpus, State},
tuples::tuple_list,
@ -119,7 +117,7 @@ pub fn main() {
/// The actual fuzzer
fn fuzz(input: Option<Vec<PathBuf>>, broker_port: u16) -> Result<(), AflError> {
let mut rand = StdRand::new(0);
/// TODO: Don't the stats need to be serialized, too?
// 'While the stats are state, they are usually used in the broker - which is likely never restarted
let stats = SimpleStats::new(|s| println!("{}", s));
let mut mgr = LlmpEventManager::new_on_port_std(stats, broker_port)?;