From f3b4305dac6892409d949fcabb09b32a67b65807 Mon Sep 17 00:00:00 2001 From: Dominik Maier Date: Thu, 29 Apr 2021 13:16:51 +0200 Subject: [PATCH] Refactored compression --- libafl/Cargo.toml | 4 +-- libafl/src/bolts/compress.rs | 57 ++++++++++++++++++++++++------------ libafl/src/bolts/mod.rs | 2 +- libafl/src/events/llmp.rs | 56 +++++++++++++++++++++-------------- libafl/src/lib.rs | 6 ++-- 5 files changed, 78 insertions(+), 47 deletions(-) diff --git a/libafl/Cargo.toml b/libafl/Cargo.toml index 321e50ecd3..9d0466b2d5 100644 --- a/libafl/Cargo.toml +++ b/libafl/Cargo.toml @@ -34,13 +34,13 @@ harness = false #debug = true [features] -default = ["std", "anymap_debug", "derive"] +default = ["std", "anymap_debug", "derive", "llmp_compression"] std = [] # print, sharedmap, ... support anymap_debug = ["serde_json"] # uses serde_json to Debug the anymap trait. Disable for smaller footprint. derive = ["libafl_derive"] # provide derive(SerdeAny) macro. llmp_small_maps = [] # reduces initial map size for llmp llmp_debug = ["backtrace"] # Enables debug output for LLMP -llmp_compress = [] #llmp compression using GZip +llmp_compression = [] #llmp compression using GZip [[example]] name = "llmp_test" diff --git a/libafl/src/bolts/compress.rs b/libafl/src/bolts/compress.rs index e2cdb689f8..1da1f84cb8 100644 --- a/libafl/src/bolts/compress.rs +++ b/libafl/src/bolts/compress.rs @@ -1,22 +1,22 @@ //! Compression of events passed between a broker and clients. //! Currently we use the gzip compression algorithm for its fast decompression performance. -#[cfg(feature = "llmp_compress")] -use crate::{ - bolts::llmp::{Flag, LLMP_FLAG_COMPRESSED}, - Error, -}; +#[cfg(feature = "llmp_compression")] +use crate::Error; use alloc::vec::Vec; use compression::prelude::*; use core::fmt::Debug; +/// Compression for your stream compression needs. #[derive(Debug)] pub struct GzipCompressor { + /// If less bytes than threshold are being passed to `compress`, the payload is not getting compressed. threshold: usize, } impl GzipCompressor { - /// If the buffer is larger than the threshold value, we compress the buffer. + /// If the buffer is at lest larger as large as the `threshold` value, we compress the buffer. + /// When given a `threshold` of `0`, the `GzipCompressor` will always compress. pub fn new(threshold: usize) -> Self { GzipCompressor { threshold } } @@ -24,9 +24,10 @@ impl GzipCompressor { impl GzipCompressor { /// Compression. - /// The buffer is compressed with the gzip algo + /// If the buffer is smaller than the threshold of this compressor, `None` will be returned. + /// Else, the buffer is compressed. pub fn compress(&self, buf: &[u8]) -> Result>, Error> { - if buf.len() > self.threshold { + if buf.len() >= self.threshold { //compress if the buffer is large enough let compressed = buf .iter() @@ -41,16 +42,34 @@ impl GzipCompressor { /// Decompression. /// Flag is used to indicate if it's compressed or not - pub fn decompress(&self, flags: Flag, buf: &[u8]) -> Result>, Error> { - if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { - let decompressed: Vec = buf - .iter() - .cloned() - .decode(&mut GZipDecoder::new()) - .collect::, _>>()?; - Ok(Some(decompressed)) - } else { - Ok(None) - } + pub fn decompress(&self, buf: &[u8]) -> Result, Error> { + Ok(buf + .iter() + .cloned() + .decode(&mut GZipDecoder::new()) + .collect::, _>>()?) + } +} + +#[cfg(test)] +mod tests { + use crate::bolts::compress::GzipCompressor; + + #[test] + fn test_compression() { + let compressor = GzipCompressor::new(1); + assert!( + compressor + .decompress(&compressor.compress(&[1u8; 1024]).unwrap().unwrap()) + .unwrap() + == vec![1u8; 1024] + ); + } + + #[test] + fn test_threshold() { + let compressor = GzipCompressor::new(1024); + assert!(compressor.compress(&[1u8; 1023]).unwrap().is_none()); + assert!(compressor.compress(&[1u8; 1024]).unwrap().is_some()); } } diff --git a/libafl/src/bolts/mod.rs b/libafl/src/bolts/mod.rs index 87db34aa7c..62c8830b95 100644 --- a/libafl/src/bolts/mod.rs +++ b/libafl/src/bolts/mod.rs @@ -2,7 +2,7 @@ pub mod bindings; -#[cfg(feature = "llmp_compress")] +#[cfg(feature = "llmp_compression")] pub mod compress; pub mod llmp; diff --git a/libafl/src/events/llmp.rs b/libafl/src/events/llmp.rs index 62caf9d0de..90fd00f864 100644 --- a/libafl/src/events/llmp.rs +++ b/libafl/src/events/llmp.rs @@ -29,7 +29,7 @@ use crate::{ Error, }; -#[cfg(feature = "llmp_compress")] +#[cfg(feature = "llmp_compression")] use crate::bolts::{ compress::GzipCompressor, llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED}, @@ -65,13 +65,14 @@ where { stats: Option, llmp: llmp::LlmpConnection, - #[cfg(feature = "llmp_compress")] + #[cfg(feature = "llmp_compression")] compressor: GzipCompressor, phantom: PhantomData<(I, S)>, } -#[cfg(feature = "llmp_compress")] +/// The minimum buffer size at which to compress LLMP IPC messages. +#[cfg(feature = "llmp_compression")] const COMPRESS_THRESHOLD: usize = 1024; impl Drop for LlmpEventManager @@ -102,7 +103,7 @@ where Ok(Self { stats: Some(stats), llmp: llmp::LlmpConnection::on_port(shmem_provider, port)?, - #[cfg(feature = "llmp_compress")] + #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::new(COMPRESS_THRESHOLD), phantom: PhantomData, }) @@ -116,7 +117,7 @@ where llmp: llmp::LlmpConnection::IsClient { client: LlmpClient::on_existing_from_env(shmem_provider, env_name)?, }, - #[cfg(feature = "llmp_compress")] + #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::new(COMPRESS_THRESHOLD), // Inserting a nop-stats element here so rust won't complain. // In any case, the client won't currently use it. @@ -140,7 +141,7 @@ where shmem_provider, description, )?, - #[cfg(feature = "llmp_compress")] + #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::new(COMPRESS_THRESHOLD), // Inserting a nop-stats element here so rust won't complain. // In any case, the client won't currently use it. @@ -169,18 +170,24 @@ where match &mut self.llmp { llmp::LlmpConnection::IsBroker { broker } => { let stats = self.stats.as_mut().unwrap(); - #[cfg(feature = "llmp_compress")] + #[cfg(feature = "llmp_compression")] let compressor = &self.compressor; broker.loop_forever( &mut |sender_id: u32, tag: Tag, _flags: Flag, msg: &[u8]| { if tag == LLMP_TAG_EVENT_TO_BOTH { - #[cfg(feature = "llmp_compress")] - let event: Event = match compressor.decompress(_flags, msg)? { - Some(decompressed) => postcard::from_bytes(&decompressed)?, - None => postcard::from_bytes(msg)?, - }; - #[cfg(not(feature = "llmp_compress"))] - let event: Event = postcard::from_bytes(msg)?; + #[cfg(not(feature = "llmp_compression"))] + let event_bytes = msg; + #[cfg(feature = "llmp_compression")] + let compressed; + #[cfg(feature = "llmp_compression")] + let event_bytes = + if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + compressed = compressor.decompress(msg)?; + &compressed + } else { + msg + }; + let event: Event = postcard::from_bytes(event_bytes)?; match Self::handle_in_broker(stats, sender_id, &event)? { BrokerEventResult::Forward => { Ok(llmp::LlmpMsgHookResult::ForwardToClients) @@ -339,13 +346,18 @@ where if tag == _LLMP_TAG_EVENT_TO_BROKER { panic!("EVENT_TO_BROKER parcel should not have arrived in the client!"); } - #[cfg(feature = "llmp_compress")] - let event: Event = match self.compressor.decompress(_flags, msg)? { - Some(decompressed) => postcard::from_bytes(&decompressed)?, - None => postcard::from_bytes(msg)?, + #[cfg(not(feature = "llmp_compression"))] + let event_bytes = msg; + #[cfg(feature = "llmp_compression")] + let compressed; + #[cfg(feature = "llmp_compression")] + let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + compressed = self.compressor.decompress(msg)?; + &compressed + } else { + msg }; - #[cfg(not(feature = "llmp_compress"))] - let event: Event = postcard::from_bytes(msg)?; + let event: Event = postcard::from_bytes(event_bytes)?; events.push((sender_id, event)); } } @@ -361,7 +373,7 @@ where Ok(count) } - #[cfg(feature = "llmp_compress")] + #[cfg(feature = "llmp_compression")] fn fire(&mut self, _state: &mut S, event: Event) -> Result<(), Error> { let serialized = postcard::to_allocvec(&event)?; let flags: Flag = LLMP_FLAG_INITIALIZED; @@ -381,7 +393,7 @@ where Ok(()) } - #[cfg(not(feature = "llmp_compress"))] + #[cfg(not(feature = "llmp_compression"))] fn fire(&mut self, _state: &mut S, event: Event) -> Result<(), Error> { let serialized = postcard::to_allocvec(&event)?; self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?; diff --git a/libafl/src/lib.rs b/libafl/src/lib.rs index b08b258916..421506380d 100644 --- a/libafl/src/lib.rs +++ b/libafl/src/lib.rs @@ -51,7 +51,7 @@ pub enum Error { /// Serialization error Serialize(String), /// Compression error - #[cfg(feature = "llmp_compress")] + #[cfg(feature = "llmp_compression")] Compression(String), /// File related error #[cfg(feature = "std")] @@ -80,7 +80,7 @@ impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Self::Serialize(s) => write!(f, "Error in Serialization: `{0}`", &s), - #[cfg(feature = "llmp_compress")] + #[cfg(feature = "llmp_compression")] Self::Compression(s) => write!(f, "Error in Compression: `{0}`", &s), #[cfg(feature = "std")] Self::File(err) => write!(f, "File IO failed: {:?}", &err), @@ -106,7 +106,7 @@ impl From for Error { } } -#[cfg(feature = "llmp_compress")] +#[cfg(feature = "llmp_compression")] impl From for Error { fn from(err: compression::prelude::CompressionError) -> Self { Self::Compression(format!("{:?}", err))