restarting forever :)

This commit is contained in:
Dominik Maier 2021-02-11 21:17:19 +01:00
parent 9a5b8f0baa
commit 2a937eb88d
3 changed files with 48 additions and 25 deletions

View File

@ -431,7 +431,7 @@ where
pub fn new(id: u32, keep_pages_forever: bool) -> Result<Self, AflError> { pub fn new(id: u32, keep_pages_forever: bool) -> Result<Self, AflError> {
Ok(Self { Ok(Self {
id, id,
last_msg_sent: 0 as *mut LlmpMsg, last_msg_sent: ptr::null_mut(),
out_maps: vec![LlmpSharedMap::new( out_maps: vec![LlmpSharedMap::new(
0, 0,
SH::new_map(new_map_size(LLMP_PREF_INITIAL_MAP_SIZE))?, SH::new_map(new_map_size(LLMP_PREF_INITIAL_MAP_SIZE))?,
@ -444,8 +444,9 @@ where
/// Completely reset the current sender map. /// Completely reset the current sender map.
/// Afterwards, no receiver should read from it at a different location. /// Afterwards, no receiver should read from it at a different location.
/// This is only useful if all connected llmp parties start over, for example after a crash. /// This is only useful if all connected llmp parties start over, for example after a crash.
pub unsafe fn reset_last_page(&mut self) { pub unsafe fn reset(&mut self) {
_llmp_page_init(&mut self.out_maps.last_mut().unwrap().shmem, self.id, true); _llmp_page_init(&mut self.out_maps.last_mut().unwrap().shmem, self.id, true);
self.last_msg_sent = ptr::null_mut();
} }
/// Reattach to a vacant out_map, to which a previous sender stored the information in an env before. /// Reattach to a vacant out_map, to which a previous sender stored the information in an env before.
@ -494,7 +495,7 @@ where
let mut out_map = LlmpSharedMap::existing(current_out_map); let mut out_map = LlmpSharedMap::existing(current_out_map);
let last_msg_sent = match last_msg_sent_offset { let last_msg_sent = match last_msg_sent_offset {
Some(offset) => out_map.msg_from_offset(offset)?, Some(offset) => out_map.msg_from_offset(offset)?,
None => 0 as *mut LlmpMsg, None => ptr::null_mut(),
}; };
Ok(Self { Ok(Self {
@ -694,7 +695,7 @@ where
(*end_of_page_msg).shm_str = *new_map_shmem.shmem.shm_slice(); (*end_of_page_msg).shm_str = *new_map_shmem.shmem.shm_slice();
// We never sent a msg on the new buf // We never sent a msg on the new buf
self.last_msg_sent = 0 as *mut LlmpMsg; self.last_msg_sent = ptr::null_mut();
/* Send the last msg on the old buf */ /* Send the last msg on the old buf */
self.send(out)?; self.send(out)?;
@ -827,7 +828,7 @@ where
let mut current_recv_map = LlmpSharedMap::existing(current_sender_map); let mut current_recv_map = LlmpSharedMap::existing(current_sender_map);
let last_msg_recvd = match last_msg_recvd_offset { let last_msg_recvd = match last_msg_recvd_offset {
Some(offset) => current_recv_map.msg_from_offset(offset)?, Some(offset) => current_recv_map.msg_from_offset(offset)?,
None => 0 as *mut LlmpMsg, None => ptr::null_mut(),
}; };
Ok(Self { Ok(Self {
@ -1074,7 +1075,7 @@ where
pub fn msg_from_env(&mut self, map_env_name: &str) -> Result<*mut LlmpMsg, AflError> { pub fn msg_from_env(&mut self, map_env_name: &str) -> Result<*mut LlmpMsg, AflError> {
match msg_offset_from_env(map_env_name)? { match msg_offset_from_env(map_env_name)? {
Some(offset) => self.msg_from_offset(offset), Some(offset) => self.msg_from_offset(offset),
None => Ok(0 as *mut LlmpMsg), None => Ok(ptr::null_mut()),
} }
} }
@ -1166,7 +1167,7 @@ where
self.llmp_clients.push(LlmpReceiver { self.llmp_clients.push(LlmpReceiver {
id, id,
current_recv_map: client_page, current_recv_map: client_page,
last_msg_recvd: 0 as *mut LlmpMsg, last_msg_recvd: ptr::null_mut(),
}); });
} }
@ -1279,7 +1280,7 @@ where
Ok(thread::spawn(move || { Ok(thread::spawn(move || {
let mut new_client_sender = LlmpSender { let mut new_client_sender = LlmpSender {
id: 0, id: 0,
last_msg_sent: 0 as *mut LlmpMsg, last_msg_sent: ptr::null_mut(),
out_maps: vec![LlmpSharedMap::existing( out_maps: vec![LlmpSharedMap::existing(
SH::existing_from_shm_str(&tcp_out_map_str, tcp_out_map_size).unwrap(), SH::existing_from_shm_str(&tcp_out_map_str, tcp_out_map_size).unwrap(),
)], )],
@ -1380,7 +1381,7 @@ where
self.llmp_clients.push(LlmpReceiver { self.llmp_clients.push(LlmpReceiver {
id, id,
current_recv_map: new_page, current_recv_map: new_page,
last_msg_recvd: 0 as *mut LlmpMsg, last_msg_recvd: ptr::null_mut(),
}); });
} }
Err(e) => { Err(e) => {
@ -1505,7 +1506,7 @@ where
Ok(Self { Ok(Self {
sender: LlmpSender { sender: LlmpSender {
id: 0, id: 0,
last_msg_sent: 0 as *mut LlmpMsg, last_msg_sent: ptr::null_mut(),
out_maps: vec![LlmpSharedMap::new( out_maps: vec![LlmpSharedMap::new(
0, 0,
SH::new_map(new_map_size(LLMP_PREF_INITIAL_MAP_SIZE))?, SH::new_map(new_map_size(LLMP_PREF_INITIAL_MAP_SIZE))?,
@ -1517,7 +1518,7 @@ where
receiver: LlmpReceiver { receiver: LlmpReceiver {
id: 0, id: 0,
current_recv_map: initial_broker_map, current_recv_map: initial_broker_map,
last_msg_recvd: 0 as *mut LlmpMsg, last_msg_recvd: ptr::null_mut(),
}, },
}) })
} }

View File

@ -199,6 +199,17 @@ where
Ok(postcard::from_bytes(observers_buf)?) Ok(postcard::from_bytes(observers_buf)?)
} }
/// For restarting event managers, implement a way to forward state to their next peers.
#[inline]
fn on_restart<C, FT, R>(&mut self, _state: &mut State<C, FT, I, R>) -> Result<(), AflError>
where
C: Corpus<I, R>,
FT: FeedbacksTuple<I>,
R: Rand,
{
Ok(())
}
/// Block until we are safe to exit. /// Block until we are safe to exit.
#[inline] #[inline]
fn await_restart_safe(&mut self) {} fn await_restart_safe(&mut self) {}
@ -766,6 +777,20 @@ where
self.llmp_mgr.await_restart_safe(); self.llmp_mgr.await_restart_safe();
} }
/// Reset the single page (we reuse it over and over from pos 0), then send the current state to the next runner.
fn on_restart<C, FT, R>(&mut self, state: &mut State<C, FT, I, R>) -> Result<(), AflError>
where
C: Corpus<I, R>,
FT: FeedbacksTuple<I>,
R: Rand,
{
// First, reset the page to 0 so the next iteration can read from the beginning of this page
unsafe { self.sender.reset() };
let state_corpus_serialized = serialize_state_mgr(state, &self.llmp_mgr)?;
self.sender
.send_buf(_LLMP_TAG_RESTART, &state_corpus_serialized)
}
fn process<C, FT, R>(&mut self, state: &mut State<C, FT, I, R>) -> Result<usize, AflError> fn process<C, FT, R>(&mut self, state: &mut State<C, FT, I, R>) -> Result<usize, AflError>
where where
C: Corpus<I, R>, C: Corpus<I, R>,
@ -786,15 +811,6 @@ where
R: Rand, R: Rand,
{ {
// Check if we are going to crash in the event, in which case we store our current state for the next runner // Check if we are going to crash in the event, in which case we store our current state for the next runner
match &event {
Event::Crash { input: _ } | Event::Timeout { input: _ } => {
// First, reset the page to 0 so the next iteration can read from the beginning of this page
unsafe { self.sender.reset_last_page() };
let buf = serialize_state_mgr(&state, &self.llmp_mgr)?;
self.sender.send_buf(_LLMP_TAG_RESTART, &buf).unwrap();
}
_ => (),
};
self.llmp_mgr.fire(state, event) self.llmp_mgr.fire(state, event)
} }
} }
@ -874,15 +890,12 @@ where
let mut ctr = 0; let mut ctr = 0;
// Client->parent loop // Client->parent loop
loop { loop {
dbg!("Spawning next client"); dbg!("Spawning next client (id {})", ctr);
Command::new(env::current_exe()?) Command::new(env::current_exe()?)
.current_dir(env::current_dir()?) .current_dir(env::current_dir()?)
.args(env::args()) .args(env::args())
.status()?; .status()?;
ctr += 1; ctr += 1;
if ctr == 10 {
todo!("Fix this");
}
} }
} }
} }
@ -915,7 +928,7 @@ where
} }
}; };
// We reset the sender, the next sender and receiver (after crash) will reuse the page from the initial message. // We reset the sender, the next sender and receiver (after crash) will reuse the page from the initial message.
unsafe { mgr.sender_mut().reset_last_page() }; unsafe { mgr.sender_mut().reset() };
/* TODO: Not sure if this is needed /* TODO: Not sure if this is needed
// We commit an empty NO_RESTART message to this buf, against infinite loops, // We commit an empty NO_RESTART message to this buf, against infinite loops,
// in case something crashes in the fuzzer. // in case something crashes in the fuzzer.

View File

@ -273,7 +273,13 @@ pub mod unix_signals {
) )
.expect(&format!("Could not send crashing input {:?}", input)); .expect(&format!("Could not send crashing input {:?}", input));
mgr.on_restart(state).unwrap();
println!("Waiting for broker...");
mgr.await_restart_safe(); mgr.await_restart_safe();
println!("Bye!");
std::process::exit(1);
} }
pub unsafe extern "C" fn libaflrs_executor_inmem_handle_timeout<C, EM, FT, I, OT, R>( pub unsafe extern "C" fn libaflrs_executor_inmem_handle_timeout<C, EM, FT, I, OT, R>(
@ -302,6 +308,7 @@ pub mod unix_signals {
CURRENT_INPUT_PTR = ptr::null(); CURRENT_INPUT_PTR = ptr::null();
let state = (STATE_PTR as *mut State<C, FT, I, R>).as_mut().unwrap(); let state = (STATE_PTR as *mut State<C, FT, I, R>).as_mut().unwrap();
let mgr = (EVENT_MGR_PTR as *mut EM).as_mut().unwrap(); let mgr = (EVENT_MGR_PTR as *mut EM).as_mut().unwrap();
mgr.fire( mgr.fire(
state, state,
Event::Timeout { Event::Timeout {
@ -310,6 +317,8 @@ pub mod unix_signals {
) )
.expect(&format!("Could not send timeouting input {:?}", input)); .expect(&format!("Could not send timeouting input {:?}", input));
mgr.on_restart(state).unwrap();
mgr.await_restart_safe(); mgr.await_restart_safe();
std::process::exit(1); std::process::exit(1);