* add compression * modify event/llmp.rs * rename to LLMP_TAG_COMPRESS * remove compression code from bolts/llmp.rs * add compress.rs * handle compress & decompress in GzipCompress struct, compress if the size is large enough * add code for benchmark * remove LLMP_TAG_COMPRESS, use a flag instead * cargo fmt * rm test.sh * passes the test * comment benchmarks code out * add recv_buf_with_flag() * add the llmp_compress feature * add send_buf, do not compile compression code if it's not used * fix warning * merged dev * add error handling code * doc for compress.rs * remove tag from decompress * rename every flag to flags * fix some clippy.sh errors * simplify recv_buf * delete benchmark printf code * cargo fmt * fix doc Co-authored-by: Dominik Maier <domenukk@gmail.com>
This commit is contained in:
parent
f9e4e7cbf0
commit
9d748a887c
@ -32,6 +32,11 @@ LibAFL offers integrations with popular instrumemntation frameworks. At the mome
|
|||||||
+ Frida, in [libafl_frida](./libafl_frida), by s1341 <github@shmarya.net> (Windows support is broken atm, it relies on [this upstream issue](https://github.com/meme/frida-rust/issues/9) to be fixed.)
|
+ Frida, in [libafl_frida](./libafl_frida), by s1341 <github@shmarya.net> (Windows support is broken atm, it relies on [this upstream issue](https://github.com/meme/frida-rust/issues/9) to be fixed.)
|
||||||
+ More to come (QEMU-mode, ...)
|
+ More to come (QEMU-mode, ...)
|
||||||
|
|
||||||
|
LibAFL offers integrations with popular instrumemntation frameworks too. At the moment, the supported backends are:
|
||||||
|
|
||||||
|
+ SanitizerCoverage, in [libafl_targets](./libafl_targets)
|
||||||
|
+ Frida, in [libafl_frida](./libafl_frida), by s1341 <github@shmarya.net> (Windows support will be added soon)
|
||||||
|
|
||||||
## Getting started
|
## Getting started
|
||||||
|
|
||||||
Clone the LibAFL repository with
|
Clone the LibAFL repository with
|
||||||
|
@ -40,6 +40,7 @@ anymap_debug = ["serde_json"] # uses serde_json to Debug the anymap trait. Disab
|
|||||||
derive = ["libafl_derive"] # provide derive(SerdeAny) macro.
|
derive = ["libafl_derive"] # provide derive(SerdeAny) macro.
|
||||||
llmp_small_maps = [] # reduces initial map size for llmp
|
llmp_small_maps = [] # reduces initial map size for llmp
|
||||||
llmp_debug = ["backtrace"] # Enables debug output for LLMP
|
llmp_debug = ["backtrace"] # Enables debug output for LLMP
|
||||||
|
llmp_compress = [] #llmp compression using GZip
|
||||||
|
|
||||||
[[example]]
|
[[example]]
|
||||||
name = "llmp_test"
|
name = "llmp_test"
|
||||||
@ -58,6 +59,7 @@ static_assertions = "1.1.0"
|
|||||||
ctor = "*"
|
ctor = "*"
|
||||||
libafl_derive = { version = "*", optional = true, path = "../libafl_derive" }
|
libafl_derive = { version = "*", optional = true, path = "../libafl_derive" }
|
||||||
serde_json = { version = "1.0", optional = true, default-features = false, features = ["alloc"] } # an easy way to debug print SerdeAnyMap
|
serde_json = { version = "1.0", optional = true, default-features = false, features = ["alloc"] } # an easy way to debug print SerdeAnyMap
|
||||||
|
compression = { version = "0.1.5" }
|
||||||
num_enum = "0.5.1"
|
num_enum = "0.5.1"
|
||||||
spin = "0.9.0"
|
spin = "0.9.0"
|
||||||
|
|
||||||
|
@ -83,6 +83,7 @@ fn large_msg_loop(port: u16) -> ! {
|
|||||||
fn broker_message_hook(
|
fn broker_message_hook(
|
||||||
client_id: u32,
|
client_id: u32,
|
||||||
tag: llmp::Tag,
|
tag: llmp::Tag,
|
||||||
|
_flags: llmp::Flag,
|
||||||
message: &[u8],
|
message: &[u8],
|
||||||
) -> Result<llmp::LlmpMsgHookResult, Error> {
|
) -> Result<llmp::LlmpMsgHookResult, Error> {
|
||||||
match tag {
|
match tag {
|
||||||
|
56
libafl/src/bolts/compress.rs
Normal file
56
libafl/src/bolts/compress.rs
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
//! Compression of events passed between a broker and clients.
|
||||||
|
//! Currently we use the gzip compression algorithm for its fast decompression performance.
|
||||||
|
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
use crate::{
|
||||||
|
bolts::llmp::{Flag, LLMP_FLAG_COMPRESSED},
|
||||||
|
Error,
|
||||||
|
};
|
||||||
|
use alloc::vec::Vec;
|
||||||
|
use compression::prelude::*;
|
||||||
|
use core::fmt::Debug;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct GzipCompressor {
|
||||||
|
threshold: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GzipCompressor {
|
||||||
|
/// If the buffer is larger than the threshold value, we compress the buffer.
|
||||||
|
pub fn new(threshold: usize) -> Self {
|
||||||
|
GzipCompressor { threshold }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GzipCompressor {
|
||||||
|
/// Compression.
|
||||||
|
/// The buffer is compressed with the gzip algo
|
||||||
|
pub fn compress(&self, buf: &[u8]) -> Result<Option<Vec<u8>>, Error> {
|
||||||
|
if buf.len() > self.threshold {
|
||||||
|
//compress if the buffer is large enough
|
||||||
|
let compressed = buf
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.encode(&mut GZipEncoder::new(), Action::Finish)
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
Ok(Some(compressed))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Decompression.
|
||||||
|
/// Flag is used to indicate if it's compressed or not
|
||||||
|
pub fn decompress(&self, flags: Flag, buf: &[u8]) -> Result<Option<Vec<u8>>, Error> {
|
||||||
|
if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
|
||||||
|
let decompressed: Vec<u8> = buf
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.decode(&mut GZipDecoder::new())
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
Ok(Some(decompressed))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -104,6 +104,9 @@ const LLMP_TAG_NEW_SHM_CLIENT: Tag = 0xC11E471;
|
|||||||
/// The sender on this map is exiting (if broker exits, clients should exit gracefully);
|
/// The sender on this map is exiting (if broker exits, clients should exit gracefully);
|
||||||
const LLMP_TAG_EXITING: Tag = 0x13C5171;
|
const LLMP_TAG_EXITING: Tag = 0x13C5171;
|
||||||
|
|
||||||
|
pub const LLMP_FLAG_INITIALIZED: Flag = 0x0;
|
||||||
|
pub const LLMP_FLAG_COMPRESSED: Flag = 0x1;
|
||||||
|
|
||||||
/// An env var of this value indicates that the set value was a NULL PTR
|
/// An env var of this value indicates that the set value was a NULL PTR
|
||||||
const _NULL_ENV_STR: &str = "_NULL";
|
const _NULL_ENV_STR: &str = "_NULL";
|
||||||
|
|
||||||
@ -124,6 +127,7 @@ static mut GLOBAL_SIGHANDLER_STATE: LlmpBrokerSignalHandler = LlmpBrokerSignalHa
|
|||||||
|
|
||||||
/// TAGs used thorughout llmp
|
/// TAGs used thorughout llmp
|
||||||
pub type Tag = u32;
|
pub type Tag = u32;
|
||||||
|
pub type Flag = u64;
|
||||||
|
|
||||||
/// This is for the server the broker will spawn.
|
/// This is for the server the broker will spawn.
|
||||||
/// If an llmp connection is local - use sharedmaps
|
/// If an llmp connection is local - use sharedmaps
|
||||||
@ -323,6 +327,8 @@ pub struct LlmpMsg {
|
|||||||
pub tag: Tag,
|
pub tag: Tag,
|
||||||
/// Sender of this messge
|
/// Sender of this messge
|
||||||
pub sender: u32,
|
pub sender: u32,
|
||||||
|
/// flags, currently only used for indicating compression
|
||||||
|
pub flags: Flag,
|
||||||
/// The message ID, unique per page
|
/// The message ID, unique per page
|
||||||
pub message_id: u64,
|
pub message_id: u64,
|
||||||
/// Buffer length as specified by the user
|
/// Buffer length as specified by the user
|
||||||
@ -442,6 +448,13 @@ where
|
|||||||
LlmpConnection::IsClient { client } => client.send_buf(tag, buf),
|
LlmpConnection::IsClient { client } => client.send_buf(tag, buf),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn send_buf_with_flags(&mut self, tag: Tag, buf: &[u8], flags: Flag) -> Result<(), Error> {
|
||||||
|
match self {
|
||||||
|
LlmpConnection::IsBroker { broker } => broker.send_buf_with_flags(tag, flags, buf),
|
||||||
|
LlmpConnection::IsClient { client } => client.send_buf_with_flags(tag, flags, buf),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Contents of the share mem pages, used by llmp internally
|
/// Contents of the share mem pages, used by llmp internally
|
||||||
@ -898,6 +911,30 @@ where
|
|||||||
unsafe {
|
unsafe {
|
||||||
let msg = self.alloc_next(buf.len())?;
|
let msg = self.alloc_next(buf.len())?;
|
||||||
(*msg).tag = tag;
|
(*msg).tag = tag;
|
||||||
|
(*msg).flags = LLMP_FLAG_INITIALIZED;
|
||||||
|
buf.as_ptr()
|
||||||
|
.copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len());
|
||||||
|
self.send(msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> {
|
||||||
|
// Make sure we don't reuse already allocated tags
|
||||||
|
if tag == LLMP_TAG_NEW_SHM_CLIENT
|
||||||
|
|| tag == LLMP_TAG_END_OF_PAGE
|
||||||
|
|| tag == LLMP_TAG_UNINITIALIZED
|
||||||
|
|| tag == LLMP_TAG_UNSET
|
||||||
|
{
|
||||||
|
return Err(Error::Unknown(format!(
|
||||||
|
"Reserved tag supplied to send_buf ({:#X})",
|
||||||
|
tag
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe {
|
||||||
|
let msg = self.alloc_next(buf.len())?;
|
||||||
|
(*msg).tag = tag;
|
||||||
|
(*msg).flags = flags;
|
||||||
buf.as_ptr()
|
buf.as_ptr()
|
||||||
.copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len());
|
.copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len());
|
||||||
self.send(msg)
|
self.send(msg)
|
||||||
@ -1114,12 +1151,22 @@ where
|
|||||||
/// Returns the next message, tag, buf, if avaliable, else None
|
/// Returns the next message, tag, buf, if avaliable, else None
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn recv_buf(&mut self) -> Result<Option<(u32, u32, &[u8])>, Error> {
|
pub fn recv_buf(&mut self) -> Result<Option<(u32, Tag, &[u8])>, Error> {
|
||||||
|
if let Some((sender, tag, _flags, buf)) = self.recv_buf_with_flags()? {
|
||||||
|
Ok(Some((sender, tag, buf)))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn recv_buf_with_flags(&mut self) -> Result<Option<(u32, Tag, Flag, &[u8])>, Error> {
|
||||||
unsafe {
|
unsafe {
|
||||||
Ok(match self.recv()? {
|
Ok(match self.recv()? {
|
||||||
Some(msg) => Some((
|
Some(msg) => Some((
|
||||||
(*msg).sender,
|
(*msg).sender,
|
||||||
(*msg).tag,
|
(*msg).tag,
|
||||||
|
(*msg).flags,
|
||||||
(*msg).as_slice(&mut self.current_recv_map)?,
|
(*msg).as_slice(&mut self.current_recv_map)?,
|
||||||
)),
|
)),
|
||||||
None => None,
|
None => None,
|
||||||
@ -1129,7 +1176,7 @@ where
|
|||||||
|
|
||||||
/// Returns the next sender, tag, buf, looping until it becomes available
|
/// Returns the next sender, tag, buf, looping until it becomes available
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn recv_buf_blocking(&mut self) -> Result<(u32, u32, &[u8]), Error> {
|
pub fn recv_buf_blocking(&mut self) -> Result<(u32, Tag, &[u8]), Error> {
|
||||||
unsafe {
|
unsafe {
|
||||||
let msg = self.recv_blocking()?;
|
let msg = self.recv_blocking()?;
|
||||||
Ok((
|
Ok((
|
||||||
@ -1424,7 +1471,7 @@ where
|
|||||||
#[inline]
|
#[inline]
|
||||||
pub fn once<F>(&mut self, on_new_msg: &mut F) -> Result<(), Error>
|
pub fn once<F>(&mut self, on_new_msg: &mut F) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
F: FnMut(u32, Tag, &[u8]) -> Result<LlmpMsgHookResult, Error>,
|
F: FnMut(u32, Tag, Flag, &[u8]) -> Result<LlmpMsgHookResult, Error>,
|
||||||
{
|
{
|
||||||
compiler_fence(Ordering::SeqCst);
|
compiler_fence(Ordering::SeqCst);
|
||||||
for i in 0..self.llmp_clients.len() {
|
for i in 0..self.llmp_clients.len() {
|
||||||
@ -1455,7 +1502,7 @@ where
|
|||||||
/// 5 millis of sleep can't hurt to keep busywait not at 100%
|
/// 5 millis of sleep can't hurt to keep busywait not at 100%
|
||||||
pub fn loop_forever<F>(&mut self, on_new_msg: &mut F, sleep_time: Option<Duration>)
|
pub fn loop_forever<F>(&mut self, on_new_msg: &mut F, sleep_time: Option<Duration>)
|
||||||
where
|
where
|
||||||
F: FnMut(u32, Tag, &[u8]) -> Result<LlmpMsgHookResult, Error>,
|
F: FnMut(u32, Tag, Flag, &[u8]) -> Result<LlmpMsgHookResult, Error>,
|
||||||
{
|
{
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
if let Err(_e) = unsafe { setup_signal_handler(&mut GLOBAL_SIGHANDLER_STATE) } {
|
if let Err(_e) = unsafe { setup_signal_handler(&mut GLOBAL_SIGHANDLER_STATE) } {
|
||||||
@ -1492,6 +1539,10 @@ where
|
|||||||
self.llmp_out.send_buf(tag, buf)
|
self.llmp_out.send_buf(tag, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> {
|
||||||
|
self.llmp_out.send_buf_with_flags(tag, flags, buf)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
/// Launches a thread using a tcp listener socket, on which new clients may connect to this broker
|
/// Launches a thread using a tcp listener socket, on which new clients may connect to this broker
|
||||||
/// Does so on the given port.
|
/// Does so on the given port.
|
||||||
@ -1595,7 +1646,7 @@ where
|
|||||||
#[inline]
|
#[inline]
|
||||||
unsafe fn handle_new_msgs<F>(&mut self, client_id: u32, on_new_msg: &mut F) -> Result<(), Error>
|
unsafe fn handle_new_msgs<F>(&mut self, client_id: u32, on_new_msg: &mut F) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
F: FnMut(u32, Tag, &[u8]) -> Result<LlmpMsgHookResult, Error>,
|
F: FnMut(u32, Tag, Flag, &[u8]) -> Result<LlmpMsgHookResult, Error>,
|
||||||
{
|
{
|
||||||
let mut next_id = self.llmp_clients.len() as u32;
|
let mut next_id = self.llmp_clients.len() as u32;
|
||||||
|
|
||||||
@ -1662,7 +1713,9 @@ where
|
|||||||
|
|
||||||
let map = &mut self.llmp_clients[client_id as usize].current_recv_map;
|
let map = &mut self.llmp_clients[client_id as usize].current_recv_map;
|
||||||
let msg_buf = (*msg).as_slice(map)?;
|
let msg_buf = (*msg).as_slice(map)?;
|
||||||
if let LlmpMsgHookResult::Handled = (on_new_msg)(client_id, (*msg).tag, msg_buf)? {
|
if let LlmpMsgHookResult::Handled =
|
||||||
|
(on_new_msg)(client_id, (*msg).tag, (*msg).flags, msg_buf)?
|
||||||
|
{
|
||||||
should_forward_msg = false
|
should_forward_msg = false
|
||||||
};
|
};
|
||||||
if should_forward_msg {
|
if should_forward_msg {
|
||||||
@ -1827,6 +1880,10 @@ where
|
|||||||
self.sender.send_buf(tag, buf)
|
self.sender.send_buf(tag, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> {
|
||||||
|
self.sender.send_buf_with_flags(tag, flags, buf)
|
||||||
|
}
|
||||||
|
|
||||||
/// Informs the broker about a new client in town, with the given map id
|
/// Informs the broker about a new client in town, with the given map id
|
||||||
pub fn send_client_added_msg(
|
pub fn send_client_added_msg(
|
||||||
&mut self,
|
&mut self,
|
||||||
@ -1876,16 +1933,20 @@ where
|
|||||||
/// Returns the next message, tag, buf, if avaliable, else None
|
/// Returns the next message, tag, buf, if avaliable, else None
|
||||||
#[allow(clippy::type_complexity)]
|
#[allow(clippy::type_complexity)]
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn recv_buf(&mut self) -> Result<Option<(u32, u32, &[u8])>, Error> {
|
pub fn recv_buf(&mut self) -> Result<Option<(u32, Tag, &[u8])>, Error> {
|
||||||
self.receiver.recv_buf()
|
self.receiver.recv_buf()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receives a buf from the broker, looping until a messages becomes avaliable
|
/// Receives a buf from the broker, looping until a messages becomes avaliable
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn recv_buf_blocking(&mut self) -> Result<(u32, u32, &[u8]), Error> {
|
pub fn recv_buf_blocking(&mut self) -> Result<(u32, Tag, &[u8]), Error> {
|
||||||
self.receiver.recv_buf_blocking()
|
self.receiver.recv_buf_blocking()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn recv_buf_with_flags(&mut self) -> Result<Option<(u32, Tag, Flag, &[u8])>, Error> {
|
||||||
|
self.receiver.recv_buf_with_flags()
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
/// Creates a new LlmpClient, reading the map id and len from env
|
/// Creates a new LlmpClient, reading the map id and len from env
|
||||||
pub fn create_using_env(mut shmem_provider: SP, env_var: &str) -> Result<Self, Error> {
|
pub fn create_using_env(mut shmem_provider: SP, env_var: &str) -> Result<Self, Error> {
|
||||||
@ -1952,7 +2013,7 @@ mod tests {
|
|||||||
// Give the (background) tcp thread a few millis to post the message
|
// Give the (background) tcp thread a few millis to post the message
|
||||||
sleep(Duration::from_millis(100));
|
sleep(Duration::from_millis(100));
|
||||||
broker
|
broker
|
||||||
.once(&mut |_sender_id, _tag, _msg| Ok(ForwardToClients))
|
.once(&mut |_sender_id, _tag, _flags, _msg| Ok(ForwardToClients))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let tag: Tag = 0x1337;
|
let tag: Tag = 0x1337;
|
||||||
@ -1975,7 +2036,7 @@ mod tests {
|
|||||||
|
|
||||||
// Forward stuff to clients
|
// Forward stuff to clients
|
||||||
broker
|
broker
|
||||||
.once(&mut |_sender_id, _tag, _msg| Ok(ForwardToClients))
|
.once(&mut |_sender_id, _tag, _flags, _msg| Ok(ForwardToClients))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let (_sender_id, tag2, arr2) = client.recv_buf_blocking().unwrap();
|
let (_sender_id, tag2, arr2) = client.recv_buf_blocking().unwrap();
|
||||||
assert_eq!(tag, tag2);
|
assert_eq!(tag, tag2);
|
||||||
|
@ -1,6 +1,10 @@
|
|||||||
//! Bolts are no conceptual fuzzing elements, but they keep libafl-based fuzzers together.
|
//! Bolts are no conceptual fuzzing elements, but they keep libafl-based fuzzers together.
|
||||||
|
|
||||||
pub mod bindings;
|
pub mod bindings;
|
||||||
|
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
pub mod compress;
|
||||||
|
|
||||||
pub mod llmp;
|
pub mod llmp;
|
||||||
pub mod os;
|
pub mod os;
|
||||||
pub mod ownedref;
|
pub mod ownedref;
|
||||||
|
@ -15,7 +15,7 @@ use crate::bolts::{
|
|||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
bolts::{
|
bolts::{
|
||||||
llmp::{self, LlmpClientDescription, LlmpSender, Tag},
|
llmp::{self, Flag, LlmpClientDescription, LlmpSender, Tag},
|
||||||
shmem::ShMemProvider,
|
shmem::ShMemProvider,
|
||||||
},
|
},
|
||||||
corpus::CorpusScheduler,
|
corpus::CorpusScheduler,
|
||||||
@ -29,6 +29,12 @@ use crate::{
|
|||||||
Error,
|
Error,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
use crate::bolts::{
|
||||||
|
compress::GzipCompressor,
|
||||||
|
llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED},
|
||||||
|
};
|
||||||
|
|
||||||
#[cfg(all(feature = "std", windows))]
|
#[cfg(all(feature = "std", windows))]
|
||||||
use crate::utils::startable_self;
|
use crate::utils::startable_self;
|
||||||
|
|
||||||
@ -45,7 +51,6 @@ const _LLMP_TAG_EVENT_TO_BROKER: llmp::Tag = 0x2B80438;
|
|||||||
/// Handle in both
|
/// Handle in both
|
||||||
///
|
///
|
||||||
const LLMP_TAG_EVENT_TO_BOTH: llmp::Tag = 0x2B0741;
|
const LLMP_TAG_EVENT_TO_BOTH: llmp::Tag = 0x2B0741;
|
||||||
|
|
||||||
const _LLMP_TAG_RESTART: llmp::Tag = 0x8357A87;
|
const _LLMP_TAG_RESTART: llmp::Tag = 0x8357A87;
|
||||||
const _LLMP_TAG_NO_RESTART: llmp::Tag = 0x57A7EE71;
|
const _LLMP_TAG_NO_RESTART: llmp::Tag = 0x57A7EE71;
|
||||||
|
|
||||||
@ -60,9 +65,15 @@ where
|
|||||||
{
|
{
|
||||||
stats: Option<ST>,
|
stats: Option<ST>,
|
||||||
llmp: llmp::LlmpConnection<SP>,
|
llmp: llmp::LlmpConnection<SP>,
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
compressor: GzipCompressor,
|
||||||
|
|
||||||
phantom: PhantomData<(I, S)>,
|
phantom: PhantomData<(I, S)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
const COMPRESS_THRESHOLD: usize = 1024;
|
||||||
|
|
||||||
impl<I, S, SP, ST> Drop for LlmpEventManager<I, S, SP, ST>
|
impl<I, S, SP, ST> Drop for LlmpEventManager<I, S, SP, ST>
|
||||||
where
|
where
|
||||||
I: Input,
|
I: Input,
|
||||||
@ -91,6 +102,8 @@ where
|
|||||||
Ok(Self {
|
Ok(Self {
|
||||||
stats: Some(stats),
|
stats: Some(stats),
|
||||||
llmp: llmp::LlmpConnection::on_port(shmem_provider, port)?,
|
llmp: llmp::LlmpConnection::on_port(shmem_provider, port)?,
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
|
||||||
phantom: PhantomData,
|
phantom: PhantomData,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -103,6 +116,8 @@ where
|
|||||||
llmp: llmp::LlmpConnection::IsClient {
|
llmp: llmp::LlmpConnection::IsClient {
|
||||||
client: LlmpClient::on_existing_from_env(shmem_provider, env_name)?,
|
client: LlmpClient::on_existing_from_env(shmem_provider, env_name)?,
|
||||||
},
|
},
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
|
||||||
// Inserting a nop-stats element here so rust won't complain.
|
// Inserting a nop-stats element here so rust won't complain.
|
||||||
// In any case, the client won't currently use it.
|
// In any case, the client won't currently use it.
|
||||||
phantom: PhantomData,
|
phantom: PhantomData,
|
||||||
@ -125,6 +140,8 @@ where
|
|||||||
shmem_provider,
|
shmem_provider,
|
||||||
description,
|
description,
|
||||||
)?,
|
)?,
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
|
||||||
// Inserting a nop-stats element here so rust won't complain.
|
// Inserting a nop-stats element here so rust won't complain.
|
||||||
// In any case, the client won't currently use it.
|
// In any case, the client won't currently use it.
|
||||||
phantom: PhantomData,
|
phantom: PhantomData,
|
||||||
@ -152,9 +169,17 @@ where
|
|||||||
match &mut self.llmp {
|
match &mut self.llmp {
|
||||||
llmp::LlmpConnection::IsBroker { broker } => {
|
llmp::LlmpConnection::IsBroker { broker } => {
|
||||||
let stats = self.stats.as_mut().unwrap();
|
let stats = self.stats.as_mut().unwrap();
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
let compressor = &self.compressor;
|
||||||
broker.loop_forever(
|
broker.loop_forever(
|
||||||
&mut |sender_id: u32, tag: Tag, msg: &[u8]| {
|
&mut |sender_id: u32, tag: Tag, _flags: Flag, msg: &[u8]| {
|
||||||
if tag == LLMP_TAG_EVENT_TO_BOTH {
|
if tag == LLMP_TAG_EVENT_TO_BOTH {
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
let event: Event<I> = match compressor.decompress(_flags, msg)? {
|
||||||
|
Some(decompressed) => postcard::from_bytes(&decompressed)?,
|
||||||
|
None => postcard::from_bytes(msg)?,
|
||||||
|
};
|
||||||
|
#[cfg(not(feature = "llmp_compress"))]
|
||||||
let event: Event<I> = postcard::from_bytes(msg)?;
|
let event: Event<I> = postcard::from_bytes(msg)?;
|
||||||
match Self::handle_in_broker(stats, sender_id, &event)? {
|
match Self::handle_in_broker(stats, sender_id, &event)? {
|
||||||
BrokerEventResult::Forward => {
|
BrokerEventResult::Forward => {
|
||||||
@ -310,10 +335,16 @@ where
|
|||||||
let mut events = vec![];
|
let mut events = vec![];
|
||||||
match &mut self.llmp {
|
match &mut self.llmp {
|
||||||
llmp::LlmpConnection::IsClient { client } => {
|
llmp::LlmpConnection::IsClient { client } => {
|
||||||
while let Some((sender_id, tag, msg)) = client.recv_buf()? {
|
while let Some((sender_id, tag, _flags, msg)) = client.recv_buf_with_flags()? {
|
||||||
if tag == _LLMP_TAG_EVENT_TO_BROKER {
|
if tag == _LLMP_TAG_EVENT_TO_BROKER {
|
||||||
panic!("EVENT_TO_BROKER parcel should not have arrived in the client!");
|
panic!("EVENT_TO_BROKER parcel should not have arrived in the client!");
|
||||||
}
|
}
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
let event: Event<I> = match self.compressor.decompress(_flags, msg)? {
|
||||||
|
Some(decompressed) => postcard::from_bytes(&decompressed)?,
|
||||||
|
None => postcard::from_bytes(msg)?,
|
||||||
|
};
|
||||||
|
#[cfg(not(feature = "llmp_compress"))]
|
||||||
let event: Event<I> = postcard::from_bytes(msg)?;
|
let event: Event<I> = postcard::from_bytes(msg)?;
|
||||||
events.push((sender_id, event));
|
events.push((sender_id, event));
|
||||||
}
|
}
|
||||||
@ -330,6 +361,27 @@ where
|
|||||||
Ok(count)
|
Ok(count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> {
|
||||||
|
let serialized = postcard::to_allocvec(&event)?;
|
||||||
|
let flags: Flag = LLMP_FLAG_INITIALIZED;
|
||||||
|
|
||||||
|
match self.compressor.compress(&serialized)? {
|
||||||
|
Some(comp_buf) => {
|
||||||
|
self.llmp.send_buf_with_flags(
|
||||||
|
LLMP_TAG_EVENT_TO_BOTH,
|
||||||
|
&comp_buf,
|
||||||
|
flags | LLMP_FLAG_COMPRESSED,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "llmp_compress"))]
|
||||||
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> {
|
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> {
|
||||||
let serialized = postcard::to_allocvec(&event)?;
|
let serialized = postcard::to_allocvec(&event)?;
|
||||||
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
|
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
|
||||||
|
@ -50,6 +50,9 @@ use std::{env::VarError, io, num::ParseIntError, string::FromUtf8Error};
|
|||||||
pub enum Error {
|
pub enum Error {
|
||||||
/// Serialization error
|
/// Serialization error
|
||||||
Serialize(String),
|
Serialize(String),
|
||||||
|
/// Compression error
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
Compression(String),
|
||||||
/// File related error
|
/// File related error
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
File(io::Error),
|
File(io::Error),
|
||||||
@ -77,6 +80,8 @@ impl fmt::Display for Error {
|
|||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
Self::Serialize(s) => write!(f, "Error in Serialization: `{0}`", &s),
|
Self::Serialize(s) => write!(f, "Error in Serialization: `{0}`", &s),
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
Self::Compression(s) => write!(f, "Error in Compression: `{0}`", &s),
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
Self::File(err) => write!(f, "File IO failed: {:?}", &err),
|
Self::File(err) => write!(f, "File IO failed: {:?}", &err),
|
||||||
Self::EmptyOptional(s) => write!(f, "Optional value `{0}` was not set", &s),
|
Self::EmptyOptional(s) => write!(f, "Optional value `{0}` was not set", &s),
|
||||||
@ -101,6 +106,13 @@ impl From<postcard::Error> for Error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "llmp_compress")]
|
||||||
|
impl From<compression::prelude::CompressionError> for Error {
|
||||||
|
fn from(err: compression::prelude::CompressionError) -> Self {
|
||||||
|
Self::Compression(format!("{:?}", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Stringify the json serializer error
|
/// Stringify the json serializer error
|
||||||
#[cfg(feature = "std")]
|
#[cfg(feature = "std")]
|
||||||
impl From<serde_json::Error> for Error {
|
impl From<serde_json::Error> for Error {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user