Refactored compression

This commit is contained in:
Dominik Maier 2021-04-29 13:16:51 +02:00
parent 9d748a887c
commit f3b4305dac
5 changed files with 78 additions and 47 deletions

View File

@ -34,13 +34,13 @@ harness = false
#debug = true #debug = true
[features] [features]
default = ["std", "anymap_debug", "derive"] default = ["std", "anymap_debug", "derive", "llmp_compression"]
std = [] # print, sharedmap, ... support std = [] # print, sharedmap, ... support
anymap_debug = ["serde_json"] # uses serde_json to Debug the anymap trait. Disable for smaller footprint. anymap_debug = ["serde_json"] # uses serde_json to Debug the anymap trait. Disable for smaller footprint.
derive = ["libafl_derive"] # provide derive(SerdeAny) macro. derive = ["libafl_derive"] # provide derive(SerdeAny) macro.
llmp_small_maps = [] # reduces initial map size for llmp llmp_small_maps = [] # reduces initial map size for llmp
llmp_debug = ["backtrace"] # Enables debug output for LLMP llmp_debug = ["backtrace"] # Enables debug output for LLMP
llmp_compress = [] #llmp compression using GZip llmp_compression = [] #llmp compression using GZip
[[example]] [[example]]
name = "llmp_test" name = "llmp_test"

View File

@ -1,22 +1,22 @@
//! Compression of events passed between a broker and clients. //! Compression of events passed between a broker and clients.
//! Currently we use the gzip compression algorithm for its fast decompression performance. //! Currently we use the gzip compression algorithm for its fast decompression performance.
#[cfg(feature = "llmp_compress")] #[cfg(feature = "llmp_compression")]
use crate::{ use crate::Error;
bolts::llmp::{Flag, LLMP_FLAG_COMPRESSED},
Error,
};
use alloc::vec::Vec; use alloc::vec::Vec;
use compression::prelude::*; use compression::prelude::*;
use core::fmt::Debug; use core::fmt::Debug;
/// Compression for your stream compression needs.
#[derive(Debug)] #[derive(Debug)]
pub struct GzipCompressor { pub struct GzipCompressor {
/// If less bytes than threshold are being passed to `compress`, the payload is not getting compressed.
threshold: usize, threshold: usize,
} }
impl GzipCompressor { impl GzipCompressor {
/// If the buffer is larger than the threshold value, we compress the buffer. /// If the buffer is at lest larger as large as the `threshold` value, we compress the buffer.
/// When given a `threshold` of `0`, the `GzipCompressor` will always compress.
pub fn new(threshold: usize) -> Self { pub fn new(threshold: usize) -> Self {
GzipCompressor { threshold } GzipCompressor { threshold }
} }
@ -24,9 +24,10 @@ impl GzipCompressor {
impl GzipCompressor { impl GzipCompressor {
/// Compression. /// Compression.
/// The buffer is compressed with the gzip algo /// If the buffer is smaller than the threshold of this compressor, `None` will be returned.
/// Else, the buffer is compressed.
pub fn compress(&self, buf: &[u8]) -> Result<Option<Vec<u8>>, Error> { pub fn compress(&self, buf: &[u8]) -> Result<Option<Vec<u8>>, Error> {
if buf.len() > self.threshold { if buf.len() >= self.threshold {
//compress if the buffer is large enough //compress if the buffer is large enough
let compressed = buf let compressed = buf
.iter() .iter()
@ -41,16 +42,34 @@ impl GzipCompressor {
/// Decompression. /// Decompression.
/// Flag is used to indicate if it's compressed or not /// Flag is used to indicate if it's compressed or not
pub fn decompress(&self, flags: Flag, buf: &[u8]) -> Result<Option<Vec<u8>>, Error> { pub fn decompress(&self, buf: &[u8]) -> Result<Vec<u8>, Error> {
if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { Ok(buf
let decompressed: Vec<u8> = buf
.iter() .iter()
.cloned() .cloned()
.decode(&mut GZipDecoder::new()) .decode(&mut GZipDecoder::new())
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?)
Ok(Some(decompressed)) }
} else { }
Ok(None)
} #[cfg(test)]
mod tests {
use crate::bolts::compress::GzipCompressor;
#[test]
fn test_compression() {
let compressor = GzipCompressor::new(1);
assert!(
compressor
.decompress(&compressor.compress(&[1u8; 1024]).unwrap().unwrap())
.unwrap()
== vec![1u8; 1024]
);
}
#[test]
fn test_threshold() {
let compressor = GzipCompressor::new(1024);
assert!(compressor.compress(&[1u8; 1023]).unwrap().is_none());
assert!(compressor.compress(&[1u8; 1024]).unwrap().is_some());
} }
} }

View File

@ -2,7 +2,7 @@
pub mod bindings; pub mod bindings;
#[cfg(feature = "llmp_compress")] #[cfg(feature = "llmp_compression")]
pub mod compress; pub mod compress;
pub mod llmp; pub mod llmp;

View File

@ -29,7 +29,7 @@ use crate::{
Error, Error,
}; };
#[cfg(feature = "llmp_compress")] #[cfg(feature = "llmp_compression")]
use crate::bolts::{ use crate::bolts::{
compress::GzipCompressor, compress::GzipCompressor,
llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED}, llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED},
@ -65,13 +65,14 @@ where
{ {
stats: Option<ST>, stats: Option<ST>,
llmp: llmp::LlmpConnection<SP>, llmp: llmp::LlmpConnection<SP>,
#[cfg(feature = "llmp_compress")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor, compressor: GzipCompressor,
phantom: PhantomData<(I, S)>, phantom: PhantomData<(I, S)>,
} }
#[cfg(feature = "llmp_compress")] /// The minimum buffer size at which to compress LLMP IPC messages.
#[cfg(feature = "llmp_compression")]
const COMPRESS_THRESHOLD: usize = 1024; const COMPRESS_THRESHOLD: usize = 1024;
impl<I, S, SP, ST> Drop for LlmpEventManager<I, S, SP, ST> impl<I, S, SP, ST> Drop for LlmpEventManager<I, S, SP, ST>
@ -102,7 +103,7 @@ where
Ok(Self { Ok(Self {
stats: Some(stats), stats: Some(stats),
llmp: llmp::LlmpConnection::on_port(shmem_provider, port)?, llmp: llmp::LlmpConnection::on_port(shmem_provider, port)?,
#[cfg(feature = "llmp_compress")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD), compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData, phantom: PhantomData,
}) })
@ -116,7 +117,7 @@ where
llmp: llmp::LlmpConnection::IsClient { llmp: llmp::LlmpConnection::IsClient {
client: LlmpClient::on_existing_from_env(shmem_provider, env_name)?, client: LlmpClient::on_existing_from_env(shmem_provider, env_name)?,
}, },
#[cfg(feature = "llmp_compress")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD), compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
// Inserting a nop-stats element here so rust won't complain. // Inserting a nop-stats element here so rust won't complain.
// In any case, the client won't currently use it. // In any case, the client won't currently use it.
@ -140,7 +141,7 @@ where
shmem_provider, shmem_provider,
description, description,
)?, )?,
#[cfg(feature = "llmp_compress")] #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD), compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
// Inserting a nop-stats element here so rust won't complain. // Inserting a nop-stats element here so rust won't complain.
// In any case, the client won't currently use it. // In any case, the client won't currently use it.
@ -169,18 +170,24 @@ where
match &mut self.llmp { match &mut self.llmp {
llmp::LlmpConnection::IsBroker { broker } => { llmp::LlmpConnection::IsBroker { broker } => {
let stats = self.stats.as_mut().unwrap(); let stats = self.stats.as_mut().unwrap();
#[cfg(feature = "llmp_compress")] #[cfg(feature = "llmp_compression")]
let compressor = &self.compressor; let compressor = &self.compressor;
broker.loop_forever( broker.loop_forever(
&mut |sender_id: u32, tag: Tag, _flags: Flag, msg: &[u8]| { &mut |sender_id: u32, tag: Tag, _flags: Flag, msg: &[u8]| {
if tag == LLMP_TAG_EVENT_TO_BOTH { if tag == LLMP_TAG_EVENT_TO_BOTH {
#[cfg(feature = "llmp_compress")] #[cfg(not(feature = "llmp_compression"))]
let event: Event<I> = match compressor.decompress(_flags, msg)? { let event_bytes = msg;
Some(decompressed) => postcard::from_bytes(&decompressed)?, #[cfg(feature = "llmp_compression")]
None => postcard::from_bytes(msg)?, let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes =
if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = compressor.decompress(msg)?;
&compressed
} else {
msg
}; };
#[cfg(not(feature = "llmp_compress"))] let event: Event<I> = postcard::from_bytes(event_bytes)?;
let event: Event<I> = postcard::from_bytes(msg)?;
match Self::handle_in_broker(stats, sender_id, &event)? { match Self::handle_in_broker(stats, sender_id, &event)? {
BrokerEventResult::Forward => { BrokerEventResult::Forward => {
Ok(llmp::LlmpMsgHookResult::ForwardToClients) Ok(llmp::LlmpMsgHookResult::ForwardToClients)
@ -339,13 +346,18 @@ where
if tag == _LLMP_TAG_EVENT_TO_BROKER { if tag == _LLMP_TAG_EVENT_TO_BROKER {
panic!("EVENT_TO_BROKER parcel should not have arrived in the client!"); panic!("EVENT_TO_BROKER parcel should not have arrived in the client!");
} }
#[cfg(feature = "llmp_compress")] #[cfg(not(feature = "llmp_compression"))]
let event: Event<I> = match self.compressor.decompress(_flags, msg)? { let event_bytes = msg;
Some(decompressed) => postcard::from_bytes(&decompressed)?, #[cfg(feature = "llmp_compression")]
None => postcard::from_bytes(msg)?, let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = self.compressor.decompress(msg)?;
&compressed
} else {
msg
}; };
#[cfg(not(feature = "llmp_compress"))] let event: Event<I> = postcard::from_bytes(event_bytes)?;
let event: Event<I> = postcard::from_bytes(msg)?;
events.push((sender_id, event)); events.push((sender_id, event));
} }
} }
@ -361,7 +373,7 @@ where
Ok(count) Ok(count)
} }
#[cfg(feature = "llmp_compress")] #[cfg(feature = "llmp_compression")]
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> { fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?; let serialized = postcard::to_allocvec(&event)?;
let flags: Flag = LLMP_FLAG_INITIALIZED; let flags: Flag = LLMP_FLAG_INITIALIZED;
@ -381,7 +393,7 @@ where
Ok(()) Ok(())
} }
#[cfg(not(feature = "llmp_compress"))] #[cfg(not(feature = "llmp_compression"))]
fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> { fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?; let serialized = postcard::to_allocvec(&event)?;
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?; self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;

View File

@ -51,7 +51,7 @@ pub enum Error {
/// Serialization error /// Serialization error
Serialize(String), Serialize(String),
/// Compression error /// Compression error
#[cfg(feature = "llmp_compress")] #[cfg(feature = "llmp_compression")]
Compression(String), Compression(String),
/// File related error /// File related error
#[cfg(feature = "std")] #[cfg(feature = "std")]
@ -80,7 +80,7 @@ impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self { match self {
Self::Serialize(s) => write!(f, "Error in Serialization: `{0}`", &s), Self::Serialize(s) => write!(f, "Error in Serialization: `{0}`", &s),
#[cfg(feature = "llmp_compress")] #[cfg(feature = "llmp_compression")]
Self::Compression(s) => write!(f, "Error in Compression: `{0}`", &s), Self::Compression(s) => write!(f, "Error in Compression: `{0}`", &s),
#[cfg(feature = "std")] #[cfg(feature = "std")]
Self::File(err) => write!(f, "File IO failed: {:?}", &err), Self::File(err) => write!(f, "File IO failed: {:?}", &err),
@ -106,7 +106,7 @@ impl From<postcard::Error> for Error {
} }
} }
#[cfg(feature = "llmp_compress")] #[cfg(feature = "llmp_compression")]
impl From<compression::prelude::CompressionError> for Error { impl From<compression::prelude::CompressionError> for Error {
fn from(err: compression::prelude::CompressionError) -> Self { fn from(err: compression::prelude::CompressionError) -> Self {
Self::Compression(format!("{:?}", err)) Self::Compression(format!("{:?}", err))