diff --git a/libafl/Cargo.toml b/libafl/Cargo.toml
index 321e50ecd3..9d0466b2d5 100644
--- a/libafl/Cargo.toml
+++ b/libafl/Cargo.toml
@@ -34,13 +34,13 @@ harness = false
#debug = true
[features]
-default = ["std", "anymap_debug", "derive"]
+default = ["std", "anymap_debug", "derive", "llmp_compression"]
std = [] # print, sharedmap, ... support
anymap_debug = ["serde_json"] # uses serde_json to Debug the anymap trait. Disable for smaller footprint.
derive = ["libafl_derive"] # provide derive(SerdeAny) macro.
llmp_small_maps = [] # reduces initial map size for llmp
llmp_debug = ["backtrace"] # Enables debug output for LLMP
-llmp_compress = [] #llmp compression using GZip
+llmp_compression = [] #llmp compression using GZip
[[example]]
name = "llmp_test"
diff --git a/libafl/src/bolts/compress.rs b/libafl/src/bolts/compress.rs
index e2cdb689f8..1da1f84cb8 100644
--- a/libafl/src/bolts/compress.rs
+++ b/libafl/src/bolts/compress.rs
@@ -1,22 +1,22 @@
//! Compression of events passed between a broker and clients.
//! Currently we use the gzip compression algorithm for its fast decompression performance.
-#[cfg(feature = "llmp_compress")]
-use crate::{
- bolts::llmp::{Flag, LLMP_FLAG_COMPRESSED},
- Error,
-};
+#[cfg(feature = "llmp_compression")]
+use crate::Error;
use alloc::vec::Vec;
use compression::prelude::*;
use core::fmt::Debug;
+/// Compression for your stream compression needs.
#[derive(Debug)]
pub struct GzipCompressor {
+ /// If less bytes than threshold are being passed to `compress`, the payload is not getting compressed.
threshold: usize,
}
impl GzipCompressor {
- /// If the buffer is larger than the threshold value, we compress the buffer.
+ /// If the buffer is at lest larger as large as the `threshold` value, we compress the buffer.
+ /// When given a `threshold` of `0`, the `GzipCompressor` will always compress.
pub fn new(threshold: usize) -> Self {
GzipCompressor { threshold }
}
@@ -24,9 +24,10 @@ impl GzipCompressor {
impl GzipCompressor {
/// Compression.
- /// The buffer is compressed with the gzip algo
+ /// If the buffer is smaller than the threshold of this compressor, `None` will be returned.
+ /// Else, the buffer is compressed.
pub fn compress(&self, buf: &[u8]) -> Result>, Error> {
- if buf.len() > self.threshold {
+ if buf.len() >= self.threshold {
//compress if the buffer is large enough
let compressed = buf
.iter()
@@ -41,16 +42,34 @@ impl GzipCompressor {
/// Decompression.
/// Flag is used to indicate if it's compressed or not
- pub fn decompress(&self, flags: Flag, buf: &[u8]) -> Result >, Error> {
- if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
- let decompressed: Vec = buf
- .iter()
- .cloned()
- .decode(&mut GZipDecoder::new())
- .collect::, _>>()?;
- Ok(Some(decompressed))
- } else {
- Ok(None)
- }
+ pub fn decompress(&self, buf: &[u8]) -> Result, Error> {
+ Ok(buf
+ .iter()
+ .cloned()
+ .decode(&mut GZipDecoder::new())
+ .collect::, _>>()?)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::bolts::compress::GzipCompressor;
+
+ #[test]
+ fn test_compression() {
+ let compressor = GzipCompressor::new(1);
+ assert!(
+ compressor
+ .decompress(&compressor.compress(&[1u8; 1024]).unwrap().unwrap())
+ .unwrap()
+ == vec![1u8; 1024]
+ );
+ }
+
+ #[test]
+ fn test_threshold() {
+ let compressor = GzipCompressor::new(1024);
+ assert!(compressor.compress(&[1u8; 1023]).unwrap().is_none());
+ assert!(compressor.compress(&[1u8; 1024]).unwrap().is_some());
}
}
diff --git a/libafl/src/bolts/mod.rs b/libafl/src/bolts/mod.rs
index 87db34aa7c..62c8830b95 100644
--- a/libafl/src/bolts/mod.rs
+++ b/libafl/src/bolts/mod.rs
@@ -2,7 +2,7 @@
pub mod bindings;
-#[cfg(feature = "llmp_compress")]
+#[cfg(feature = "llmp_compression")]
pub mod compress;
pub mod llmp;
diff --git a/libafl/src/events/llmp.rs b/libafl/src/events/llmp.rs
index 62caf9d0de..90fd00f864 100644
--- a/libafl/src/events/llmp.rs
+++ b/libafl/src/events/llmp.rs
@@ -29,7 +29,7 @@ use crate::{
Error,
};
-#[cfg(feature = "llmp_compress")]
+#[cfg(feature = "llmp_compression")]
use crate::bolts::{
compress::GzipCompressor,
llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED},
@@ -65,13 +65,14 @@ where
{
stats: Option,
llmp: llmp::LlmpConnection,
- #[cfg(feature = "llmp_compress")]
+ #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor,
phantom: PhantomData<(I, S)>,
}
-#[cfg(feature = "llmp_compress")]
+/// The minimum buffer size at which to compress LLMP IPC messages.
+#[cfg(feature = "llmp_compression")]
const COMPRESS_THRESHOLD: usize = 1024;
impl Drop for LlmpEventManager
@@ -102,7 +103,7 @@ where
Ok(Self {
stats: Some(stats),
llmp: llmp::LlmpConnection::on_port(shmem_provider, port)?,
- #[cfg(feature = "llmp_compress")]
+ #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData,
})
@@ -116,7 +117,7 @@ where
llmp: llmp::LlmpConnection::IsClient {
client: LlmpClient::on_existing_from_env(shmem_provider, env_name)?,
},
- #[cfg(feature = "llmp_compress")]
+ #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
// Inserting a nop-stats element here so rust won't complain.
// In any case, the client won't currently use it.
@@ -140,7 +141,7 @@ where
shmem_provider,
description,
)?,
- #[cfg(feature = "llmp_compress")]
+ #[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
// Inserting a nop-stats element here so rust won't complain.
// In any case, the client won't currently use it.
@@ -169,18 +170,24 @@ where
match &mut self.llmp {
llmp::LlmpConnection::IsBroker { broker } => {
let stats = self.stats.as_mut().unwrap();
- #[cfg(feature = "llmp_compress")]
+ #[cfg(feature = "llmp_compression")]
let compressor = &self.compressor;
broker.loop_forever(
&mut |sender_id: u32, tag: Tag, _flags: Flag, msg: &[u8]| {
if tag == LLMP_TAG_EVENT_TO_BOTH {
- #[cfg(feature = "llmp_compress")]
- let event: Event = match compressor.decompress(_flags, msg)? {
- Some(decompressed) => postcard::from_bytes(&decompressed)?,
- None => postcard::from_bytes(msg)?,
- };
- #[cfg(not(feature = "llmp_compress"))]
- let event: Event = postcard::from_bytes(msg)?;
+ #[cfg(not(feature = "llmp_compression"))]
+ let event_bytes = msg;
+ #[cfg(feature = "llmp_compression")]
+ let compressed;
+ #[cfg(feature = "llmp_compression")]
+ let event_bytes =
+ if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
+ compressed = compressor.decompress(msg)?;
+ &compressed
+ } else {
+ msg
+ };
+ let event: Event = postcard::from_bytes(event_bytes)?;
match Self::handle_in_broker(stats, sender_id, &event)? {
BrokerEventResult::Forward => {
Ok(llmp::LlmpMsgHookResult::ForwardToClients)
@@ -339,13 +346,18 @@ where
if tag == _LLMP_TAG_EVENT_TO_BROKER {
panic!("EVENT_TO_BROKER parcel should not have arrived in the client!");
}
- #[cfg(feature = "llmp_compress")]
- let event: Event = match self.compressor.decompress(_flags, msg)? {
- Some(decompressed) => postcard::from_bytes(&decompressed)?,
- None => postcard::from_bytes(msg)?,
+ #[cfg(not(feature = "llmp_compression"))]
+ let event_bytes = msg;
+ #[cfg(feature = "llmp_compression")]
+ let compressed;
+ #[cfg(feature = "llmp_compression")]
+ let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
+ compressed = self.compressor.decompress(msg)?;
+ &compressed
+ } else {
+ msg
};
- #[cfg(not(feature = "llmp_compress"))]
- let event: Event = postcard::from_bytes(msg)?;
+ let event: Event = postcard::from_bytes(event_bytes)?;
events.push((sender_id, event));
}
}
@@ -361,7 +373,7 @@ where
Ok(count)
}
- #[cfg(feature = "llmp_compress")]
+ #[cfg(feature = "llmp_compression")]
fn fire(&mut self, _state: &mut S, event: Event) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?;
let flags: Flag = LLMP_FLAG_INITIALIZED;
@@ -381,7 +393,7 @@ where
Ok(())
}
- #[cfg(not(feature = "llmp_compress"))]
+ #[cfg(not(feature = "llmp_compression"))]
fn fire(&mut self, _state: &mut S, event: Event) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?;
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
diff --git a/libafl/src/lib.rs b/libafl/src/lib.rs
index b08b258916..421506380d 100644
--- a/libafl/src/lib.rs
+++ b/libafl/src/lib.rs
@@ -51,7 +51,7 @@ pub enum Error {
/// Serialization error
Serialize(String),
/// Compression error
- #[cfg(feature = "llmp_compress")]
+ #[cfg(feature = "llmp_compression")]
Compression(String),
/// File related error
#[cfg(feature = "std")]
@@ -80,7 +80,7 @@ impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Serialize(s) => write!(f, "Error in Serialization: `{0}`", &s),
- #[cfg(feature = "llmp_compress")]
+ #[cfg(feature = "llmp_compression")]
Self::Compression(s) => write!(f, "Error in Compression: `{0}`", &s),
#[cfg(feature = "std")]
Self::File(err) => write!(f, "File IO failed: {:?}", &err),
@@ -106,7 +106,7 @@ impl From for Error {
}
}
-#[cfg(feature = "llmp_compress")]
+#[cfg(feature = "llmp_compression")]
impl From for Error {
fn from(err: compression::prelude::CompressionError) -> Self {
Self::Compression(format!("{:?}", err))