rewrote llmp broker to be more rusty

This commit is contained in:
Dominik Maier 2020-11-27 12:49:41 +01:00
parent e539012660
commit b7c49db3b1
3 changed files with 443 additions and 414 deletions

View File

@ -131,16 +131,14 @@ fn main() {
llmp_broker_init(&mut broker).expect("Could not init"); llmp_broker_init(&mut broker).expect("Could not init");
for i in 0..counter_thread_count { for i in 0..counter_thread_count {
println!("Adding client {}", i); println!("Adding client {}", i);
llmp_broker_register_childprocess_clientloop( broker.register_childprocess_clientloop(
&mut broker,
llmp_test_clientloop, llmp_test_clientloop,
ptr::null_mut(), ptr::null_mut(),
) )
.expect("could not add child clientloop"); .expect("could not add child clientloop");
} }
llmp_broker_register_childprocess_clientloop( broker.register_childprocess_clientloop(
&mut broker,
test_adder_clientloop, test_adder_clientloop,
ptr::null_mut(), ptr::null_mut(),
) )
@ -148,8 +146,8 @@ fn main() {
println!("Spawning broker"); println!("Spawning broker");
llmp_broker_add_message_hook(&mut broker, broker_message_hook, ptr::null_mut()); broker.add_message_hook(broker_message_hook, ptr::null_mut());
llmp_broker_run(&mut broker); broker.run();
} }
} }

View File

@ -125,7 +125,7 @@ pub struct llmp_message {
#[derive(Clone)] #[derive(Clone)]
#[repr(C)] #[repr(C)]
pub struct llmp_broker_state { pub struct llmp_broker {
pub last_msg_sent: *mut llmp_message, pub last_msg_sent: *mut llmp_message,
pub broadcast_map_count: c_ulong, pub broadcast_map_count: c_ulong,
pub broadcast_maps: *mut afl_shmem, pub broadcast_maps: *mut afl_shmem,
@ -171,7 +171,7 @@ pub enum LlmpMessageHookResult {
} }
pub type LlmpMessageHookFn = unsafe fn( pub type LlmpMessageHookFn = unsafe fn(
_: *mut llmp_broker_state, _: *mut llmp_broker,
_: *mut llmp_broker_client_metadata, _: *mut llmp_broker_client_metadata,
_: *mut llmp_message, _: *mut llmp_message,
_: *mut c_void, _: *mut c_void,
@ -529,7 +529,7 @@ pub unsafe fn llmp_send(page: *mut llmp_page, msg: *mut llmp_message) -> Result<
#[inline] #[inline]
unsafe fn _llmp_broker_current_broadcast_map( unsafe fn _llmp_broker_current_broadcast_map(
broker_state: *mut llmp_broker_state, broker_state: *mut llmp_broker,
) -> *mut afl_shmem { ) -> *mut afl_shmem {
return &mut *(*broker_state).broadcast_maps.offset( return &mut *(*broker_state).broadcast_maps.offset(
(*broker_state) (*broker_state)
@ -626,7 +626,7 @@ unsafe fn llmp_handle_out_eop(
} }
} }
/* no more space left! We'll have to start a new page */ /* no more space left! We'll have to start a new page */
pub unsafe fn llmp_broker_handle_out_eop(broker: *mut llmp_broker_state) -> AflRet { pub unsafe fn llmp_broker_handle_out_eop(broker: *mut llmp_broker) -> AflRet {
(*broker).broadcast_maps = llmp_handle_out_eop( (*broker).broadcast_maps = llmp_handle_out_eop(
(*broker).broadcast_maps, (*broker).broadcast_maps,
&mut (*broker).broadcast_map_count, &mut (*broker).broadcast_map_count,
@ -639,7 +639,7 @@ pub unsafe fn llmp_broker_handle_out_eop(broker: *mut llmp_broker_state) -> AflR
} as AflRet; } as AflRet;
} }
pub unsafe fn llmp_broker_alloc_next( pub unsafe fn llmp_broker_alloc_next(
broker: *mut llmp_broker_state, broker: *mut llmp_broker,
len: c_ulong, len: c_ulong,
) -> *mut llmp_message { ) -> *mut llmp_message {
let mut broadcast_page: *mut llmp_page = shmem2page(_llmp_broker_current_broadcast_map(broker)); let mut broadcast_page: *mut llmp_page = shmem2page(_llmp_broker_current_broadcast_map(broker));
@ -666,27 +666,27 @@ pub unsafe fn llmp_broker_alloc_next(
} }
return out; return out;
} }
impl llmp_broker {
/* Registers a new client for the given sharedmap str and size. /* Registers a new client for the given sharedmap str and size.
Be careful: Intenral realloc may change the location of the client map */ Be careful: Intenral realloc may change the location of the client map */
unsafe fn llmp_broker_register_client( unsafe fn register_client(
broker: *mut llmp_broker_state, &mut self,
shm_str: &CStr, shm_str: &CStr,
map_size: c_ulong, map_size: c_ulong,
) -> *mut llmp_broker_client_metadata { ) -> *mut llmp_broker_client_metadata {
/* make space for a new client and calculate its id */ /* make space for a new client and calculate its id */
(*broker).llmp_clients = afl_realloc( self.llmp_clients = afl_realloc(
(*broker).llmp_clients as *mut c_void, self.llmp_clients as *mut c_void,
(*broker) self.llmp_client_count
.llmp_client_count
.wrapping_add(1 as c_ulong) .wrapping_add(1 as c_ulong)
.wrapping_mul(::std::mem::size_of::<llmp_broker_client_metadata>() as c_ulong), .wrapping_mul(::std::mem::size_of::<llmp_broker_client_metadata>() as c_ulong),
) as *mut llmp_broker_client_metadata; ) as *mut llmp_broker_client_metadata;
if (*broker).llmp_clients.is_null() { if self.llmp_clients.is_null() {
return 0 as *mut llmp_broker_client_metadata; return 0 as *mut llmp_broker_client_metadata;
} }
let mut client: *mut llmp_broker_client_metadata = &mut *(*broker) let mut client: *mut llmp_broker_client_metadata =
.llmp_clients self.llmp_clients.offset(self.llmp_client_count as isize)
.offset((*broker).llmp_client_count as isize)
as *mut llmp_broker_client_metadata; as *mut llmp_broker_client_metadata;
memset( memset(
client as *mut c_void, client as *mut c_void,
@ -700,7 +700,7 @@ unsafe fn llmp_broker_register_client(
if (*client).client_state.is_null() { if (*client).client_state.is_null() {
return 0 as *mut llmp_broker_client_metadata; return 0 as *mut llmp_broker_client_metadata;
} }
(*(*client).client_state).id = (*broker).llmp_client_count as u32; (*(*client).client_state).id = (*self).llmp_client_count as u32;
(*client).cur_client_map = (*client).cur_client_map =
calloc(1 as c_ulong, ::std::mem::size_of::<afl_shmem>() as c_ulong) as *mut afl_shmem; calloc(1 as c_ulong, ::std::mem::size_of::<afl_shmem>() as c_ulong) as *mut afl_shmem;
if (*client).cur_client_map.is_null() { if (*client).cur_client_map.is_null() {
@ -709,16 +709,29 @@ unsafe fn llmp_broker_register_client(
if afl_shmem_by_str((*client).cur_client_map, shm_str, map_size).is_null() { if afl_shmem_by_str((*client).cur_client_map, shm_str, map_size).is_null() {
return 0 as *mut llmp_broker_client_metadata; return 0 as *mut llmp_broker_client_metadata;
} }
(*broker).llmp_client_count = (*broker).llmp_client_count.wrapping_add(1); self.llmp_client_count = self.llmp_client_count.wrapping_add(1);
// tODO: Add client map // TODO: Add client map
return client; return client;
} }
/* Adds a hook that gets called in the broker for each new message the broker touches.
if the callback returns false, the message is not forwarded to the clients. */
pub unsafe fn add_message_hook(
&mut self,
hook: LlmpMessageHookFn,
data: *mut c_void,
) -> AflRet {
return llmp_add_hook_generic(
&mut self.msg_hooks,
&mut self.msg_hook_count,
::std::mem::transmute::<Option<LlmpMessageHookFn>, *mut c_void>(Some(hook)),
data,
);
}
/* broker broadcast to its own page for all others to read */ /* broker broadcast to its own page for all others to read */
#[inline] #[inline]
unsafe fn llmp_broker_handle_new_msgs( unsafe fn handle_new_msgs(&mut self, mut client: *mut llmp_broker_client_metadata) {
broker: *mut llmp_broker_state,
mut client: *mut llmp_broker_client_metadata,
) {
// TODO: We could memcpy a range of pending messages, instead of one by one. // TODO: We could memcpy a range of pending messages, instead of one by one.
/* DBG("llmp_broker_handle_new_msgs %p %p->%u\n", broker, client, client->client_state->id); */ /* DBG("llmp_broker_handle_new_msgs %p %p->%u\n", broker, client, client->client_state->id); */
let incoming: *mut llmp_page = shmem2page((*client).cur_client_map); let incoming: *mut llmp_page = shmem2page((*client).cur_client_map);
@ -735,7 +748,8 @@ unsafe fn llmp_broker_handle_new_msgs(
if (*msg).tag == 0xaf1e0f1 as c_uint { if (*msg).tag == 0xaf1e0f1 as c_uint {
let pageinfo: *mut llmp_payload_new_page = { let pageinfo: *mut llmp_payload_new_page = {
let mut _msg: *mut llmp_message = msg; let mut _msg: *mut llmp_message = msg;
(if (*_msg).buf_len >= ::std::mem::size_of::<llmp_payload_new_page>() as c_ulong { (if (*_msg).buf_len >= ::std::mem::size_of::<llmp_payload_new_page>() as c_ulong
{
(*_msg).buf.as_mut_ptr() (*_msg).buf.as_mut_ptr()
} else { } else {
0 as *mut u8 0 as *mut u8
@ -784,7 +798,8 @@ unsafe fn llmp_broker_handle_new_msgs(
add it to the list! Also, no need to forward this msg. */ add it to the list! Also, no need to forward this msg. */
let pageinfo: *mut llmp_payload_new_page = { let pageinfo: *mut llmp_payload_new_page = {
let mut _msg: *mut llmp_message = msg; let mut _msg: *mut llmp_message = msg;
(if (*_msg).buf_len >= ::std::mem::size_of::<llmp_payload_new_page>() as c_ulong { (if (*_msg).buf_len >= ::std::mem::size_of::<llmp_payload_new_page>() as c_ulong
{
(*_msg).buf.as_mut_ptr() (*_msg).buf.as_mut_ptr()
} else { } else {
0 as *mut u8 0 as *mut u8
@ -797,8 +812,8 @@ unsafe fn llmp_broker_handle_new_msgs(
} }
/* register_client may realloc the clients, we need to find ours again */ /* register_client may realloc the clients, we need to find ours again */
let client_id: u32 = (*(*client).client_state).id; let client_id: u32 = (*(*client).client_state).id;
if llmp_broker_register_client( if self
broker, .register_client(
CStr::from_bytes_with_nul(&(*pageinfo).shm_str).expect("Illegal shm_str"), CStr::from_bytes_with_nul(&(*pageinfo).shm_str).expect("Illegal shm_str"),
(*pageinfo).map_size, (*pageinfo).map_size,
) )
@ -811,20 +826,20 @@ unsafe fn llmp_broker_handle_new_msgs(
} }
(*client).client_type = LLMP_CLIENT_TYPE_FOREIGN_PROCESS; (*client).client_type = LLMP_CLIENT_TYPE_FOREIGN_PROCESS;
/* find client again */ /* find client again */
client = &mut *(*broker).llmp_clients.offset(client_id as isize) client =
as *mut llmp_broker_client_metadata self.llmp_clients.offset(client_id as isize) as *mut llmp_broker_client_metadata
} else { } else {
let mut forward_msg: bool = 1 as c_int != 0; let mut forward_msg: bool = 1 as c_int != 0;
let mut i: c_ulong = 0; let mut i: c_ulong = 0;
while i < (*broker).msg_hook_count { while i < self.msg_hook_count {
let msg_hook: *mut llmp_hookdata_generic = let msg_hook: *mut llmp_hookdata_generic =
&mut *(*broker).msg_hooks.offset(i as isize) as *mut llmp_hookdata_generic; self.msg_hooks.offset(i as isize) as *mut llmp_hookdata_generic;
forward_msg = forward_msg as c_int != 0 forward_msg = forward_msg as c_int != 0
&& ::std::mem::transmute::<*mut c_void, Option<LlmpMessageHookFn>>( && ::std::mem::transmute::<*mut c_void, Option<LlmpMessageHookFn>>(
(*msg_hook).func, (*msg_hook).func,
) )
.expect("non-null function pointer")( .expect("non-null function pointer")(
broker, client, msg, (*msg_hook).data self, client, msg, (*msg_hook).data
) as c_int ) as c_int
!= 0; != 0;
if !llmp_msg_in_page(shmem2page((*client).cur_client_map), msg) { if !llmp_msg_in_page(shmem2page((*client).cur_client_map), msg) {
@ -835,12 +850,12 @@ unsafe fn llmp_broker_handle_new_msgs(
} }
if forward_msg { if forward_msg {
let mut out: *mut llmp_message = let mut out: *mut llmp_message =
llmp_broker_alloc_next(broker, (*msg).buf_len_padded); llmp_broker_alloc_next(self, (*msg).buf_len_padded);
if out.is_null() { if out.is_null() {
panic!(format!( panic!(format!(
"Error allocating {} bytes in shmap {:?}", "Error allocating {} bytes in shmap {:?}",
(*msg).buf_len_padded, (*msg).buf_len_padded,
(*_llmp_broker_current_broadcast_map(broker)) (*_llmp_broker_current_broadcast_map(self))
.shm_str .shm_str
.as_mut_ptr(), .as_mut_ptr(),
)); ));
@ -858,43 +873,191 @@ unsafe fn llmp_broker_handle_new_msgs(
(*out).buf_len_padded = actual_size; (*out).buf_len_padded = actual_size;
/* We need to replace the message ID with our own */ /* We need to replace the message ID with our own */
let out_page: *mut llmp_page = let out_page: *mut llmp_page =
shmem2page(_llmp_broker_current_broadcast_map(broker)); shmem2page(_llmp_broker_current_broadcast_map(self));
(*out).message_id = (*out_page).current_msg_id.wrapping_add(1 as c_ulong) as u32; (*out).message_id =
(*out_page).current_msg_id.wrapping_add(1 as c_ulong) as u32;
match llmp_send(out_page, out) { match llmp_send(out_page, out) {
Err(e) => panic!(format!("Error sending msg: {:?}", e)), Err(e) => panic!(format!("Error sending msg: {:?}", e)),
_ => (), _ => (),
}; };
(*broker).last_msg_sent = out self.last_msg_sent = out
} }
} }
(*client).last_msg_broker_read = msg; (*client).last_msg_broker_read = msg;
current_message_id = (*msg).message_id current_message_id = (*msg).message_id
} }
} }
/* The broker walks all pages and looks for changes, then broadcasts them on /* The broker walks all pages and looks for changes, then broadcasts them on
* its own shared page, once. */ * its own shared page, once. */
/* The broker walks all pages and looks for changes, then broadcasts them on /* The broker walks all pages and looks for changes, then broadcasts them on
* its own shared page, once. */ * its own shared page, once. */
pub unsafe fn llmp_broker_once(broker: *mut llmp_broker_state) { pub unsafe fn once(&mut self) {
compiler_fence(Ordering::SeqCst); compiler_fence(Ordering::SeqCst);
let mut i: u32 = 0; let mut i: u32 = 0;
while (i as c_ulong) < (*broker).llmp_client_count { while (i as c_ulong) < self.llmp_client_count {
let client: *mut llmp_broker_client_metadata = let client: *mut llmp_broker_client_metadata =
&mut *(*broker).llmp_clients.offset(i as isize) as *mut llmp_broker_client_metadata; self.llmp_clients.offset(i as isize) as *mut llmp_broker_client_metadata;
llmp_broker_handle_new_msgs(broker, client); self.handle_new_msgs(client);
i = i.wrapping_add(1) i = i.wrapping_add(1)
} }
} }
/* The broker walks all pages and looks for changes, then broadcasts them on /* The broker walks all pages and looks for changes, then broadcasts them on
* its own shared page */ * its own shared page */
pub unsafe fn llmp_broker_loop(broker: *mut llmp_broker_state) -> ! { pub unsafe fn broker_loop(&mut self) -> ! {
loop { loop {
compiler_fence(Ordering::SeqCst); compiler_fence(Ordering::SeqCst);
llmp_broker_once(broker); self.once();
/* 5 milis of sleep for now to not busywait at 100% */ /* 5 milis of sleep for now to not busywait at 100% */
usleep((5 as c_int * 1000 as c_int) as c_uint); usleep((5 as c_int * 1000 as c_int) as c_uint);
} }
} }
/* launch a specific client. This function is rarely needed - all registered clients will get launched at broker_run */
pub unsafe fn launch_client(
&mut self,
mut clientdata: *mut llmp_broker_client_metadata,
) -> bool {
if clientdata < self.llmp_clients
|| clientdata
> self
.llmp_clients
.offset(self.llmp_client_count.wrapping_sub(1 as c_ulong) as isize)
as *mut llmp_broker_client_metadata
{
println!(
"[!] WARNING: Illegal client specified at ptr {:?} (instead of {:?} to {:?})",
clientdata,
self.llmp_clients,
self.llmp_clients
.offset(self.llmp_client_count.wrapping_sub(1 as c_ulong) as isize,)
as *mut llmp_broker_client_metadata,
);
return 0 as c_int != 0;
}
if (*clientdata).client_type as c_uint == LLMP_CLIENT_TYPE_CHILD_PROCESS as c_int as c_uint
{
if (*clientdata).pid != 0 {
println!("[!] WARNING: Tried to relaunch already running client. Set ->pid to 0 if this is what you want.");
return 0 as c_int != 0;
}
let child_id: c_int = fork();
if child_id < 0 as c_int {
println!("[!] WARNING: Could not fork");
return 0 as c_int != 0;
} else {
if child_id == 0 as c_int {
/* child */
_llmp_client_wrapped_loop(clientdata as *mut c_void);
}
}
/* parent */
(*clientdata).pid = child_id;
return 1 as c_int != 0;
} else {
println!("[!] WARNING: Tried to spawn llmp child with unknown thread type.");
return 0 as c_int != 0;
}
//return 1 as c_int != 0;
}
pub unsafe fn launch_clientloops(&mut self) -> Result<(), AflError> {
let mut i: c_ulong = 0;
while i < self.llmp_client_count {
if (*self.llmp_clients.offset(i as isize)).client_type as c_uint
== LLMP_CLIENT_TYPE_CHILD_PROCESS as c_uint
{
if !self.launch_client(self.llmp_clients.offset(i as isize)) {
println!("[!] WARNING: Could not launch all clients");
return Err(AflError::Unknown("Failed to launch clients".into()));
}
}
i = i.wrapping_add(1)
}
Ok(())
}
/* The broker walks all pages and looks for changes, then broadcasts them on
its own shared page.
Never returns. */
/* Start all threads and the main broker.
Same as llmp_broker_launch_threaded clients();
Never returns. */
/* Start all threads and the main broker. Never returns. */
pub unsafe fn run(&mut self) -> ! {
self.launch_clientloops().expect("Failed to launch clients");
self.broker_loop();
}
/* Register a new forked/child client.
Client thread will be called with llmp_client client, containing
the data in ->data. This will register a client to be spawned up as soon as
broker_loop() starts. Clients can also be added later via
llmp_broker_register_remote(..) or the local_tcp_client
*/
pub unsafe fn register_childprocess_clientloop(
&mut self,
clientloop: LlmpClientloopFn,
data: *mut c_void,
) -> Result<(), AflError> {
let mut client_map: afl_shmem = {
let init = afl_shmem {
shm_str: [0; 20],
shm_id: 0,
map: 0 as *mut u8,
map_size: 0,
};
init
};
if llmp_new_page_shmem(
&mut client_map,
self.llmp_client_count,
((1 as c_int) << 28 as c_int) as c_ulong,
)
.is_null()
{
return Err(AflError::Unknown("Alloc".into()));
}
let mut client: *mut llmp_broker_client_metadata = self.register_client(
CStr::from_ptr(&client_map.shm_str as *const i8),
client_map.map_size,
);
if client.is_null() {
afl_shmem_deinit(&mut client_map);
return Err(AflError::Unknown("Something in clients failed".into()));
}
(*client).clientloop = Some(clientloop);
(*client).data = data;
(*client).client_type = LLMP_CLIENT_TYPE_CHILD_PROCESS;
/* Copy the already allocated shmem to the client state */
(*(*client).client_state).out_maps = afl_realloc(
(*(*client).client_state).out_maps as *mut c_void,
::std::mem::size_of::<afl_shmem>() as c_ulong,
) as *mut afl_shmem;
if (*(*client).client_state).out_maps.is_null() {
afl_shmem_deinit(&mut client_map);
afl_shmem_deinit((*client).cur_client_map);
/* "Unregister" by subtracting the client from count */
self.llmp_client_count = self.llmp_client_count.wrapping_sub(1);
return Err(AflError::Unknown("Something in clients failed".into()));
}
memcpy(
(*(*client).client_state).out_maps as *mut c_void,
&mut client_map as *mut afl_shmem as *const c_void,
::std::mem::size_of::<afl_shmem>() as c_ulong,
);
(*(*client).client_state).out_map_count = 1 as c_ulong;
/* Each client starts with the very first map.
They should then iterate through all maps once and work on all old messages.
*/
(*(*client).client_state).current_broadcast_map =
self.broadcast_maps.offset(0 as isize) as *mut afl_shmem;
(*(*client).client_state).out_map_count = 1 as c_ulong;
return Ok(());
}
}
/* A new page will be used. Notify each registered hook in the client about this fact. */ /* A new page will be used. Notify each registered hook in the client about this fact. */
unsafe fn llmp_clientrigger_new_out_page_hooks(client: *mut llmp_client) { unsafe fn llmp_clientrigger_new_out_page_hooks(client: *mut llmp_client) {
let mut i: c_ulong = 0; let mut i: c_ulong = 0;
@ -927,84 +1090,6 @@ unsafe fn _llmp_client_wrapped_loop(llmp_client_broker_metadata_ptr: *mut c_void
); );
} }
/* launch a specific client. This function is rarely needed - all registered clients will get launched at broker_run */
pub unsafe fn llmp_broker_launch_client(
broker: *mut llmp_broker_state,
mut clientdata: *mut llmp_broker_client_metadata,
) -> bool {
if clientdata < (*broker).llmp_clients
|| clientdata
> &mut *(*broker)
.llmp_clients
.offset((*broker).llmp_client_count.wrapping_sub(1 as c_ulong) as isize)
as *mut llmp_broker_client_metadata
{
println!(
"[!] WARNING: Illegal client specified at ptr {:?} (instead of {:?} to {:?})",
clientdata,
(*broker).llmp_clients,
&mut *(*broker)
.llmp_clients
.offset((*broker).llmp_client_count.wrapping_sub(1 as c_ulong) as isize,)
as *mut llmp_broker_client_metadata,
);
return 0 as c_int != 0;
}
if (*clientdata).client_type as c_uint == LLMP_CLIENT_TYPE_CHILD_PROCESS as c_int as c_uint {
if (*clientdata).pid != 0 {
println!("[!] WARNING: Tried to relaunch already running client. Set ->pid to 0 if this is what you want.");
return 0 as c_int != 0;
}
let child_id: c_int = fork();
if child_id < 0 as c_int {
println!("[!] WARNING: Could not fork");
return 0 as c_int != 0;
} else {
if child_id == 0 as c_int {
/* child */
_llmp_client_wrapped_loop(clientdata as *mut c_void);
}
}
/* parent */
(*clientdata).pid = child_id;
return 1 as c_int != 0;
} else {
println!("[!] WARNING: Tried to spawn llmp child with unknown thread type.");
return 0 as c_int != 0;
}
//return 1 as c_int != 0;
}
pub unsafe fn llmp_broker_launch_clientloops(
broker: *mut llmp_broker_state,
) -> Result<(), AflError> {
let mut i: c_ulong = 0;
while i < (*broker).llmp_client_count {
if (*(*broker).llmp_clients.offset(i as isize)).client_type as c_uint
== LLMP_CLIENT_TYPE_CHILD_PROCESS as c_uint
{
if !llmp_broker_launch_client(broker, &mut *(*broker).llmp_clients.offset(i as isize)) {
println!("[!] WARNING: Could not launch all clients");
return Err(AflError::Unknown("Failed to launch clients".into()));
}
}
i = i.wrapping_add(1)
}
Ok(())
}
/* The broker walks all pages and looks for changes, then broadcasts them on
its own shared page.
Never returns. */
/* Start all threads and the main broker.
Same as llmp_broker_launch_threaded clients();
Never returns. */
/* Start all threads and the main broker. Never returns. */
pub unsafe fn llmp_broker_run(broker: *mut llmp_broker_state) -> ! {
llmp_broker_launch_clientloops(broker).expect("Failed to launch clients");
llmp_broker_loop(broker);
}
/* /*
For non zero-copy, we want to get rid of old pages with duplicate messages For non zero-copy, we want to get rid of old pages with duplicate messages
eventually. This function This funtion sees if we can unallocate older pages. eventually. This function This funtion sees if we can unallocate older pages.
@ -1293,74 +1378,6 @@ impl Drop for llmp_client {
} }
} }
/* Register a new forked/child client.
Client thread will be called with llmp_client client, containing
the data in ->data. This will register a client to be spawned up as soon as
broker_loop() starts. Clients can also be added later via
llmp_broker_register_remote(..) or the local_tcp_client
*/
pub unsafe fn llmp_broker_register_childprocess_clientloop(
broker: *mut llmp_broker_state,
clientloop: LlmpClientloopFn,
data: *mut c_void,
) -> Result<(), AflError> {
let mut client_map: afl_shmem = {
let init = afl_shmem {
shm_str: [0; 20],
shm_id: 0,
map: 0 as *mut u8,
map_size: 0,
};
init
};
if llmp_new_page_shmem(
&mut client_map,
(*broker).llmp_client_count,
((1 as c_int) << 28 as c_int) as c_ulong,
)
.is_null()
{
return Err(AflError::Unknown("Alloc".into()));
}
let mut client: *mut llmp_broker_client_metadata = llmp_broker_register_client(
broker,
CStr::from_ptr(&client_map.shm_str as *const i8),
client_map.map_size,
);
if client.is_null() {
afl_shmem_deinit(&mut client_map);
return Err(AflError::Unknown("Something in clients failed".into()));
}
(*client).clientloop = Some(clientloop);
(*client).data = data;
(*client).client_type = LLMP_CLIENT_TYPE_CHILD_PROCESS;
/* Copy the already allocated shmem to the client state */
(*(*client).client_state).out_maps = afl_realloc(
(*(*client).client_state).out_maps as *mut c_void,
::std::mem::size_of::<afl_shmem>() as c_ulong,
) as *mut afl_shmem;
if (*(*client).client_state).out_maps.is_null() {
afl_shmem_deinit(&mut client_map);
afl_shmem_deinit((*client).cur_client_map);
/* "Unregister" by subtracting the client from count */
(*broker).llmp_client_count = (*broker).llmp_client_count.wrapping_sub(1);
return Err(AflError::Unknown("Something in clients failed".into()));
}
memcpy(
(*(*client).client_state).out_maps as *mut c_void,
&mut client_map as *mut afl_shmem as *const c_void,
::std::mem::size_of::<afl_shmem>() as c_ulong,
);
(*(*client).client_state).out_map_count = 1 as c_ulong;
/* Each client starts with the very first map.
They should then iterate through all maps once and work on all old messages.
*/
(*(*client).client_state).current_broadcast_map =
&mut *(*broker).broadcast_maps.offset(0 as isize) as *mut afl_shmem;
(*(*client).client_state).out_map_count = 1 as c_ulong;
return Ok(());
}
/* Generic function to add a hook to the mem pointed to by hooks_p, using afl_realloc on the mem area, and increasing /* Generic function to add a hook to the mem pointed to by hooks_p, using afl_realloc on the mem area, and increasing
* hooks_count_p */ * hooks_count_p */
pub unsafe fn llmp_add_hook_generic( pub unsafe fn llmp_add_hook_generic(
@ -1403,28 +1420,41 @@ pub unsafe fn llmp_client_add_new_out_page_hook(
); );
} }
/* Adds a hook that gets called in the broker for each new message the broker touches. /* Clean up the broker instance */
if the callback returns false, the message is not forwarded to the clients. */ pub unsafe fn llmp_broker_deinit(broker: *mut llmp_broker) {
pub unsafe fn llmp_broker_add_message_hook( let mut i: c_ulong;
broker: *mut llmp_broker_state, i = 0 as c_ulong;
hook: LlmpMessageHookFn, while i < (*broker).broadcast_map_count {
data: *mut c_void, afl_shmem_deinit(&mut *(*broker).broadcast_maps.offset(i as isize));
) -> AflRet { i = i.wrapping_add(1)
return llmp_add_hook_generic(
&mut (*broker).msg_hooks,
&mut (*broker).msg_hook_count,
::std::mem::transmute::<Option<LlmpMessageHookFn>, *mut c_void>(Some(hook)),
data,
);
} }
i = 0 as c_ulong;
while i < (*broker).llmp_client_count {
afl_shmem_deinit((*(*broker).llmp_clients.offset(i as isize)).cur_client_map);
free((*(*broker).llmp_clients.offset(i as isize)).cur_client_map as *mut c_void);
i = i.wrapping_add(1)
// TODO: Properly clean up the client
}
afl_free((*broker).broadcast_maps as *mut c_void);
(*broker).broadcast_map_count = 0 as c_ulong;
afl_free((*broker).llmp_clients as *mut c_void);
(*broker).llmp_client_count = 0 as c_ulong;
}
impl Drop for llmp_broker {
fn drop(&mut self) {
unsafe { llmp_broker_deinit(self) };
}
}
/* Allocate and set up the new broker instance. Afterwards, run with /* Allocate and set up the new broker instance. Afterwards, run with
* broker_run. * broker_run.
*/ */
pub unsafe fn llmp_broker_init(broker: *mut llmp_broker_state) -> Result<(), AflError> { pub unsafe fn llmp_broker_init(broker: *mut llmp_broker) -> Result<(), AflError> {
memset( memset(
broker as *mut c_void, broker as *mut c_void,
0 as c_int, 0 as c_int,
::std::mem::size_of::<llmp_broker_state>() as c_ulong, ::std::mem::size_of::<llmp_broker>() as c_ulong,
); );
/* let's create some space for outgoing maps */ /* let's create some space for outgoing maps */
(*broker).broadcast_maps = afl_realloc( (*broker).broadcast_maps = afl_realloc(
@ -1449,29 +1479,3 @@ pub unsafe fn llmp_broker_init(broker: *mut llmp_broker_state) -> Result<(), Afl
} }
return Ok(()); return Ok(());
} }
/* Clean up the broker instance */
pub unsafe fn llmp_broker_deinit(broker: *mut llmp_broker_state) {
let mut i: c_ulong;
i = 0 as c_ulong;
while i < (*broker).broadcast_map_count {
afl_shmem_deinit(&mut *(*broker).broadcast_maps.offset(i as isize));
i = i.wrapping_add(1)
}
i = 0 as c_ulong;
while i < (*broker).llmp_client_count {
afl_shmem_deinit((*(*broker).llmp_clients.offset(i as isize)).cur_client_map);
free((*(*broker).llmp_clients.offset(i as isize)).cur_client_map as *mut c_void);
i = i.wrapping_add(1)
// TODO: Properly clean up the client
}
afl_free((*broker).broadcast_maps as *mut c_void);
(*broker).broadcast_map_count = 0 as c_ulong;
afl_free((*broker).llmp_clients as *mut c_void);
(*broker).llmp_client_count = 0 as c_ulong;
}
impl Drop for llmp_broker_state {
fn drop(&mut self) {
unsafe { llmp_broker_deinit(self) };
}
}

View File

@ -429,7 +429,34 @@ where
R: Rand, R: Rand,
//TODO CE: CustomEvent, //TODO CE: CustomEvent,
{ {
pub fn new() -> Self { /// Forks n processes, never returns in broker.
pub fn spawn(process_count: usize) -> Self {
/*
llmp_broker_init(&mut broker).expect("Could not init");
for i in 0..counter_thread_count {
println!("Adding client {}", i);
llmp_broker_register_childprocess_clientloop(
&mut broker,
llmp_test_clientloop,
ptr::null_mut(),
)
.expect("could not add child clientloop");
}
llmp_broker_register_childprocess_clientloop(
&mut broker,
test_adder_clientloop,
ptr::null_mut(),
)
.expect("Error registering childprocess");
println!("Spawning broker");
llmp_broker_add_message_hook(&mut broker, broker_message_hook, ptr::null_mut());
llmp_broker_run(&mut broker);
*/
Self { Self {
_marker: PhantomData, _marker: PhantomData,
} }