overwrote llmp merge issues
This commit is contained in:
parent
aab77fe825
commit
5fdbd5439e
@ -617,53 +617,8 @@ impl LlmpReceiver {
|
|||||||
let msg = self.recv_blocking()?;
|
let msg = self.recv_blocking()?;
|
||||||
Ok(((*msg).tag, (*msg).as_slice()))
|
Ok(((*msg).tag, (*msg).as_slice()))
|
||||||
}
|
}
|
||||||
ret = _llmp_next_msg_ptr(last_msg);
|
|
||||||
(*ret).message_id = (*last_msg).message_id.wrapping_add(1 as c_uint)
|
|
||||||
}
|
}
|
||||||
/* The beginning of our message should be messages + size_used, else nobody
|
|
||||||
* sent the last msg! */
|
|
||||||
/* DBG("XXX ret %p - page->messages %p = %lu != %lu, will add %lu -> %p\n", ret, page->messages,
|
|
||||||
(c_ulong)((u8 *)ret - (u8 *)page->messages), page->size_used, complete_msg_size, ((u8 *)ret) + complete_msg_size);
|
|
||||||
*/
|
|
||||||
if last_msg.is_null() && (*page).size_used != 0
|
|
||||||
|| ((ret as *mut u8).wrapping_sub((*page).messages.as_mut_ptr() as *mut u8 as usize))
|
|
||||||
as c_ulong
|
|
||||||
!= (*page).size_used
|
|
||||||
{
|
|
||||||
panic!(format!("Allocated new message without calling send() inbetween. ret: {:?}, page: {:?}, complete_msg_size: {:?}, size_used: {:?}, last_msg: {:?}", ret, page,
|
|
||||||
buf_len_padded, (*page).size_used, last_msg));
|
|
||||||
}
|
|
||||||
(*page).size_used = ((*page).size_used as c_ulong).wrapping_add(complete_msg_size) as c_ulong;
|
|
||||||
(*ret).buf_len_padded = buf_len_padded;
|
|
||||||
(*ret).buf_len = buf_len;
|
|
||||||
/* DBG("Returning new message at %p with len %ld, TAG was %x", ret, ret->buf_len_padded, ret->tag); */
|
|
||||||
/* Maybe catch some bugs... */
|
|
||||||
(*_llmp_next_msg_ptr(ret)).tag = 0xdeadaf as c_uint;
|
|
||||||
(*ret).tag = 0xa143af11 as c_uint;
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
/* Commit the message last allocated by llmp_alloc_next to the queue.
|
|
||||||
After commiting, the msg shall no longer be altered!
|
|
||||||
It will be read by the consuming threads (broker->clients or client->broker)
|
|
||||||
*/
|
|
||||||
unsafe fn llmp_send(page: *mut LlmpPage, msg: *mut LlmpMsg) -> Result<(), AflError> {
|
|
||||||
if (*msg).tag == 0xdeadaf as c_uint {
|
|
||||||
panic!(format!(
|
|
||||||
"No tag set on message with id {}",
|
|
||||||
(*msg).message_id
|
|
||||||
));
|
|
||||||
}
|
|
||||||
if msg.is_null() || !llmp_msg_in_page(page, msg) {
|
|
||||||
return Err(AflError::Unknown(format!(
|
|
||||||
"Llmp Message {:?} is null or not in current page",
|
|
||||||
msg
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
compiler_fence(Ordering::SeqCst);
|
|
||||||
::std::ptr::write_volatile(
|
|
||||||
&mut (*page).current_msg_id as *mut c_ulong,
|
|
||||||
(*msg).message_id as c_ulong,
|
|
||||||
);
|
|
||||||
|
|
||||||
/// The page struct, placed on a shared mem instance.
|
/// The page struct, placed on a shared mem instance.
|
||||||
impl LlmpSharedMap {
|
impl LlmpSharedMap {
|
||||||
@ -711,109 +666,6 @@ impl LlmpBroker {
|
|||||||
|
|
||||||
Ok(broker)
|
Ok(broker)
|
||||||
}
|
}
|
||||||
_llmp_page_init(shmem2page(uninited_shmem), sender as u32, size_requested);
|
|
||||||
return shmem2page(uninited_shmem);
|
|
||||||
}
|
|
||||||
/* This function handles EOP by creating a new shared page and informing the
|
|
||||||
listener about it using a EOP message. */
|
|
||||||
unsafe fn llmp_handle_out_eop(
|
|
||||||
mut maps: *mut AflShmem,
|
|
||||||
map_count_p: *mut c_ulong,
|
|
||||||
last_msg_p: *mut *mut LlmpMsg,
|
|
||||||
) -> *mut AflShmem {
|
|
||||||
let map_count: u32 = *map_count_p as u32;
|
|
||||||
let mut old_map: *mut LlmpPage =
|
|
||||||
shmem2page(&mut *maps.offset(map_count.wrapping_sub(1 as c_uint) as isize));
|
|
||||||
maps = afl_realloc(
|
|
||||||
maps as *mut c_void,
|
|
||||||
(map_count.wrapping_add(1 as c_uint) as c_ulong)
|
|
||||||
.wrapping_mul(::std::mem::size_of::<AflShmem>() as c_ulong),
|
|
||||||
) as *mut AflShmem;
|
|
||||||
if maps.is_null() {
|
|
||||||
return 0 as *mut AflShmem;
|
|
||||||
}
|
|
||||||
/* Broadcast a new, large enough, message. Also sorry for that c ptr stuff! */
|
|
||||||
let mut new_map: *mut LlmpPage = llmp_new_page_shmem(
|
|
||||||
&mut *maps.offset(map_count as isize),
|
|
||||||
(*old_map).sender as c_ulong,
|
|
||||||
new_map_size((*old_map).max_alloc_size),
|
|
||||||
);
|
|
||||||
if new_map.is_null() {
|
|
||||||
afl_free(maps as *mut c_void);
|
|
||||||
return 0 as *mut AflShmem;
|
|
||||||
}
|
|
||||||
/* Realloc may have changed the location of maps_p (and old_map) in memory :/
|
|
||||||
*/
|
|
||||||
old_map = shmem2page(&mut *maps.offset(map_count.wrapping_sub(1 as c_uint) as isize));
|
|
||||||
*map_count_p = map_count.wrapping_add(1 as c_uint) as c_ulong;
|
|
||||||
::std::ptr::write_volatile(
|
|
||||||
&mut (*new_map).current_msg_id as *mut c_ulong,
|
|
||||||
(*old_map).current_msg_id,
|
|
||||||
);
|
|
||||||
(*new_map).max_alloc_size = (*old_map).max_alloc_size;
|
|
||||||
/* On the old map, place a last message linking to the new map for the clients
|
|
||||||
* to consume */
|
|
||||||
let mut out: *mut LlmpMsg = llmp_alloc_eop(old_map, *last_msg_p);
|
|
||||||
(*out).sender = (*old_map).sender;
|
|
||||||
let mut new_page_msg: *mut LlmpPayloadNewPage =
|
|
||||||
(*out).buf.as_mut_ptr() as *mut LlmpPayloadNewPage;
|
|
||||||
/* copy the infos to the message we're going to send on the old buf */
|
|
||||||
(*new_page_msg).map_size = (*maps.offset(map_count as isize)).map_size;
|
|
||||||
memcpy(
|
|
||||||
(*new_page_msg).shm_str.as_mut_ptr() as *mut c_void,
|
|
||||||
(*maps.offset(map_count as isize)).shm_str.as_mut_ptr() as *const c_void,
|
|
||||||
20 as c_ulong,
|
|
||||||
);
|
|
||||||
// We never sent a msg on the new buf */
|
|
||||||
*last_msg_p = 0 as *mut LlmpMsg;
|
|
||||||
/* Send the last msg on the old buf */
|
|
||||||
match llmp_send(old_map, out) {
|
|
||||||
Err(_e) => {
|
|
||||||
afl_free(maps as *mut c_void);
|
|
||||||
println!("Error sending message");
|
|
||||||
0 as *mut AflShmem
|
|
||||||
}
|
|
||||||
Ok(_) => maps,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/* no more space left! We'll have to start a new page */
|
|
||||||
pub unsafe fn llmp_broker_handle_out_eop(broker: *mut LlmpBroker) -> AflRet {
|
|
||||||
(*broker).broadcast_maps = llmp_handle_out_eop(
|
|
||||||
(*broker).broadcast_maps,
|
|
||||||
&mut (*broker).broadcast_map_count,
|
|
||||||
&mut (*broker).last_msg_sent,
|
|
||||||
);
|
|
||||||
return if !(*broker).broadcast_maps.is_null() {
|
|
||||||
AFL_RET_SUCCESS
|
|
||||||
} else {
|
|
||||||
AFL_RET_ALLOC
|
|
||||||
} as AflRet;
|
|
||||||
}
|
|
||||||
pub unsafe fn llmp_broker_alloc_next(broker: *mut LlmpBroker, len: c_ulong) -> *mut LlmpMsg {
|
|
||||||
let mut broadcast_page: *mut LlmpPage = shmem2page(_llmp_broker_current_broadcast_map(broker));
|
|
||||||
let mut out: *mut LlmpMsg = llmp_alloc_next(broadcast_page, (*broker).last_msg_sent, len);
|
|
||||||
if out.is_null() {
|
|
||||||
/* no more space left! We'll have to start a new page */
|
|
||||||
let ret: AflRet = llmp_broker_handle_out_eop(broker);
|
|
||||||
if ret != AFL_RET_SUCCESS as AflRet {
|
|
||||||
panic!("Error handling broker out EOP");
|
|
||||||
}
|
|
||||||
/* llmp_handle_out_eop allocates a new current broadcast_map */
|
|
||||||
broadcast_page = shmem2page(_llmp_broker_current_broadcast_map(broker));
|
|
||||||
/* the alloc is now on a new page */
|
|
||||||
out = llmp_alloc_next(broadcast_page, (*broker).last_msg_sent, len);
|
|
||||||
if out.is_null() {
|
|
||||||
panic!(format!(
|
|
||||||
"Error allocating {} bytes in shmap {:?}",
|
|
||||||
len,
|
|
||||||
(*_llmp_broker_current_broadcast_map(broker))
|
|
||||||
.shm_str
|
|
||||||
.as_mut_ptr(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Allocate the next message on the outgoing map
|
/// Allocate the next message on the outgoing map
|
||||||
unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> {
|
unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, AflError> {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user