Tree-shaped multi-machine fuzzing (#2302)

* tree-shaped multi-machine fuzzing

* forgot main file

* aaa

* moving things around

* fix

* working?

* remove debug panic

* aaa

* aaa

* fmt

* normal centralized adapted

* removed old useless code

* cleanup

* llmp hooks

* working multi machine apparently?

* aaa

* cleanup (#2305)

* added old message dispatch.
thread safety stuff

* testing things around

* opti opti opti

* :)

* fuzz

* limit the amound received at once to avoid congestion

* remove useless corpus
mv to sqlite
less warnings

* aaa

* ;

* big opti

* adding cfgs

* fix

* fixer

* fix

* s

* clippy and reduce generics

* debugging

* fix

* more robust disconnection

* aaa

* aaa

* aaa

* nostd

* more nostd

* clippy

* not in ci

* unused

* aaa

* doc

* clippy

* clippy

* clippy

* no crash in libpng

* aaa

* aaa

* aaa

* aaa

* graph generator

* fix

* fix

* windows fix all

---------

Co-authored-by: Dongjia "toka" Zhang <tokazerkje@outlook.com>
This commit is contained in:
Romain Malmain 2024-06-17 23:23:01 +02:00 committed by GitHub
parent a4070deee1
commit fa17f47115
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
44 changed files with 2404 additions and 1753 deletions

View File

@ -294,7 +294,8 @@ jobs:
- ./fuzzers/libfuzzer_libpng_launcher
- ./fuzzers/libfuzzer_libpng_accounting
- ./fuzzers/forkserver_libafl_cc
- ./fuzzers/libfuzzer_libpng_tcp_manager
# - ./fuzzers/libfuzzer_libpng_tcp_manager
# - ./fuzzers/sqlite_centralized_multi_machine
- ./fuzzers/backtrace_baby_fuzzers
- ./fuzzers/fuzzbench_qemu
- ./fuzzers/nyx_libxml2_parallel

1
.gitignore vendored
View File

@ -7,6 +7,7 @@ vendor
.DS_Store
.env
*.test
*.tmp
*.swp
*.o

View File

@ -37,6 +37,8 @@ exclude = [
"utils/noaslr",
"utils/gdb_qemu",
"utils/libafl_fmt",
"utils/desyscall",
"utils/multi_machine_generator",
"scripts",
]

View File

@ -136,7 +136,7 @@ pub extern "C" fn libafl_main() {
let monitor = MultiMonitor::new(|s| println!("{s}"));
let mut secondary_run_client = |state: Option<_>,
mut mgr: CentralizedEventManager<_, _>,
mut mgr: CentralizedEventManager<_, _, _, _>,
_core_id: CoreId| {
// Create an observation channel using the coverage map
let edges_observer =

View File

@ -18,7 +18,7 @@ debug = true
[build-dependencies]
cc = { version = "1.0", features = ["parallel"] }
which = "4.4"
which = "6.0"
[dependencies]
libafl = { path = "../../libafl/", features = ["default", "tcp_manager"] }

View File

@ -0,0 +1,34 @@
[package]
name = "libfuzzer_libpng_launcher_centralized_multi_machine"
version = "0.12.0"
authors = ["Romain Malmain <romain.malmain@pm.me>", "Andrea Fioraldi <andreafioraldi@gmail.com>", "Dominik Maier <domenukk@gmail.com>"]
edition = "2021"
[features]
default = ["std"]
std = []
[profile.release]
lto = true
codegen-units = 1
opt-level = 3
debug = true
[build-dependencies]
cc = { version = "1.0", features = ["parallel"] }
which = "6.0"
[dependencies]
# no llmp compression for now, better perfs.
libafl = { path = "../../libafl", default-features = false, features = ["std", "derive", "llmp_small_maps", "llmp_broker_timeouts", "rand_trait", "fork", "prelude", "gzip", "regex", "serdeany_autoreg", "tui_monitor", "std", "derive", "rand_trait", "fork", "prelude", "gzip", "regex", "scalability_introspection", "multi_machine", "errors_backtrace"] }
libafl_bolts = { path = "../../libafl_bolts", features = ["xxh3"] }
libafl_targets = { path = "../../libafl_targets", features = ["sancov_pcguard_hitcounts", "libfuzzer"] }
# TODO Include it only when building cc
libafl_cc = { path = "../../libafl_cc" }
clap = { version = "4.0", features = ["derive"] }
mimalloc = { version = "*", default-features = false }
env_logger = "0.11"
[lib]
name = "libfuzzer_libpng"
crate-type = ["staticlib"]

View File

@ -0,0 +1,133 @@
# Variables
[env]
FUZZER_NAME='fuzzer_libpng_launcher'
CARGO_TARGET_DIR = { value = "${PROJECT_DIR}/target", condition = { env_not_set = ["CARGO_TARGET_DIR"] } }
PROFILE = { value = "release", condition = {env_not_set = ["PROFILE"]} }
PROFILE_DIR = {value = "release", condition = {env_not_set = ["PROFILE_DIR"] }}
LIBAFL_CC = '${CARGO_TARGET_DIR}/${PROFILE_DIR}/libafl_cc'
LIBAFL_CXX = '${CARGO_TARGET_DIR}/${PROFILE}/libafl_cxx'
FUZZER = '${CARGO_TARGET_DIR}/${PROFILE_DIR}/${FUZZER_NAME}'
PROJECT_DIR = { script = ["pwd"] }
[tasks.unsupported]
script_runner="@shell"
script='''
echo "Cargo-make not integrated yet on this platform"
'''
# libpng
[tasks.libpng]
linux_alias = "libpng_unix"
mac_alias = "libpng_unix"
windows_alias = "unsupported"
[tasks.libpng_unix]
condition = { files_not_exist = ["./libpng-1.6.37"]}
script_runner="@shell"
script='''
wget https://github.com/glennrp/libpng/archive/refs/tags/v1.6.37.tar.gz
tar -xvf v1.6.37.tar.gz
'''
# Compilers
[tasks.cxx]
linux_alias = "cxx_unix"
mac_alias = "cxx_unix"
windows_alias = "unsupported"
[tasks.cxx_unix]
command = "cargo"
args = ["build" , "--profile", "${PROFILE}"]
[tasks.cc]
linux_alias = "cc_unix"
mac_alias = "cc_unix"
windows_alias = "unsupported"
[tasks.cc_unix]
command = "cargo"
args = ["build" , "--profile", "${PROFILE}"]
# Library
[tasks.lib]
linux_alias = "lib_unix"
mac_alias = "lib_unix"
windows_alias = "unsupported"
[tasks.lib_unix]
script_runner="@shell"
script='''
cd libpng-1.6.37 && ./configure --enable-shared=no --with-pic=yes --enable-hardware-optimizations=yes
cd "${PROJECT_DIR}"
make -C libpng-1.6.37 CC="${CARGO_TARGET_DIR}/${PROFILE_DIR}/libafl_cc" CXX="${CARGO_TARGET_DIR}/${PROFILE_DIR}/libafl_cxx"
'''
dependencies = [ "libpng", "cxx", "cc" ]
# Harness
[tasks.fuzzer]
linux_alias = "fuzzer_unix"
mac_alias = "fuzzer_unix"
windows_alias = "unsupported"
[tasks.fuzzer_unix]
command = "${CARGO_TARGET_DIR}/${PROFILE_DIR}/libafl_cxx"
args = ["${PROJECT_DIR}/harness.cc", "${PROJECT_DIR}/libpng-1.6.37/.libs/libpng16.a", "-I", "${PROJECT_DIR}/libpng-1.6.37/", "-o", "${FUZZER_NAME}", "-lm", "-lz"]
dependencies = [ "lib", "cxx", "cc" ]
# Run the fuzzer
[tasks.run]
linux_alias = "run_unix"
mac_alias = "run_unix"
windows_alias = "unsupported"
[tasks.run_unix]
script_runner = "@shell"
script='''
./${FUZZER_NAME} --cores 0-1 --input ./corpus --parent-addr 0.0.0.0:12345
'''
dependencies = [ "fuzzer" ]
# Test
[tasks.test]
linux_alias = "test_unix"
mac_alias = "test_mac"
windows_alias = "unsupported"
[tasks.test_unix]
script_runner = "@shell"
script='''
rm -rf libafl_unix_shmem_server || true
timeout 31s ./${FUZZER_NAME} --cores 0-1 --input ./corpus 2>/dev/null | tee fuzz_stdout.log || true
if grep -qa "corpus: 30" fuzz_stdout.log; then
echo "Fuzzer is working"
else
echo "Fuzzer does not generate any testcases or any crashes"
exit 1
fi
'''
dependencies = [ "fuzzer" ]
[tasks.test_mac]
script_runner = "@shell"
script='''
rm -rf libafl_unix_shmem_server || true
timeout 31s ./${FUZZER_NAME} --cores 0 --input ./corpus 2>/dev/null | tee fuzz_stdout.log || true
'''
dependencies = [ "fuzzer" ]
# Clean up
[tasks.clean]
linux_alias = "clean_unix"
mac_alias = "clean_unix"
windows_alias = "unsupported"
[tasks.clean_unix]
# Disable default `clean` definition
clear = true
script_runner="@shell"
script='''
rm -f ./${FUZZER_NAME}
make -C libpng-1.6.37 clean
cargo clean
'''

View File

@ -0,0 +1,47 @@
# Libfuzzer for libpng, with launcher
This folder contains an example fuzzer for libpng, using LLMP for fast multi-process fuzzing and crash detection.
To show off crash detection, we added a `ud2` instruction to the harness, edit harness.cc if you want a non-crashing example.
It has been tested on Linux.
In contrast to the normal libfuzzer libpng example, this uses the `launcher` feature, that automatically spawns `n` child processes, and binds them to a free core.
## Build
To build this example, run
```bash
cargo build --release
```
This will build the library with the fuzzer (src/lib.rs) with the libfuzzer compatibility layer and the SanitizerCoverage runtime functions for coverage feedback.
In addition, it will also build two C and C++ compiler wrappers (bin/libafl_c(libafl_c/xx).rs) that you must use to compile the target.
Then download libpng, and unpack the archive:
```bash
wget https://github.com/glennrp/libpng/archive/refs/tags/v1.6.37.tar.gz
tar -xvf v1.6.37.tar.gz
```
Now compile libpng, using the libafl_cc compiler wrapper:
```bash
cd libpng-1.6.37
./configure
make CC=../target/release/libafl_cc CXX=../target/release/libafl_cxx -j `nproc`
```
You can find the static lib at `libpng-1.6.37/.libs/libpng16.a`.
Now, we have to build the libfuzzer harness and link all together to create our fuzzer binary.
```
cd ..
./target/release/libafl_cxx ./harness.cc libpng-1.6.37/.libs/libpng16.a -I libpng-1.6.37/ -o fuzzer_libpng -lz -lm
```
Afterwards, the fuzzer will be ready to run.
## Run
Just run once, the launcher feature should do the rest.

View File

@ -0,0 +1,42 @@
#!/bin/bash
if [ ! -d "sqlite3" ]; then
curl 'https://sqlite.org/src/tarball/sqlite.tar.gz?r=c78cbf2e86850cc6' -o sqlite3.tar.gz && mkdir sqlite3 && pushd sqlite3 && tar xzf ../sqlite3.tar.gz --strip-components 1 && popd
mkdir corpus
find ./sqlite3 -name "*.test" -exec cp {} corpus/ \;
fi
if [ "$1" = "release" ]; then
cargo build --release
else
cargo build
fi
export CC=`pwd`/target/debug/libafl_cc
export CXX=`pwd`/target/debug/libafl_cxx
export CFLAGS='--libafl'
export CXXFLAGS='--libafl'
export CFLAGS="$CFLAGS -DSQLITE_MAX_LENGTH=128000000 \
-DSQLITE_MAX_SQL_LENGTH=128000000 \
-DSQLITE_MAX_MEMORY=25000000 \
-DSQLITE_PRINTF_PRECISION_LIMIT=1048576 \
-DSQLITE_DEBUG=1 \
-DSQLITE_MAX_PAGE_COUNT=16384"
pushd sqlite3
if [ ! -f "Makefile" ]; then
echo "Run configure..."
./configure
fi
make -j$(nproc)
make sqlite3.c
popd
if [ "$1" = "release" ]; then
./target/release/libafl_cc --libafl -I ./sqlite3 -c ./sqlite3/test/ossfuzz.c -o ./sqlite3/test/ossfuzz.o
./target/release/libafl_cxx --libafl -o ossfuzz ./sqlite3/test/ossfuzz.o ./sqlite3/sqlite3.o -pthread -ldl -lz
else
./target/debug/libafl_cc --libafl -I ./sqlite3 -c ./sqlite3/test/ossfuzz.c -o ./sqlite3/test/ossfuzz.o
./target/debug/libafl_cxx --libafl -o ossfuzz ./sqlite3/test/ossfuzz.o ./sqlite3/sqlite3.o -pthread -ldl -lz
fi

View File

@ -0,0 +1,3 @@
#!/bin/bash
./ossfuzz --cores 0-3 --input ./corpus --parent-addr 0.0.0.0:50000 --broker-port 3000

View File

@ -0,0 +1,3 @@
#!/bin/bash
./ossfuzz --cores 4-7 --input ./corpus

View File

@ -0,0 +1,36 @@
use std::env;
use libafl_cc::{ClangWrapper, CompilerWrapper, ToolWrapper};
pub fn main() {
let args: Vec<String> = env::args().collect();
if args.len() > 1 {
let mut dir = env::current_exe().unwrap();
let wrapper_name = dir.file_name().unwrap().to_str().unwrap();
let is_cpp = match wrapper_name[wrapper_name.len()-2..].to_lowercase().as_str() {
"cc" => false,
"++" | "pp" | "xx" => true,
_ => panic!("Could not figure out if c or c++ wrapper was called. Expected {dir:?} to end with c or cxx"),
};
dir.pop();
let mut cc = ClangWrapper::new();
if let Some(code) = cc
.cpp(is_cpp)
// silence the compiler wrapper output, needed for some configure scripts.
.silence(true)
.parse_args(&args)
.expect("Failed to parse the command line")
.link_staticlib(&dir, "libfuzzer_libpng")
.add_arg("-fsanitize-coverage=trace-pc-guard")
.run()
.expect("Failed to run the wrapped compiler")
{
std::process::exit(code);
}
} else {
panic!("LibAFL CC: No Arguments given");
}
}

View File

@ -0,0 +1,5 @@
pub mod libafl_cc;
fn main() {
libafl_cc::main();
}

View File

@ -0,0 +1,307 @@
//! A libfuzzer-like fuzzer with llmp-multithreading support and restarts
//! The example harness is built for libpng.
//! In this example, you will see the use of the `launcher` feature.
//! The `launcher` will spawn new processes for each cpu core.
use core::time::Duration;
use std::{env, net::SocketAddr, path::PathBuf, str::FromStr};
use clap::{self, Parser};
use libafl::{
corpus::{Corpus, InMemoryCorpus, OnDiskCorpus},
events::{
centralized::CentralizedEventManager, launcher::CentralizedLauncher,
multi_machine::NodeDescriptor, EventConfig,
},
executors::{inprocess::InProcessExecutor, ExitKind},
feedback_or, feedback_or_fast,
feedbacks::{CrashFeedback, MaxMapFeedback, TimeFeedback, TimeoutFeedback},
fuzzer::{Fuzzer, StdFuzzer},
inputs::{BytesInput, HasTargetBytes},
monitors::MultiMonitor,
mutators::{
scheduled::{havoc_mutations, tokens_mutations, StdScheduledMutator},
token_mutations::Tokens,
},
observers::{CanTrack, HitcountsMapObserver, TimeObserver},
schedulers::{IndexesLenTimeMinimizerScheduler, QueueScheduler},
stages::mutational::StdMutationalStage,
state::{HasCorpus, StdState},
Error, HasMetadata,
};
use libafl_bolts::{
core_affinity::{CoreId, Cores},
rands::StdRand,
shmem::{ShMemProvider, StdShMemProvider},
tuples::{tuple_list, Merge},
AsSlice,
};
use libafl_targets::{libfuzzer_initialize, libfuzzer_test_one_input, std_edges_map_observer};
use mimalloc::MiMalloc;
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
/// Parse a millis string to a [`Duration`]. Used for arg parsing.
fn timeout_from_millis_str(time: &str) -> Result<Duration, Error> {
Ok(Duration::from_millis(time.parse()?))
}
/// The commandline args this fuzzer accepts
#[derive(Debug, Parser)]
#[command(
name = "libfuzzer_libpng_launcher",
about = "A libfuzzer-like fuzzer for libpng with llmp-multithreading support and a launcher",
author = "Andrea Fioraldi <andreafioraldi@gmail.com>, Dominik Maier <domenukk@gmail.com>"
)]
struct Opt {
#[arg(
short,
long,
value_parser = Cores::from_cmdline,
help = "Spawn a client in each of the provided cores. Broker runs in the 0th core. 'all' to select all available cores. 'none' to run a client without binding to any core. eg: '1,2-4,6' selects the cores 1,2,3,4,6.",
name = "CORES"
)]
cores: Cores,
#[arg(
short = 'p',
long,
help = "Choose the broker TCP port, default is 1337",
name = "PORT",
default_value = "1337"
)]
broker_port: u16,
#[arg(short = 'a', long, help = "Specify a remote broker", name = "REMOTE")]
remote_broker_addr: Option<SocketAddr>,
#[arg(
short,
long,
help = "Set an initial corpus directory",
name = "INPUT",
required = true
)]
input: Vec<PathBuf>,
#[arg(
short,
long,
help = "Set the output directory, default is ./out",
name = "OUTPUT",
default_value = "./out"
)]
output: PathBuf,
#[arg(
value_parser = timeout_from_millis_str,
short,
long,
help = "Set the exeucution timeout in milliseconds, default is 10000",
name = "TIMEOUT",
default_value = "10000"
)]
timeout: Duration,
/*
/// This fuzzer has hard-coded tokens
#[arg(
short = "x",
long,
help = "Feed the fuzzer with an user-specified list of tokens (often called \"dictionary\"",
name = "TOKENS",
multiple = true
)]
tokens: Vec<PathBuf>,
*/
#[arg(
long,
help = "The address of the parent node to connect to, if any",
name = "PARENT_ADDR",
default_value = None
)]
parent_addr: Option<String>,
#[arg(
long,
help = "The port on which the node will listen on, if children are to be expected",
name = "NODE_LISTENING_PORT",
default_value = None
)]
node_listening_port: Option<u16>,
}
/// The main fn, `no_mangle` as it is a C symbol
#[no_mangle]
pub extern "C" fn libafl_main() {
env_logger::init();
// Registry the metadata types used in this fuzzer
// Needed only on no_std
// unsafe { RegistryBuilder::register::<Tokens>(); }
let opt = Opt::parse();
let broker_port = opt.broker_port;
let cores = opt.cores;
println!(
"Workdir: {:?}",
env::current_dir().unwrap().to_string_lossy().to_string()
);
let shmem_provider = StdShMemProvider::new().expect("Failed to init shared memory");
let monitor = MultiMonitor::new(|s| println!("{s}"));
let mut secondary_run_client = |state: Option<_>,
mut mgr: CentralizedEventManager<_, _, _, _>,
_core_id: CoreId| {
// Create an observation channel using the coverage map
let edges_observer =
HitcountsMapObserver::new(unsafe { std_edges_map_observer("edges") }).track_indices();
// Create an observation channel to keep track of the execution time
let time_observer = TimeObserver::new("time");
// Feedback to rate the interestingness of an input
// This one is composed by two Feedbacks in OR
let mut feedback = feedback_or!(
// New maximization map feedback linked to the edges observer and the feedback state
MaxMapFeedback::new(&edges_observer),
// Time feedback, this one does not need a feedback state
TimeFeedback::new(&time_observer)
);
// A feedback to choose if an input is a solution or not
let mut objective = feedback_or_fast!(CrashFeedback::new(), TimeoutFeedback::new());
// If not restarting, create a State from scratch
let mut state = state.unwrap_or_else(|| {
StdState::new(
// RNG
StdRand::new(),
// Corpus that will be evolved, we keep it in memory for performance
InMemoryCorpus::new(),
// Corpus in which we store solutions (crashes in this example),
// on disk so the user can get them after stopping the fuzzer
OnDiskCorpus::new(&opt.output).unwrap(),
// States of the feedbacks.
// The feedbacks can report the data that should persist in the State.
&mut feedback,
// Same for objective feedbacks
&mut objective,
)
.unwrap()
});
println!("We're a client, let's fuzz :)");
// Create a PNG dictionary if not existing
if state.metadata_map().get::<Tokens>().is_none() {
state.add_metadata(Tokens::from([
vec![137, 80, 78, 71, 13, 10, 26, 10], // PNG header
"IHDR".as_bytes().to_vec(),
"IDAT".as_bytes().to_vec(),
"PLTE".as_bytes().to_vec(),
"IEND".as_bytes().to_vec(),
]));
}
// Setup a basic mutator with a mutational stage
let mutator = StdScheduledMutator::new(havoc_mutations().merge(tokens_mutations()));
let mut stages = tuple_list!(StdMutationalStage::new(mutator));
// A minimization+queue policy to get testcasess from the corpus
let scheduler =
IndexesLenTimeMinimizerScheduler::new(&edges_observer, QueueScheduler::new());
// A fuzzer with feedbacks and a corpus scheduler
let mut fuzzer = StdFuzzer::new(scheduler, feedback, objective);
// The wrapped harness function, calling out to the LLVM-style harness
let mut harness = |input: &BytesInput| {
let target = input.target_bytes();
let buf = target.as_slice();
libfuzzer_test_one_input(buf);
ExitKind::Ok
};
// Create the executor for an in-process function with one observer for edge coverage and one for the execution time
#[cfg(target_os = "linux")]
let mut executor = InProcessExecutor::batched_timeout(
&mut harness,
tuple_list!(edges_observer, time_observer),
&mut fuzzer,
&mut state,
&mut mgr,
opt.timeout,
)?;
#[cfg(not(target_os = "linux"))]
let mut executor = InProcessExecutor::with_timeout(
&mut harness,
tuple_list!(edges_observer, time_observer),
&mut fuzzer,
&mut state,
&mut mgr,
opt.timeout,
)?;
// The actual target run starts here.
// Call LLVMFUzzerInitialize() if present.
let args: Vec<String> = env::args().collect();
if libfuzzer_initialize(&args) == -1 {
println!("Warning: LLVMFuzzerInitialize failed with -1");
}
// In case the corpus is empty (on first run), reset
if state.must_load_initial_inputs() {
state
.load_initial_inputs(&mut fuzzer, &mut executor, &mut mgr, &opt.input)
.unwrap_or_else(|_| panic!("Failed to load initial corpus at {:?}", &opt.input));
println!("We imported {} inputs from disk.", state.corpus().count());
}
if !mgr.is_main() {
fuzzer.fuzz_loop(&mut stages, &mut executor, &mut state, &mut mgr)?;
} else {
let mut empty_stages = tuple_list!();
fuzzer.fuzz_loop(&mut empty_stages, &mut executor, &mut state, &mut mgr)?;
}
Ok(())
};
let mut main_run_client = secondary_run_client.clone(); // clone it just for borrow checker
let parent_addr: Option<SocketAddr> = if let Some(parent_str) = opt.parent_addr {
Some(SocketAddr::from_str(parent_str.as_str()).expect("Wrong parent address"))
} else {
None
};
let mut node_description = NodeDescriptor::builder().parent_addr(parent_addr).build();
if opt.node_listening_port.is_some() {
node_description.node_listening_port = opt.node_listening_port;
}
match CentralizedLauncher::builder()
.shmem_provider(shmem_provider)
.configuration(EventConfig::from_name("default"))
.monitor(monitor)
.secondary_run_client(&mut secondary_run_client)
.main_run_client(&mut main_run_client)
.cores(&cores)
.broker_port(broker_port)
.centralized_broker_port(broker_port + 1)
.remote_broker_addr(opt.remote_broker_addr)
.multi_machine_node_descriptor(node_description)
// .stdout_file(Some("/dev/null"))
.build()
.launch()
{
Ok(()) => (),
Err(Error::ShuttingDown) => println!("Fuzzing stopped by user. Good bye."),
Err(err) => panic!("Failed to run launcher: {err:?}"),
}
}

View File

@ -58,11 +58,8 @@ handle_sigpipe = []
#! ## Additional Components
## Enables `TcpEventManager`, a simple EventManager proxying everything via TCP. This uses `tokio`.
tcp_manager = ["tokio", "std"]
## Enables compression for the TCP manager
tcp_compression = ["tcp_manager", "libafl_bolts/gzip"]
## Enable multi-machine support
multi_machine = ["tokio", "std", "enumflags2", "ahash/std"]
## Enables the `NaiveTokenizer` and `StacktraceObserver`
regex = ["std", "dep:regex"]
@ -158,17 +155,18 @@ typed-builder = { version = "0.18", optional = true } # Implement the builder pa
serde_json = { version = "1.0", optional = true, default-features = false, features = ["alloc"] }
nix = { version = "0.29", optional = true }
regex = { version = "1", optional = true }
uuid = { version = "1.4", optional = true, features = ["serde", "v4"] }
libm = "0.2.2"
ratatui = { version = "0.26.3", default-features = false, features = ['crossterm'], optional = true } # Commandline rendering, for TUI Monitor
crossterm = { version = "0.27.0", optional = true }
uuid = { version = "1.8", optional = true, features = ["serde", "v4"] }
libm = "0.2"
ratatui = { version = "0.26", default-features = false, features = ['crossterm'], optional = true } # Commandline rendering, for TUI Monitor
crossterm = { version = "0.27", optional = true }
prometheus-client = { version = "0.22", optional = true } # For the prometheus monitor
tide = { version = "0.16.0", optional = true }
async-std = { version = "1.12.0", features = ["attributes"], optional = true }
futures = { version = "0.3.24", optional = true }
log = "0.4.20"
tokio = { version = "1.28.1", optional = true, features = ["sync", "net", "rt", "io-util", "macros"] } # only used for TCP Event Manager right now
tide = { version = "0.16", optional = true }
async-std = { version = "1.12", features = ["attributes"], optional = true }
futures = { version = "0.3", optional = true }
log = { version = "0.4", features = ["release_max_level_info"] }
tokio = { version = "1.38", optional = true, features = ["sync", "net", "rt", "io-util", "macros", "rt-multi-thread", "time"] } # used for TCP Event Manager and multi-machine
enumflags2 = { version = "0.7", optional = true }
wait-timeout = { version = "0.2", optional = true } # used by CommandExecutor to wait for child process

View File

@ -1,4 +1,5 @@
use std::{fmt::Debug, marker::PhantomData};
use alloc::vec::Vec;
use core::{fmt::Debug, marker::PhantomData};
#[cfg(feature = "llmp_compression")]
use libafl_bolts::{compress::GzipCompressor, llmp::LLMP_FLAG_COMPRESSED};
@ -34,6 +35,7 @@ where
msg_tag: &mut Tag,
_msg_flags: &mut Flags,
msg: &mut [u8],
_new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
) -> Result<LlmpMsgHookResult, Error> {
if *msg_tag == _LLMP_TAG_TO_MAIN {
#[cfg(feature = "llmp_compression")]
@ -93,16 +95,7 @@ where
event: &Event<I>,
) -> Result<BrokerEventResult, Error> {
match &event {
Event::NewTestcase {
input: _,
client_config: _,
exit_kind: _,
corpus_size: _,
observers_buf: _,
time: _,
executions: _,
forward_id: _,
} => Ok(BrokerEventResult::Forward),
Event::NewTestcase { .. } => Ok(BrokerEventResult::Forward),
_ => Ok(BrokerEventResult::Handled),
}
}

View File

@ -0,0 +1,281 @@
use std::{
fmt::{Debug, Display},
marker::PhantomData,
slice,
sync::Arc,
vec::Vec,
};
#[cfg(feature = "llmp_compression")]
use libafl_bolts::llmp::LLMP_FLAG_COMPRESSED;
use libafl_bolts::{
llmp::{Flags, LlmpBrokerInner, LlmpHook, LlmpMsgHookResult, Tag},
ownedref::OwnedRef,
shmem::ShMemProvider,
ClientId, Error,
};
use log::debug;
use tokio::{
net::ToSocketAddrs,
runtime::Runtime,
sync::{RwLock, RwLockWriteGuard},
task::JoinHandle,
};
use crate::{
events::{
centralized::_LLMP_TAG_TO_MAIN,
multi_machine::{MultiMachineMsg, TcpMultiMachineState},
Event,
},
inputs::Input,
};
/// Makes a raw pointer send + sync.
/// Extremely unsafe to use in general, only use this if you know what you're doing.
#[derive(Debug, Clone, Copy)]
pub struct NullLock<T> {
value: T,
}
unsafe impl<T> Send for NullLock<T> {}
unsafe impl<T> Sync for NullLock<T> {}
impl<T> NullLock<T> {
/// Instantiate a [`NullLock`]
///
/// # Safety
///
/// The null lock makes anything Send + Sync, which is usually very dangerous.
pub unsafe fn new(value: T) -> Self {
Self { value }
}
/// Get a reference to value
pub fn get(&self) -> &T {
&self.value
}
/// Get a mutable reference to value
pub fn get_mut(&mut self) -> &mut T {
&mut self.value
}
/// Get back the value
pub fn into_innter(self) -> T {
self.value
}
}
/// The Receiving side of the multi-machine architecture
/// It is responsible for receiving messages from other neighbours.
/// Please check [`crate::events::multi_machine`] for more information.
#[derive(Debug)]
pub struct TcpMultiMachineLlmpSenderHook<A, I>
where
I: Input,
{
/// the actual state of the broker hook
shared_state: Arc<RwLock<TcpMultiMachineState<A>>>,
/// the tokio runtime used to interact with other machines. Keep it outside to avoid locking it.
rt: Arc<Runtime>,
phantom: PhantomData<I>,
}
/// The Receiving side of the multi-machine architecture
/// It is responsible for receiving messages from other neighbours.
/// Please check [`crate::events::multi_machine`] for more information.
#[derive(Debug)]
pub struct TcpMultiMachineLlmpReceiverHook<A, I>
where
I: Input,
{
/// the actual state of the broker hook
shared_state: Arc<RwLock<TcpMultiMachineState<A>>>,
/// the tokio runtime used to interact with other machines. Keep it outside to avoid locking it.
rt: Arc<Runtime>,
phantom: PhantomData<I>,
}
impl<A, I> TcpMultiMachineLlmpSenderHook<A, I>
where
A: Clone + Display + ToSocketAddrs + Send + Sync + 'static,
I: Input + Send + Sync + 'static,
{
/// Should not be created alone. Use [`TcpMultiMachineBuilder`] instead.
pub(crate) fn new(
shared_state: Arc<RwLock<TcpMultiMachineState<A>>>,
rt: Arc<Runtime>,
) -> Self {
Self {
shared_state,
rt,
phantom: PhantomData,
}
}
}
impl<A, I> TcpMultiMachineLlmpReceiverHook<A, I>
where
A: Clone + Display + ToSocketAddrs + Send + Sync + 'static,
I: Input + Send + Sync + 'static,
{
/// Should not be created alone. Use [`TcpMultiMachineBuilder`] instead.
pub(crate) fn new(
shared_state: Arc<RwLock<TcpMultiMachineState<A>>>,
rt: Arc<Runtime>,
) -> Self {
Self {
shared_state,
rt,
phantom: PhantomData,
}
}
#[cfg(feature = "llmp_compression")]
fn try_compress(
state_lock: &mut RwLockWriteGuard<TcpMultiMachineState<A>>,
event: &Event<I>,
) -> Result<(Flags, Vec<u8>), Error> {
let serialized = postcard::to_allocvec(&event)?;
match state_lock.compressor().maybe_compress(&serialized) {
Some(comp_buf) => Ok((LLMP_FLAG_COMPRESSED, comp_buf)),
None => Ok((Flags(0), serialized)),
}
}
#[cfg(not(feature = "llmp_compression"))]
fn try_compress(
_state_lock: &mut RwLockWriteGuard<TcpMultiMachineState<A>>,
event: &Event<I>,
) -> Result<(Flags, Vec<u8>), Error> {
Ok((Flags(0), postcard::to_allocvec(&event)?))
}
}
impl<A, I, SP> LlmpHook<SP> for TcpMultiMachineLlmpSenderHook<A, I>
where
A: Clone + Debug + Display + ToSocketAddrs + Send + Sync + 'static,
SP: ShMemProvider,
I: Input + Send + Sync + 'static,
{
/// check for received messages, and forward them alongside the incoming message to inner.
fn on_new_message(
&mut self,
_broker_inner: &mut LlmpBrokerInner<SP>,
_client_id: ClientId,
_msg_tag: &mut Tag,
_msg_flags: &mut Flags,
msg: &mut [u8],
_new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
) -> Result<LlmpMsgHookResult, Error> {
let shared_state = self.shared_state.clone();
// Here, we suppose msg will never be written again and will always be available.
// Thus, it is safe to handle this in a separate thread.
let msg_lock = unsafe { NullLock::new((msg.as_ptr(), msg.len())) };
// let flags = msg_flags.clone();
let _handle: JoinHandle<Result<(), Error>> = self.rt.spawn(async move {
let mut state_wr_lock = shared_state.write().await;
let (msg_ptr, msg_len) = msg_lock.into_innter();
let msg: &[u8] = unsafe { slice::from_raw_parts(msg_ptr, msg_len) }; // most likely crash here
// #[cfg(not(feature = "llmp_compression"))]
// let event_bytes = msg;
// #[cfg(feature = "llmp_compression")]
// let compressed;
// #[cfg(feature = "llmp_compression")]
// let event_bytes = if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
// compressed = state_wr_lock.compressor().decompress(msg)?;
// &compressed
// } else {
// &*msg
// };
// let event: Event<I> = postcard::from_bytes(event_bytes)?;
let mm_msg: MultiMachineMsg<I> = MultiMachineMsg::llmp_msg(OwnedRef::Ref(msg));
// TODO: do not copy here
state_wr_lock.add_past_msg(msg);
debug!("Sending msg...");
state_wr_lock
.send_interesting_event_to_nodes(&mm_msg)
.await?;
Ok(())
});
Ok(LlmpMsgHookResult::ForwardToClients)
}
}
impl<A, I, SP> LlmpHook<SP> for TcpMultiMachineLlmpReceiverHook<A, I>
where
A: Clone + Debug + Display + ToSocketAddrs + Send + Sync + 'static,
SP: ShMemProvider,
I: Input + Send + Sync + 'static,
{
/// check for received messages, and forward them alongside the incoming message to inner.
fn on_new_message(
&mut self,
_broker_inner: &mut LlmpBrokerInner<SP>,
_client_id: ClientId,
_msg_tag: &mut Tag,
_msg_flags: &mut Flags,
_msg: &mut [u8],
new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
) -> Result<LlmpMsgHookResult, Error> {
let shared_state = self.shared_state.clone();
let res: Result<(), Error> = self.rt.block_on(async move {
let mut state_wr_lock = shared_state.write().await;
let mut incoming_msgs: Vec<MultiMachineMsg<I>> = Vec::new();
state_wr_lock
.receive_new_messages_from_nodes(&mut incoming_msgs)
.await?;
debug!("received {} new incoming msg(s)", incoming_msgs.len());
let msgs_to_forward: Result<Vec<(Tag, Flags, Vec<u8>)>, Error> = incoming_msgs
.into_iter()
.map(|mm_msg| match mm_msg {
MultiMachineMsg::LlmpMsg(msg) => {
let msg = msg.into_owned().unwrap().into_vec();
#[cfg(feature = "llmp_compression")]
match state_wr_lock.compressor().maybe_compress(msg.as_ref()) {
Some(comp_buf) => {
Ok((_LLMP_TAG_TO_MAIN, LLMP_FLAG_COMPRESSED, comp_buf))
}
None => Ok((_LLMP_TAG_TO_MAIN, Flags(0), msg)),
}
#[cfg(not(feature = "llmp_compression"))]
Ok((_LLMP_TAG_TO_MAIN, Flags(0), msg))
}
MultiMachineMsg::Event(evt) => {
let evt = evt.into_owned().unwrap();
let (inner_flags, buf) =
Self::try_compress(&mut state_wr_lock, evt.as_ref())?;
Ok((_LLMP_TAG_TO_MAIN, inner_flags, buf))
}
})
.collect();
new_msgs.extend(msgs_to_forward?);
Ok(())
});
res?;
// Add incoming events to the ones we should filter
// events.extend_from_slice(&incoming_events);
Ok(LlmpMsgHookResult::ForwardToClients)
}
}

View File

@ -1,4 +1,5 @@
//! Standard LLMP hook
//! Hooks called on broker side
use alloc::vec::Vec;
use core::marker::PhantomData;
#[cfg(feature = "llmp_compression")]
@ -21,6 +22,14 @@ use crate::{
/// centralized hook
#[cfg(all(unix, feature = "std"))]
pub mod centralized;
#[cfg(all(unix, feature = "std"))]
pub use centralized::*;
/// Multi-machine hook
#[cfg(all(unix, feature = "multi_machine"))]
pub mod centralized_multi_machine;
#[cfg(all(unix, feature = "multi_machine"))]
pub use centralized_multi_machine::*;
/// An LLMP-backed event hook for scalable multi-processed fuzzing
#[derive(Debug)]
@ -45,6 +54,7 @@ where
#[cfg(feature = "llmp_compression")] msg_flags: &mut Flags,
#[cfg(not(feature = "llmp_compression"))] _msg_flags: &mut Flags,
msg: &mut [u8],
_new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
) -> Result<LlmpMsgHookResult, Error> {
let monitor = &mut self.monitor;
#[cfg(feature = "llmp_compression")]
@ -102,14 +112,11 @@ where
) -> Result<BrokerEventResult, Error> {
match &event {
Event::NewTestcase {
input: _,
client_config: _,
exit_kind: _,
corpus_size,
observers_buf: _,
time,
executions,
forward_id,
..
} => {
let id = if let Some(id) = *forward_id {
id

View File

@ -9,6 +9,7 @@
use alloc::{boxed::Box, string::String, vec::Vec};
use core::{fmt::Debug, time::Duration};
use std::{marker::PhantomData, process};
#[cfg(feature = "llmp_compression")]
use libafl_bolts::{
@ -21,6 +22,7 @@ use libafl_bolts::{
tuples::Handle,
ClientId,
};
use log::debug;
use serde::{Deserialize, Serialize};
use super::NopEventManager;
@ -31,14 +33,14 @@ use crate::state::HasScalabilityMonitor;
use crate::{
events::{
AdaptiveSerializer, CustomBufEventResult, Event, EventConfig, EventFirer, EventManager,
EventManagerId, EventProcessor, EventRestarter, HasCustomBufHandlers, HasEventManagerId,
LogSeverity, ProgressReporter,
EventManagerHooksTuple, EventManagerId, EventProcessor, EventRestarter,
HasCustomBufHandlers, HasEventManagerId, LogSeverity, ProgressReporter,
},
executors::{Executor, HasObservers},
fuzzer::{EvaluatorObservers, ExecutionProcessor},
inputs::{Input, NopInput, UsesInput},
observers::{ObserversTuple, TimeObserver},
state::{HasExecutions, HasLastReportTime, NopState, UsesState},
state::{HasExecutions, HasLastReportTime, NopState, State, UsesState},
Error, HasMetadata,
};
@ -46,21 +48,32 @@ pub(crate) const _LLMP_TAG_TO_MAIN: Tag = Tag(0x3453453);
/// A wrapper manager to implement a main-secondary architecture with another broker
#[derive(Debug)]
pub struct CentralizedEventManager<EM, SP>
pub struct CentralizedEventManager<EM, EMH, S, SP>
where
EM: UsesState,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
inner: EM,
/// The LLMP client for inter process communication
/// The centralized LLMP client for inter process communication
client: LlmpClient<SP>,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor,
time_ref: Option<Handle<TimeObserver>>,
hooks: EMH,
is_main: bool,
phantom: PhantomData<S>,
}
impl CentralizedEventManager<NopEventManager<NopState<NopInput>>, NopShMemProvider> {
impl
CentralizedEventManager<
NopEventManager<NopState<NopInput>>,
(),
NopState<NopInput>,
NopShMemProvider,
>
{
/// Creates a builder for [`CentralizedEventManager`]
#[must_use]
pub fn builder() -> CentralizedEventManagerBuilder {
@ -93,24 +106,29 @@ impl CentralizedEventManagerBuilder {
Self { is_main }
}
/// Creates a new `CentralizedEventManager`
pub fn build_from_client<EM, SP>(
/// Creates a new [`CentralizedEventManager`].
pub fn build_from_client<EM, EMH, S, SP>(
self,
inner: EM,
hooks: EMH,
client: LlmpClient<SP>,
time_obs: Option<Handle<TimeObserver>>,
) -> Result<CentralizedEventManager<EM, SP>, Error>
) -> Result<CentralizedEventManager<EM, EMH, S, SP>, Error>
where
SP: ShMemProvider,
EM: UsesState,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
Ok(CentralizedEventManager {
inner,
hooks,
client,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD),
time_ref: time_obs,
is_main: self.is_main,
phantom: PhantomData,
})
}
@ -119,86 +137,105 @@ impl CentralizedEventManagerBuilder {
/// If the port is not yet bound, it will act as a broker; otherwise, it
/// will act as a client.
#[cfg(feature = "std")]
pub fn build_on_port<EM, SP>(
pub fn build_on_port<EM, EMH, S, SP>(
self,
inner: EM,
hooks: EMH,
shmem_provider: SP,
port: u16,
time_obs: Option<Handle<TimeObserver>>,
) -> Result<CentralizedEventManager<EM, SP>, Error>
) -> Result<CentralizedEventManager<EM, EMH, S, SP>, Error>
where
SP: ShMemProvider,
EM: UsesState,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
Ok(CentralizedEventManager {
inner,
hooks,
client,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD),
time_ref: time_obs,
is_main: self.is_main,
phantom: PhantomData,
})
}
/// If a client respawns, it may reuse the existing connection, previously
/// stored by [`LlmpClient::to_env()`].
#[cfg(feature = "std")]
pub fn build_existing_client_from_env<EM, SP>(
pub fn build_existing_client_from_env<EM, EMH, S, SP>(
self,
inner: EM,
hooks: EMH,
shmem_provider: SP,
env_name: &str,
time_obs: Option<Handle<TimeObserver>>,
) -> Result<CentralizedEventManager<EM, SP>, Error>
) -> Result<CentralizedEventManager<EM, EMH, S, SP>, Error>
where
EM: UsesState,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
Ok(CentralizedEventManager {
inner,
hooks,
client: LlmpClient::on_existing_from_env(shmem_provider, env_name)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD),
time_ref: time_obs,
is_main: self.is_main,
phantom: PhantomData,
})
}
/// Create an existing client from description
#[cfg(feature = "std")]
pub fn existing_client_from_description<EM, SP>(
pub fn existing_client_from_description<EM, EMH, S, SP>(
self,
inner: EM,
hooks: EMH,
shmem_provider: SP,
description: &LlmpClientDescription,
time_obs: Option<Handle<TimeObserver>>,
) -> Result<CentralizedEventManager<EM, SP>, Error>
) -> Result<CentralizedEventManager<EM, EMH, S, SP>, Error>
where
EM: UsesState,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
Ok(CentralizedEventManager {
inner,
hooks,
client: LlmpClient::existing_client_from_description(shmem_provider, description)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::with_threshold(COMPRESS_THRESHOLD),
time_ref: time_obs,
is_main: self.is_main,
phantom: PhantomData,
})
}
}
impl<EM, SP> UsesState for CentralizedEventManager<EM, SP>
impl<EM, EMH, S, SP> UsesState for CentralizedEventManager<EM, EMH, S, SP>
where
EM: UsesState,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
type State = EM::State;
}
impl<EM, SP> AdaptiveSerializer for CentralizedEventManager<EM, SP>
impl<EM, EMH, S, SP> AdaptiveSerializer for CentralizedEventManager<EM, EMH, S, SP>
where
EM: AdaptiveSerializer + UsesState,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
fn serialization_time(&self) -> Duration {
@ -232,9 +269,11 @@ where
}
}
impl<EM, SP> EventFirer for CentralizedEventManager<EM, SP>
impl<EM, EMH, S, SP> EventFirer for CentralizedEventManager<EM, EMH, S, SP>
where
EM: AdaptiveSerializer + EventFirer + HasEventManagerId,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
fn should_send(&self) -> bool {
@ -251,25 +290,12 @@ where
let mut is_tc = false;
// Forward to main only if new tc or heartbeat
let should_be_forwarded = match &mut event {
Event::NewTestcase {
input: _,
client_config: _,
exit_kind: _,
corpus_size: _,
time: _,
executions: _,
observers_buf: _,
forward_id,
} => {
Event::NewTestcase { forward_id, .. } => {
*forward_id = Some(ClientId(self.inner.mgr_id().0 as u32));
is_tc = true;
true
}
Event::UpdateExecStats {
time: _,
executions: _,
phantom: _,
} => true, // send it but this guy won't be handled. the only purpose is to keep this client alive else the broker thinks it is dead and will dc it
Event::UpdateExecStats { .. } => true, // send it but this guy won't be handled. the only purpose is to keep this client alive else the broker thinks it is dead and will dc it
_ => false,
};
@ -313,9 +339,11 @@ where
}
}
impl<EM, SP> EventRestarter for CentralizedEventManager<EM, SP>
impl<EM, EMH, S, SP> EventRestarter for CentralizedEventManager<EM, EMH, S, SP>
where
EM: EventRestarter,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
#[inline]
@ -337,15 +365,17 @@ where
}
}
impl<E, EM, SP, Z> EventProcessor<E, Z> for CentralizedEventManager<EM, SP>
impl<E, EM, EMH, S, SP, Z> EventProcessor<E, Z> for CentralizedEventManager<EM, EMH, S, SP>
where
EM: AdaptiveSerializer + EventProcessor<E, Z> + EventFirer + HasEventManagerId,
EMH: EventManagerHooksTuple<EM::State>,
E: HasObservers<State = Self::State> + Executor<Self, Z>,
for<'a> E::Observers: Deserialize<'a>,
Z: EvaluatorObservers<E::Observers, State = Self::State>
+ ExecutionProcessor<E::Observers, State = Self::State>,
S: State,
Self::State: HasExecutions + HasMetadata,
SP: ShMemProvider,
Z: EvaluatorObservers<E::Observers, State = Self::State>
+ ExecutionProcessor<E::Observers, State = Self::State>,
{
fn process(
&mut self,
@ -356,6 +386,7 @@ where
if self.is_main {
// main node
self.receive_from_secondary(fuzzer, state, executor)
// self.inner.process(fuzzer, state, executor)
} else {
// The main node does not process incoming events from the broker ATM
self.inner.process(fuzzer, state, executor)
@ -363,21 +394,25 @@ where
}
}
impl<E, EM, SP, Z> EventManager<E, Z> for CentralizedEventManager<EM, SP>
impl<E, EM, EMH, S, SP, Z> EventManager<E, Z> for CentralizedEventManager<EM, EMH, S, SP>
where
EM: AdaptiveSerializer + EventManager<E, Z>,
EM::State: HasExecutions + HasMetadata + HasLastReportTime,
E: HasObservers<State = Self::State> + Executor<Self, Z>,
for<'a> E::Observers: Deserialize<'a>,
EM: AdaptiveSerializer + EventManager<E, Z>,
EM::State: HasExecutions + HasMetadata + HasLastReportTime,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
Z: EvaluatorObservers<E::Observers, State = Self::State>
+ ExecutionProcessor<E::Observers, State = Self::State>,
SP: ShMemProvider,
{
}
impl<EM, SP> HasCustomBufHandlers for CentralizedEventManager<EM, SP>
impl<EM, EMH, S, SP> HasCustomBufHandlers for CentralizedEventManager<EM, EMH, S, SP>
where
EM: HasCustomBufHandlers,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
/// Adds a custom buffer handler that will run for each incoming `CustomBuf` event.
@ -391,17 +426,21 @@ where
}
}
impl<EM, SP> ProgressReporter for CentralizedEventManager<EM, SP>
impl<EM, EMH, S, SP> ProgressReporter for CentralizedEventManager<EM, EMH, S, SP>
where
EM: AdaptiveSerializer + ProgressReporter + HasEventManagerId,
EM::State: HasMetadata + HasExecutions + HasLastReportTime,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
}
impl<EM, SP> HasEventManagerId for CentralizedEventManager<EM, SP>
impl<EM, EMH, S, SP> HasEventManagerId for CentralizedEventManager<EM, EMH, S, SP>
where
EM: HasEventManagerId + UsesState,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
fn mgr_id(&self) -> EventManagerId {
@ -409,9 +448,11 @@ where
}
}
impl<EM, SP> CentralizedEventManager<EM, SP>
impl<EM, EMH, S, SP> CentralizedEventManager<EM, EMH, S, SP>
where
EM: UsesState,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
/// Describe the client event manager's LLMP parts in a restorable fashion
@ -432,9 +473,11 @@ where
}
}
impl<EM, SP> CentralizedEventManager<EM, SP>
impl<EM, EMH, S, SP> CentralizedEventManager<EM, EMH, S, SP>
where
EM: UsesState + EventFirer + AdaptiveSerializer + HasEventManagerId,
EMH: EventManagerHooksTuple<EM::State>,
S: State,
SP: ShMemProvider,
{
#[cfg(feature = "llmp_compression")]
@ -508,6 +551,7 @@ where
};
let event: Event<<<Self as UsesState>::State as UsesInput>::Input> =
postcard::from_bytes(event_bytes)?;
debug!("Processor received message {}", event.name_detailed());
self.handle_in_main(fuzzer, executor, state, client_id, event)?;
count += 1;
}
@ -530,6 +574,10 @@ where
Z: ExecutionProcessor<E::Observers, State = <Self as UsesState>::State>
+ EvaluatorObservers<E::Observers>,
{
debug!("handle_in_main!");
let event_name = event.name_detailed();
match event {
Event::NewTestcase {
input,
@ -540,8 +588,13 @@ where
time,
executions,
forward_id,
#[cfg(feature = "multi_machine")]
node_id,
} => {
log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})");
debug!(
"Received {} from {client_id:?} ({client_config:?}, forward {forward_id:?})",
event_name
);
let res =
if client_config.match_with(&self.configuration()) && observers_buf.is_some() {
@ -551,6 +604,11 @@ where
{
state.scalability_monitor_mut().testcase_with_observers += 1;
}
debug!(
"[{}] Running fuzzer with event {}",
process::id(),
event_name
);
fuzzer.execute_and_process(
state,
self,
@ -564,6 +622,11 @@ where
{
state.scalability_monitor_mut().testcase_without_observers += 1;
}
debug!(
"[{}] Running fuzzer with event {}",
process::id(),
event_name
);
fuzzer.evaluate_input_with_observers::<E, Self>(
state,
executor,
@ -574,30 +637,41 @@ where
};
if let Some(item) = res.1 {
if res.1.is_some() {
self.inner.fire(
state,
Event::NewTestcase {
input,
client_config,
exit_kind,
corpus_size,
observers_buf,
time,
executions,
forward_id,
},
)?;
}
log::info!("Added received Testcase as item #{item}");
let event = Event::NewTestcase {
input,
client_config,
exit_kind,
corpus_size,
observers_buf,
time,
executions,
forward_id,
#[cfg(feature = "multi_machine")]
node_id,
};
self.hooks.on_fire_all(state, client_id, &event)?;
debug!(
"[{}] Adding received Testcase {} as item #{item}...",
process::id(),
event_name
);
self.inner.fire(state, event)?;
} else {
debug!("[{}] {} was discarded...)", process::id(), event_name);
}
Ok(())
}
_ => Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
))),
_ => {
return Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
)));
}
}
Ok(())
}
}

View File

@ -5,7 +5,13 @@ use libafl_bolts::ClientId;
use crate::{events::Event, state::State, Error};
/// The hooks that are run before and after the event manager calls `handle_in_client`
/// node hook, for multi-machine fuzzing
// #[cfg(feature = "multi_machine")]
// pub mod multi_machine;
// #[cfg(feature = "multi_machine")]
// pub use multi_machine::*;
/// The `broker_hooks` that are run before and after the event manager calls `handle_in_client`
pub trait EventManagerHook<S>
where
S: State,
@ -18,12 +24,25 @@ where
client_id: ClientId,
event: &Event<S::Input>,
) -> Result<bool, Error>;
/// Triggered when the even manager decides to fire the event after processing
fn on_fire(
&mut self,
_state: &mut S,
_client_id: ClientId,
_event: &Event<S::Input>,
) -> Result<(), Error> {
Ok(())
}
/// The hook that runs after `handle_in_client`
/// Return false if you want to cancel the subsequent event handling
fn post_exec(&mut self, state: &mut S, client_id: ClientId) -> Result<bool, Error>;
fn post_exec(&mut self, _state: &mut S, _client_id: ClientId) -> Result<bool, Error> {
Ok(true)
}
}
/// The tuples contains hooks to be executed for `handle_in_client`
/// The tuples contains `broker_hooks` to be executed for `handle_in_client`
pub trait EventManagerHooksTuple<S>
where
S: State,
@ -35,6 +54,15 @@ where
client_id: ClientId,
event: &Event<S::Input>,
) -> Result<bool, Error>;
/// Ran when the Event Manager decides to accept an event and propagates it
fn on_fire_all(
&mut self,
state: &mut S,
client_id: ClientId,
event: &Event<S::Input>,
) -> Result<(), Error>;
/// The hook that runs after `handle_in_client`
fn post_exec_all(&mut self, state: &mut S, client_id: ClientId) -> Result<bool, Error>;
}
@ -52,6 +80,16 @@ where
) -> Result<bool, Error> {
Ok(true)
}
fn on_fire_all(
&mut self,
_state: &mut S,
_client_id: ClientId,
_event: &Event<S::Input>,
) -> Result<(), Error> {
Ok(())
}
/// The hook that runs after `handle_in_client`
fn post_exec_all(&mut self, _state: &mut S, _client_id: ClientId) -> Result<bool, Error> {
Ok(true)
@ -75,6 +113,17 @@ where
let second = self.1.pre_exec_all(state, client_id, event)?;
Ok(first & second)
}
fn on_fire_all(
&mut self,
state: &mut S,
client_id: ClientId,
event: &Event<S::Input>,
) -> Result<(), Error> {
self.0.on_fire(state, client_id, event)?;
self.1.on_fire_all(state, client_id, event)
}
/// The hook that runs after `handle_in_client`
fn post_exec_all(&mut self, state: &mut S, client_id: ClientId) -> Result<bool, Error> {
let first = self.0.post_exec(state, client_id)?;

View File

@ -14,13 +14,13 @@
use alloc::string::ToString;
#[cfg(feature = "std")]
use core::marker::PhantomData;
#[cfg(feature = "std")]
use core::time::Duration;
use core::{
fmt::{self, Debug, Formatter},
num::NonZeroUsize,
};
#[cfg(all(unix, feature = "std", feature = "fork"))]
use std::boxed::Box;
#[cfg(feature = "std")]
use std::net::SocketAddr;
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
@ -28,6 +28,8 @@ use std::process::Stdio;
#[cfg(all(unix, feature = "std"))]
use std::{fs::File, os::unix::io::AsRawFd};
#[cfg(all(unix, feature = "std", feature = "fork"))]
use libafl_bolts::llmp::Brokers;
#[cfg(all(unix, feature = "std", feature = "fork"))]
use libafl_bolts::llmp::LlmpBroker;
#[cfg(all(unix, feature = "std"))]
@ -44,16 +46,25 @@ use libafl_bolts::{
shmem::ShMemProvider,
tuples::{tuple_list, Handle},
};
#[cfg(all(unix, feature = "std", feature = "fork"))]
use log::debug;
#[cfg(feature = "std")]
use typed_builder::TypedBuilder;
use super::hooks::EventManagerHooksTuple;
use super::EventManagerHooksTuple;
#[cfg(all(unix, feature = "std", feature = "fork"))]
use super::StdLlmpEventHook;
#[cfg(all(unix, feature = "std", feature = "fork", feature = "multi_machine"))]
use crate::events::multi_machine::NodeDescriptor;
#[cfg(all(unix, feature = "std", feature = "fork", feature = "multi_machine"))]
use crate::events::multi_machine::TcpMultiMachineBuilder;
#[cfg(all(unix, feature = "std", feature = "fork"))]
use crate::events::{centralized::CentralizedEventManager, CentralizedLlmpHook};
#[cfg(all(unix, feature = "std", feature = "fork"))]
use crate::inputs::UsesInput;
use crate::observers::TimeObserver;
#[cfg(all(unix, feature = "std", feature = "fork"))]
use crate::{
events::{centralized::CentralizedEventManager, llmp::centralized::CentralizedLlmpHook},
state::UsesState,
};
use crate::state::UsesState;
#[cfg(feature = "std")]
use crate::{
events::{
@ -82,7 +93,7 @@ const LIBAFL_DEBUG_OUTPUT: &str = "LIBAFL_DEBUG_OUTPUT";
clippy::ignored_unit_patterns
)]
#[derive(TypedBuilder)]
pub struct Launcher<'a, CF, EMH, MT, S, SP> {
pub struct Launcher<'a, CF, MT, SP> {
/// The `ShmemProvider` to use
shmem_provider: SP,
/// The monitor instance to use
@ -133,18 +144,9 @@ pub struct Launcher<'a, CF, EMH, MT, S, SP> {
/// Tell the manager to serialize or not the state on restart
#[builder(default = LlmpShouldSaveState::OnRestart)]
serialize_state: LlmpShouldSaveState,
#[builder(setter(skip), default = PhantomData)]
phantom_data: PhantomData<(&'a S, &'a SP, EMH)>,
}
impl<CF, EMH, MT, S, SP> Debug for Launcher<'_, CF, EMH, MT, S, SP>
where
CF: FnOnce(Option<S>, LlmpRestartingEventManager<EMH, S, SP>, CoreId) -> Result<(), Error>,
EMH: EventManagerHooksTuple<S>,
MT: Monitor + Clone,
SP: ShMemProvider,
S: State,
{
impl<CF, MT, SP> Debug for Launcher<'_, CF, MT, SP> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let mut dbg_struct = f.debug_struct("Launcher");
dbg_struct
@ -164,41 +166,49 @@ where
}
}
impl<'a, CF, MT, S, SP> Launcher<'a, CF, (), MT, S, SP>
impl<'a, CF, MT, SP> Launcher<'a, CF, MT, SP>
where
CF: FnOnce(Option<S>, LlmpRestartingEventManager<(), S, SP>, CoreId) -> Result<(), Error>,
MT: Monitor + Clone,
S: State + HasExecutions,
SP: ShMemProvider,
{
/// Launch the broker and the clients and fuzz
#[cfg(all(unix, feature = "std", feature = "fork"))]
pub fn launch(&mut self) -> Result<(), Error> {
pub fn launch<S>(&mut self) -> Result<(), Error>
where
S: State + HasExecutions,
CF: FnOnce(Option<S>, LlmpRestartingEventManager<(), S, SP>, CoreId) -> Result<(), Error>,
{
Self::launch_with_hooks(self, tuple_list!())
}
/// Launch the broker and the clients and fuzz
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
#[allow(unused_mut, clippy::match_wild_err_arm)]
pub fn launch(&mut self) -> Result<(), Error> {
pub fn launch<S>(&mut self) -> Result<(), Error>
where
S: State + HasExecutions,
CF: FnOnce(Option<S>, LlmpRestartingEventManager<(), S, SP>, CoreId) -> Result<(), Error>,
{
Self::launch_with_hooks(self, tuple_list!())
}
}
#[cfg(feature = "std")]
impl<'a, CF, EMH, MT, S, SP> Launcher<'a, CF, EMH, MT, S, SP>
impl<'a, CF, MT, SP> Launcher<'a, CF, MT, SP>
where
CF: FnOnce(Option<S>, LlmpRestartingEventManager<EMH, S, SP>, CoreId) -> Result<(), Error>,
EMH: EventManagerHooksTuple<S> + Clone + Copy,
MT: Monitor + Clone,
S: State + HasExecutions,
SP: ShMemProvider,
{
/// Launch the broker and the clients and fuzz with a user-supplied hook
#[cfg(all(unix, feature = "std", feature = "fork"))]
#[allow(clippy::similar_names)]
#[allow(clippy::too_many_lines)]
pub fn launch_with_hooks(&mut self, hooks: EMH) -> Result<(), Error> {
pub fn launch_with_hooks<EMH, S>(&mut self, hooks: EMH) -> Result<(), Error>
where
S: State + HasExecutions,
EMH: EventManagerHooksTuple<S> + Clone + Copy,
CF: FnOnce(Option<S>, LlmpRestartingEventManager<EMH, S, SP>, CoreId) -> Result<(), Error>,
{
if self.cores.ids.is_empty() {
return Err(Error::illegal_argument(
"No cores to spawn on given, cannot launch anything.",
@ -329,7 +339,12 @@ where
/// Launch the broker and the clients and fuzz
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
#[allow(unused_mut, clippy::match_wild_err_arm)]
pub fn launch_with_hooks(&mut self, hooks: EMH) -> Result<(), Error> {
pub fn launch_with_hooks<EMH, S>(&mut self, hooks: EMH) -> Result<(), Error>
where
S: State + HasExecutions,
EMH: EventManagerHooksTuple<S> + Clone + Copy,
CF: FnOnce(Option<S>, LlmpRestartingEventManager<EMH, S, SP>, CoreId) -> Result<(), Error>,
{
use libafl_bolts::core_affinity;
let is_client = std::env::var(_AFL_LAUNCHER_CLIENT);
@ -470,7 +485,7 @@ where
#[cfg(all(unix, feature = "std", feature = "fork"))]
#[derive(TypedBuilder)]
#[allow(clippy::type_complexity, missing_debug_implementations)]
pub struct CentralizedLauncher<'a, CF, IM, MF, MT, S, SP> {
pub struct CentralizedLauncher<'a, CF, MF, MT, SP> {
/// The `ShmemProvider` to use
shmem_provider: SP,
/// The monitor instance to use
@ -517,9 +532,10 @@ pub struct CentralizedLauncher<'a, CF, IM, MF, MT, S, SP> {
opened_stderr_file: Option<File>,
/// The `ip:port` address of another broker to connect our new broker to for multi-machine
/// clusters.
#[builder(default = None)]
remote_broker_addr: Option<SocketAddr>,
#[cfg(feature = "multi_machine")]
multi_machine_node_descriptor: NodeDescriptor<SocketAddr>,
/// If this launcher should spawn a new `broker` on `[Self::broker_port]` (default).
/// The reason you may not want this is, if you already have a [`Launcher`]
/// with a different configuration (for the same target) running on this machine.
@ -529,12 +545,10 @@ pub struct CentralizedLauncher<'a, CF, IM, MF, MT, S, SP> {
/// Tell the manager to serialize or not the state on restart
#[builder(default = LlmpShouldSaveState::OnRestart)]
serialize_state: LlmpShouldSaveState,
#[builder(setter(skip), default = PhantomData)]
phantom_data: PhantomData<(IM, &'a S, &'a SP)>,
}
#[cfg(all(unix, feature = "std", feature = "fork"))]
impl<CF, IM, MF, MT, S, SP> Debug for CentralizedLauncher<'_, CF, IM, MF, MT, S, SP> {
impl<CF, MF, MT, SP> Debug for CentralizedLauncher<'_, CF, MF, MT, SP> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Launcher")
.field("configuration", &self.configuration)
@ -552,25 +566,27 @@ impl<CF, IM, MF, MT, S, SP> Debug for CentralizedLauncher<'_, CF, IM, MF, MT, S,
pub type StdCentralizedInnerMgr<S, SP> = LlmpRestartingEventManager<(), S, SP>;
#[cfg(all(unix, feature = "std", feature = "fork"))]
impl<'a, CF, MF, MT, S, SP>
CentralizedLauncher<'a, CF, StdCentralizedInnerMgr<S, SP>, MF, MT, S, SP>
impl<'a, CF, MF, MT, SP> CentralizedLauncher<'a, CF, MF, MT, SP>
where
CF: FnOnce(
Option<S>,
CentralizedEventManager<StdCentralizedInnerMgr<S, SP>, SP>,
CoreId,
) -> Result<(), Error>,
MF: FnOnce(
Option<S>,
CentralizedEventManager<StdCentralizedInnerMgr<S, SP>, SP>,
CoreId,
) -> Result<(), Error>,
MT: Monitor + Clone,
S: State + HasExecutions,
SP: ShMemProvider,
MT: Monitor + Clone + 'static,
SP: ShMemProvider + 'static,
{
/// Launch a standard Centralized-based fuzzer
pub fn launch(&mut self) -> Result<(), Error> {
pub fn launch<S>(&mut self) -> Result<(), Error>
where
S: State,
S::Input: Send + Sync + 'static,
CF: FnOnce(
Option<S>,
CentralizedEventManager<StdCentralizedInnerMgr<S, SP>, (), S, SP>,
CoreId,
) -> Result<(), Error>,
MF: FnOnce(
Option<S>,
CentralizedEventManager<StdCentralizedInnerMgr<S, SP>, (), S, SP>,
CoreId,
) -> Result<(), Error>,
{
let restarting_mgr_builder = |centralized_launcher: &Self, core_to_bind: CoreId| {
// Fuzzer client. keeps retrying the connection to broker till the broker starts
let builder = RestartingMgr::<(), MT, S, SP>::builder()
@ -594,31 +610,33 @@ where
}
#[cfg(all(unix, feature = "std", feature = "fork"))]
impl<'a, CF, IM, MF, MT, S, SP> CentralizedLauncher<'a, CF, IM, MF, MT, S, SP>
impl<'a, CF, MF, MT, SP> CentralizedLauncher<'a, CF, MF, MT, SP>
where
CF: FnOnce(Option<S>, CentralizedEventManager<IM, SP>, CoreId) -> Result<(), Error>,
IM: UsesState,
MF: FnOnce(
Option<S>,
CentralizedEventManager<IM, SP>, // No hooks for centralized EM
CoreId,
) -> Result<(), Error>,
MT: Monitor + Clone,
S: State + HasExecutions,
SP: ShMemProvider,
MT: Monitor + Clone + 'static,
SP: ShMemProvider + 'static,
{
/// Launch a Centralized-based fuzzer.
/// - `main_inner_mgr_builder` will be called to build the inner manager of the main node.
/// - `secondary_inner_mgr_builder` will be called to build the inner manager of the secondary nodes.
#[allow(clippy::similar_names)]
#[allow(clippy::too_many_lines)]
pub fn launch_generic<IMF>(
pub fn launch_generic<EM, EMB, S>(
&mut self,
main_inner_mgr_builder: IMF,
secondary_inner_mgr_builder: IMF,
main_inner_mgr_builder: EMB,
secondary_inner_mgr_builder: EMB,
) -> Result<(), Error>
where
IMF: FnOnce(&Self, CoreId) -> Result<(Option<S>, IM), Error>,
S: State,
S::Input: Send + Sync + 'static,
CF: FnOnce(Option<S>, CentralizedEventManager<EM, (), S, SP>, CoreId) -> Result<(), Error>,
EM: UsesState<State = S>,
EMB: FnOnce(&Self, CoreId) -> Result<(Option<S>, EM), Error>,
MF: FnOnce(
Option<S>,
CentralizedEventManager<EM, (), S, SP>, // No broker_hooks for centralized EM
CoreId,
) -> Result<(), Error>,
<<EM as UsesState>::State as UsesInput>::Input: Send + Sync + 'static,
{
let mut main_inner_mgr_builder = Some(main_inner_mgr_builder);
let mut secondary_inner_mgr_builder = Some(secondary_inner_mgr_builder);
@ -639,7 +657,7 @@ where
let num_cores = core_ids.len();
let mut handles = vec![];
log::info!("spawning on cores: {:?}", self.cores);
debug!("spawning on cores: {:?}", self.cores);
self.opened_stdout_file = self
.stdout_file
@ -650,42 +668,6 @@ where
let debug_output = std::env::var(LIBAFL_DEBUG_OUTPUT).is_ok();
// Spawn centralized broker
self.shmem_provider.pre_fork()?;
match unsafe { fork() }? {
ForkResult::Parent(child) => {
self.shmem_provider.post_fork(false)?;
handles.push(child.pid);
#[cfg(feature = "std")]
log::info!("PID: {:#?} centralized broker spawned", std::process::id());
}
ForkResult::Child => {
log::info!("{:?} PostFork", unsafe { libc::getpid() });
#[cfg(feature = "std")]
log::info!("PID: {:#?} I am centralized broker", std::process::id());
self.shmem_provider.post_fork(true)?;
let llmp_centralized_hook = CentralizedLlmpHook::<S::Input>::new()?;
// TODO switch to false after solving the bug
let mut broker = LlmpBroker::with_keep_pages_attach_to_tcp(
self.shmem_provider.clone(),
tuple_list!(llmp_centralized_hook),
self.centralized_broker_port,
true,
)?;
// Run in the broker until all clients exit
broker.loop_with_timeouts(Duration::from_secs(30), Some(Duration::from_millis(5)));
log::info!("The last client quit. Exiting.");
return Err(Error::shutting_down());
}
}
std::thread::sleep(Duration::from_millis(10));
// Spawn clients
let mut index = 0_u64;
for (id, bind_to) in core_ids.iter().enumerate().take(num_cores) {
@ -718,14 +700,19 @@ where
if index == 1 {
// Main client
debug!("Running main client on PID {}", std::process::id());
let (state, mgr) =
main_inner_mgr_builder.take().unwrap()(self, *bind_to)?;
let mut centralized_builder = CentralizedEventManager::builder();
centralized_builder = centralized_builder.is_main(true);
let mut centralized_event_manager_builder =
CentralizedEventManager::builder();
centralized_event_manager_builder =
centralized_event_manager_builder.is_main(true);
let c_mgr = centralized_builder.build_on_port(
let c_mgr = centralized_event_manager_builder.build_on_port(
mgr,
// tuple_list!(multi_machine_event_manager_hook.take().unwrap()),
tuple_list!(),
self.shmem_provider.clone(),
self.centralized_broker_port,
self.time_obs.clone(),
@ -734,6 +721,7 @@ where
self.main_run_client.take().unwrap()(state, c_mgr, *bind_to)
} else {
// Secondary clients
debug!("Running secondary client on PID {}", std::process::id());
let (state, mgr) =
secondary_inner_mgr_builder.take().unwrap()(self, *bind_to)?;
@ -741,6 +729,7 @@ where
let c_mgr = centralized_builder.build_on_port(
mgr,
tuple_list!(),
self.shmem_provider.clone(),
self.centralized_broker_port,
self.time_obs.clone(),
@ -753,44 +742,94 @@ where
}
}
#[cfg(feature = "multi_machine")]
// Create this after forks, to avoid problems with tokio runtime
let (multi_machine_sender_hook, multi_machine_receiver_hook) =
TcpMultiMachineBuilder::build::<
SocketAddr,
<<EM as UsesState>::State as UsesInput>::Input,
>(self.multi_machine_node_descriptor.clone())?;
let mut brokers = Brokers::new();
// Add centralized broker
brokers.add(Box::new({
#[cfg(feature = "multi_machine")]
let centralized_hooks = tuple_list!(
CentralizedLlmpHook::<S::Input>::new()?,
multi_machine_receiver_hook,
);
#[cfg(not(feature = "multi_machine"))]
let centralized_hooks = tuple_list!(CentralizedLlmpHook::<S::Input>::new()?);
// TODO switch to false after solving the bug
LlmpBroker::with_keep_pages_attach_to_tcp(
self.shmem_provider.clone(),
centralized_hooks,
self.centralized_broker_port,
true,
)?
}));
#[cfg(feature = "multi_machine")]
assert!(
self.spawn_broker,
"Multi machine is not compatible with externally spawned brokers for now."
);
// If we should add another broker, add it to other brokers.
if self.spawn_broker {
log::info!("I am broker!!.");
// TODO we don't want always a broker here, think about using different laucher process to spawn different configurations
let builder = RestartingMgr::<(), MT, S, SP>::builder()
.shmem_provider(self.shmem_provider.clone())
.monitor(Some(self.monitor.clone()))
.broker_port(self.broker_port)
.kind(ManagerKind::Broker)
.remote_broker_addr(self.remote_broker_addr)
.exit_cleanly_after(Some(NonZeroUsize::try_from(self.cores.ids.len()).unwrap()))
.configuration(self.configuration)
.serialize_state(self.serialize_state)
.hooks(tuple_list!());
#[cfg(not(feature = "multi_machine"))]
let llmp_hook =
tuple_list!(StdLlmpEventHook::<S::Input, MT>::new(self.monitor.clone())?);
let builder = builder.time_ref(self.time_obs.clone());
#[cfg(feature = "multi_machine")]
let llmp_hook = tuple_list!(
StdLlmpEventHook::<S::Input, MT>::new(self.monitor.clone())?,
multi_machine_sender_hook,
);
builder.build().launch()?;
let mut broker = LlmpBroker::create_attach_to_tcp(
self.shmem_provider.clone(),
llmp_hook,
self.broker_port,
)?;
// Broker exited. kill all clients.
for handle in &handles {
unsafe {
libc::kill(*handle, libc::SIGINT);
}
}
} else {
for handle in &handles {
let mut status = 0;
log::info!("Not spawning broker (spawn_broker is false). Waiting for fuzzer children to exit...");
unsafe {
libc::waitpid(*handle, &mut status, 0);
if status != 0 {
log::info!("Client with pid {handle} exited with status {status}");
}
}
if let Some(remote_broker_addr) = self.remote_broker_addr {
log::info!("B2b: Connecting to {:?}", &remote_broker_addr);
broker.inner_mut().connect_b2b(remote_broker_addr)?;
};
let exit_cleanly_after = NonZeroUsize::try_from(self.cores.ids.len()).unwrap();
broker
.inner_mut()
.set_exit_cleanly_after(exit_cleanly_after);
brokers.add(Box::new(broker));
}
debug!(
"Brokers have been initialized on port {}.",
std::process::id()
);
// Loop over all the brokers that should be polled
brokers.loop_with_timeouts(Duration::from_secs(30), Some(Duration::from_millis(5)));
#[cfg(feature = "llmp_debug")]
log::info!("The last client quit. Exiting.");
// Brokers exited. kill all clients.
for handle in &handles {
unsafe {
libc::kill(*handle, libc::SIGINT);
}
}
Ok(())
Err(Error::shutting_down())
}
}

View File

@ -25,17 +25,17 @@ use libafl_bolts::{
llmp::{recv_tcp_msg, send_tcp_msg, TcpRequest, TcpResponse},
IP_LOCALHOST,
};
use log::debug;
use serde::{Deserialize, Serialize};
#[cfg(feature = "llmp_compression")]
use crate::events::llmp::COMPRESS_THRESHOLD;
use crate::{
events::{
hooks::EventManagerHooksTuple,
llmp::{LLMP_TAG_EVENT_TO_BOTH, _LLMP_TAG_EVENT_TO_BROKER},
AdaptiveSerializer, CustomBufEventResult, CustomBufHandlerFn, Event, EventConfig,
EventFirer, EventManager, EventManagerId, EventProcessor, EventRestarter,
HasCustomBufHandlers, HasEventManagerId, ProgressReporter,
EventFirer, EventManager, EventManagerHooksTuple, EventManagerId, EventProcessor,
EventRestarter, HasCustomBufHandlers, HasEventManagerId, ProgressReporter,
},
executors::{Executor, HasObservers},
fuzzer::{Evaluator, EvaluatorObservers, ExecutionProcessor},
@ -370,7 +370,7 @@ where
Ok(_) => (),
Err(e) => log::error!("Failed to send tcp message {:#?}", e),
}
log::info!("Asking he broker to be disconnected");
debug!("Asking he broker to be disconnected");
Ok(())
}
@ -410,25 +410,24 @@ where
+ EvaluatorObservers<E::Observers>
+ Evaluator<E, Self>,
{
if !self.hooks.pre_exec_all(state, client_id, &event)? {
return Ok(());
}
let evt_name = event.name_detailed();
match event {
Event::NewTestcase {
input,
client_config,
exit_kind,
corpus_size: _,
observers_buf,
time: _,
executions: _,
#[cfg(feature = "std")]
forward_id,
..
} => {
log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})");
#[cfg(feature = "std")]
debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id());
if self.always_interesting {
let item = fuzzer.add_input(state, executor, self, input)?;
log::info!("Added received Testcase as item #{item}");
debug!("Added received Testcase as item #{item}");
} else {
let res = if client_config.match_with(&self.configuration)
&& observers_buf.is_some()
@ -456,7 +455,9 @@ where
)?
};
if let Some(item) = res.1 {
log::info!("Added received Testcase as item #{item}");
debug!("Added received Testcase {evt_name} as item #{item}");
} else {
debug!("Testcase {evt_name} was discarded");
}
}
}
@ -474,6 +475,7 @@ where
)));
}
}
self.hooks.post_exec_all(state, client_id)?;
Ok(())
}
@ -618,6 +620,7 @@ where
msg
};
let event: Event<S::Input> = postcard::from_bytes(event_bytes)?;
debug!("Received event in normal llmp {}", event.name_detailed());
self.handle_in_client(fuzzer, executor, state, client_id, event)?;
count += 1;
}

View File

@ -13,6 +13,7 @@ use libafl_bolts::{
shmem::{NopShMemProvider, ShMemProvider},
ClientId,
};
use log::debug;
use serde::Deserialize;
use crate::{
@ -28,10 +29,6 @@ use crate::{
pub mod mgr;
pub use mgr::*;
/// The llmp hooks
pub mod hooks;
pub use hooks::*;
/// The llmp restarting manager
#[cfg(feature = "std")]
pub mod restarting;
@ -39,17 +36,17 @@ pub mod restarting;
pub use restarting::*;
/// Forward this to the client
const _LLMP_TAG_EVENT_TO_CLIENT: Tag = Tag(0x2C11E471);
pub(crate) const _LLMP_TAG_EVENT_TO_CLIENT: Tag = Tag(0x2C11E471);
/// Only handle this in the broker
const _LLMP_TAG_EVENT_TO_BROKER: Tag = Tag(0x2B80438);
pub(crate) const _LLMP_TAG_EVENT_TO_BROKER: Tag = Tag(0x2B80438);
/// Handle in both
///
const LLMP_TAG_EVENT_TO_BOTH: Tag = Tag(0x2B0741);
const _LLMP_TAG_RESTART: Tag = Tag(0x8357A87);
const _LLMP_TAG_NO_RESTART: Tag = Tag(0x57A7EE71);
pub(crate) const LLMP_TAG_EVENT_TO_BOTH: Tag = Tag(0x2B0741);
pub(crate) const _LLMP_TAG_RESTART: Tag = Tag(0x8357A87);
pub(crate) const _LLMP_TAG_NO_RESTART: Tag = Tag(0x57A7EE71);
/// The minimum buffer size at which to compress LLMP IPC messages.
#[cfg(any(feature = "llmp_compression", feature = "tcp_compression"))]
#[cfg(feature = "llmp_compression")]
pub const COMPRESS_THRESHOLD: usize = 1024;
/// Specify if the State must be persistent over restarts
@ -304,16 +301,9 @@ where
{
match event {
Event::NewTestcase {
input,
client_config: _,
exit_kind: _,
corpus_size: _,
observers_buf: _, // Useless as we are converting between types
time: _,
executions: _,
forward_id,
input, forward_id, ..
} => {
log::info!("Received new Testcase to convert from {client_id:?} (forward {forward_id:?}, forward {forward_id:?})");
debug!("Received new Testcase to convert from {client_id:?} (forward {forward_id:?}, forward {forward_id:?})");
let Some(converter) = self.converter_back.as_mut() else {
return Ok(());
@ -387,6 +377,7 @@ where
};
let event: Event<DI> = postcard::from_bytes(event_bytes)?;
debug!("Processor received message {}", event.name_detailed());
self.handle_in_client(fuzzer, executor, state, manager, client_id, event)?;
count += 1;
}
@ -442,6 +433,8 @@ where
time,
executions,
forward_id,
#[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id,
} => Event::NewTestcase {
input: self.converter.as_mut().unwrap().convert(input)?,
client_config,
@ -451,6 +444,8 @@ where
time,
executions,
forward_id,
#[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id,
},
Event::CustomBuf { buf, tag } => Event::CustomBuf { buf, tag },
_ => {
@ -497,6 +492,8 @@ where
time,
executions,
forward_id,
#[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id,
} => Event::NewTestcase {
input: self.converter.as_mut().unwrap().convert(input)?,
client_config,
@ -506,6 +503,8 @@ where
time,
executions,
forward_id,
#[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id,
},
Event::CustomBuf { buf, tag } => Event::CustomBuf { buf, tag },
_ => {

View File

@ -31,6 +31,8 @@ use libafl_bolts::{
use libafl_bolts::{
llmp::LlmpConnection, os::CTRL_C_EXIT, shmem::StdShMemProvider, staterestore::StateRestorer,
};
#[cfg(all(unix, feature = "fork"))]
use log::debug;
use serde::{Deserialize, Serialize};
#[cfg(feature = "std")]
use typed_builder::TypedBuilder;
@ -41,9 +43,9 @@ use crate::events::AdaptiveSerializer;
use crate::events::EVENTMGR_SIGHANDLER_STATE;
use crate::{
events::{
hooks::EventManagerHooksTuple, Event, EventConfig, EventFirer, EventManager,
EventManagerId, EventProcessor, EventRestarter, HasEventManagerId, LlmpEventManager,
LlmpShouldSaveState, ProgressReporter, StdLlmpEventHook,
Event, EventConfig, EventFirer, EventManager, EventManagerHooksTuple, EventManagerId,
EventProcessor, EventRestarter, HasEventManagerId, LlmpEventManager, LlmpShouldSaveState,
ProgressReporter, StdLlmpEventHook,
},
executors::{Executor, HasObservers},
fuzzer::{Evaluator, EvaluatorObservers, ExecutionProcessor},
@ -386,14 +388,7 @@ where
#[cfg(feature = "std")]
#[allow(clippy::default_trait_access, clippy::ignored_unit_patterns)]
#[derive(TypedBuilder, Debug)]
pub struct RestartingMgr<EMH, MT, S, SP>
where
EMH: EventManagerHooksTuple<S>,
S: State,
SP: ShMemProvider,
MT: Monitor,
//CE: CustomEvent<I>,
{
pub struct RestartingMgr<EMH, MT, S, SP> {
/// The shared memory provider to use for the broker or client spawned by the restarting
/// manager.
shmem_provider: SP,
@ -444,7 +439,7 @@ where
{
/// Launch the broker and the clients and fuzz
pub fn launch(&mut self) -> Result<(Option<S>, LlmpRestartingEventManager<EMH, S, SP>), Error> {
// We start ourself as child process to actually fuzz
// We start ourselves as child process to actually fuzz
let (staterestorer, new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER)
.is_err()
{
@ -569,6 +564,11 @@ where
handle.status()
}
ForkResult::Child => {
debug!(
"{} has been forked into {}",
std::os::unix::process::parent_id(),
std::process::id()
);
self.shmem_provider.post_fork(true)?;
break (staterestorer, self.shmem_provider.clone(), core_id);
}

View File

@ -1,7 +1,8 @@
//! An [`EventManager`] manages all events that go to other instances of the fuzzer.
//! The messages are commonly information about new Testcases as well as stats and other [`Event`]s.
pub mod hooks;
pub mod events_hooks;
pub use events_hooks::*;
pub mod simple;
pub use simple::*;
@ -16,10 +17,13 @@ pub mod launcher;
pub mod llmp;
pub use llmp::*;
#[cfg(feature = "tcp_manager")]
#[allow(clippy::ignored_unit_patterns)]
pub mod tcp;
use alloc::{borrow::Cow, boxed::Box, string::String, vec::Vec};
pub mod broker_hooks;
use alloc::{
borrow::Cow,
boxed::Box,
string::{String, ToString},
vec::Vec,
};
use core::{
fmt,
hash::{BuildHasher, Hasher},
@ -28,6 +32,7 @@ use core::{
};
use ahash::RandomState;
pub use broker_hooks::*;
#[cfg(feature = "std")]
pub use launcher::*;
#[cfg(all(unix, feature = "std"))]
@ -57,6 +62,10 @@ use crate::{
state::HasScalabilityMonitor,
};
/// Multi-machine mode
#[cfg(all(unix, feature = "std", feature = "multi_machine"))]
pub mod multi_machine;
/// Check if ctrl-c is sent with this struct
#[cfg(all(unix, feature = "std"))]
pub static mut EVENTMGR_SIGHANDLER_STATE: ShutdownSignalData = ShutdownSignalData {};
@ -104,6 +113,8 @@ pub struct EventManagerId(
pub usize,
);
#[cfg(all(unix, feature = "std", feature = "multi_machine"))]
use crate::events::multi_machine::NodeId;
#[cfg(feature = "introspection")]
use crate::monitors::ClientPerfMonitor;
use crate::{
@ -279,6 +290,9 @@ where
executions: u64,
/// The original sender if, if forwarded
forward_id: Option<ClientId>,
/// The (multi-machine) node from which the tc is from, if any
#[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id: Option<NodeId>,
},
/// New stats event to monitor.
UpdateExecStats {
@ -349,45 +363,37 @@ where
{
fn name(&self) -> &str {
match self {
Event::NewTestcase {
input: _,
client_config: _,
corpus_size: _,
exit_kind: _,
observers_buf: _,
time: _,
executions: _,
forward_id: _,
} => "Testcase",
Event::UpdateExecStats {
time: _,
executions: _,
phantom: _,
} => "Client Heartbeat",
Event::UpdateUserStats {
name: _,
value: _,
phantom: _,
} => "UserStats",
Event::NewTestcase { .. } => "Testcase",
Event::UpdateExecStats { .. } => "Client Heartbeat",
Event::UpdateUserStats { .. } => "UserStats",
#[cfg(feature = "introspection")]
Event::UpdatePerfMonitor {
time: _,
executions: _,
introspection_monitor: _,
phantom: _,
} => "PerfMonitor",
Event::UpdatePerfMonitor { .. } => "PerfMonitor",
Event::Objective { .. } => "Objective",
Event::Log {
severity_level: _,
message: _,
phantom: _,
} => "Log",
Event::Log { .. } => "Log",
Event::CustomBuf { .. } => "CustomBuf",
/*Event::Custom {
sender_id: _, /*custom_event} => custom_event.name()*/
} => "todo",*/
}
}
fn name_detailed(&self) -> String {
match self {
Event::NewTestcase { input, .. } => {
format!("Testcase {}", input.generate_name(0))
}
Event::UpdateExecStats { .. } => "Client Heartbeat".to_string(),
Event::UpdateUserStats { .. } => "UserStats".to_string(),
#[cfg(feature = "introspection")]
Event::UpdatePerfMonitor { .. } => "PerfMonitor".to_string(),
Event::Objective { .. } => "Objective".to_string(),
Event::Log { .. } => "Log".to_string(),
Event::CustomBuf { .. } => "CustomBuf".to_string(),
/*Event::Custom {
sender_id: _, /*custom_event} => custom_event.name()*/
} => "todo",*/
}
}
}
/// [`EventFirer`] fire an event.
@ -949,22 +955,15 @@ mod tests {
time: current_time(),
executions: 0,
forward_id: None,
#[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id: None,
};
let serialized = postcard::to_allocvec(&e).unwrap();
let d = postcard::from_bytes::<Event<BytesInput>>(&serialized).unwrap();
match d {
Event::NewTestcase {
input: _,
observers_buf,
corpus_size: _,
exit_kind: _,
client_config: _,
time: _,
executions: _,
forward_id: _,
} => {
Event::NewTestcase { observers_buf, .. } => {
let o: tuple_list_type!(StdMapObserver::<u32, false>) =
postcard::from_bytes(observers_buf.as_ref().unwrap()).unwrap();
assert_eq!("test", o.0.name());

View File

@ -0,0 +1,555 @@
use core::fmt::Display;
use std::{
boxed::Box,
collections::HashMap,
io::ErrorKind,
process,
sync::{
atomic::{AtomicU64, Ordering},
Arc, OnceLock,
},
time::Duration,
vec::Vec,
};
use enumflags2::{bitflags, BitFlags};
#[cfg(feature = "llmp_compression")]
use libafl_bolts::bolts_prelude::GzipCompressor;
use libafl_bolts::{current_time, ownedref::OwnedRef, Error};
use log::{debug, error};
use serde::{Deserialize, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream, ToSocketAddrs},
runtime::Runtime,
sync::RwLock,
task::JoinHandle,
time,
};
use typed_builder::TypedBuilder;
use crate::{
events::{Event, TcpMultiMachineLlmpReceiverHook, TcpMultiMachineLlmpSenderHook},
inputs::Input,
};
const MAX_NB_RECEIVED_AT_ONCE: usize = 10;
#[bitflags(default = SendToParent | SendToChildren)]
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq)]
/// The node policy. It represents flags that can be applied to the node to change how it behaves.
pub enum NodePolicy {
/// Send current node's interesting inputs to parent.
SendToParent,
/// Send current node's interesting inputs to children.
SendToChildren,
}
const DUMMY_BYTE: u8 = 0x14;
/// Use `OwnedRef` as much as possible here to avoid useless copies.
/// An owned TCP message for multi machine
#[derive(Clone, Debug)]
// #[serde(bound = "I: serde::de::DeserializeOwned")]
pub enum MultiMachineMsg<'a, I>
where
I: Input,
{
/// A raw llmp message (not deserialized)
LlmpMsg(OwnedRef<'a, [u8]>),
/// A `LibAFL` Event (already deserialized)
Event(OwnedRef<'a, Event<I>>),
}
/// We do not use raw pointers, so no problem with thead-safety
unsafe impl<'a, I: Input> Send for MultiMachineMsg<'a, I> {}
unsafe impl<'a, I: Input> Sync for MultiMachineMsg<'a, I> {}
impl<'a, I> MultiMachineMsg<'a, I>
where
I: Input,
{
/// Create a new [`MultiMachineMsg`] as event.
///
/// # Safety
///
/// `OwnedRef` should **never** be a raw pointer for thread-safety reasons.
/// We check this for debug builds, but not for release.
#[must_use]
pub unsafe fn event(event: OwnedRef<'a, Event<I>>) -> Self {
debug_assert!(!event.is_raw());
MultiMachineMsg::Event(event)
}
/// Create a new [`MultiMachineMsg`] from an llmp msg.
#[must_use]
pub fn llmp_msg(msg: OwnedRef<'a, [u8]>) -> Self {
MultiMachineMsg::LlmpMsg(msg)
}
/// Get the message
#[must_use]
pub fn serialize_as_ref(&self) -> &[u8] {
match self {
MultiMachineMsg::LlmpMsg(msg) => msg.as_ref(),
MultiMachineMsg::Event(_) => {
panic!("Not supported")
}
}
}
/// To owned message
#[must_use]
pub fn from_llmp_msg(msg: Box<[u8]>) -> MultiMachineMsg<'a, I> {
MultiMachineMsg::LlmpMsg(OwnedRef::Owned(msg))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
/// A `NodeId` (unused for now)
pub struct NodeId(pub u64);
impl NodeId {
/// Generate a unique [`NodeId`].
pub fn new() -> Self {
static CTR: OnceLock<AtomicU64> = OnceLock::new();
let ctr = CTR.get_or_init(|| AtomicU64::new(0));
NodeId(ctr.fetch_add(1, Ordering::Relaxed))
}
}
/// The state of the hook shared between the background threads and the main thread.
#[derive(Debug)]
#[allow(dead_code)]
pub struct TcpMultiMachineState<A> {
node_descriptor: NodeDescriptor<A>,
/// the parent to which the testcases should be forwarded when deemed interesting
parent: Option<TcpStream>,
/// The children who connected during the fuzzing session.
children: HashMap<NodeId, TcpStream>, // The children who connected during the fuzzing session.
old_msgs: Vec<Vec<u8>>,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor,
}
/// The tree descriptor for the
#[derive(Debug, Clone, TypedBuilder)]
pub struct NodeDescriptor<A> {
/// The parent address, if there is one.
pub parent_addr: Option<A>,
/// The node listening port. Defaults to 50000
#[builder(default = Some(50000))]
pub node_listening_port: Option<u16>,
#[builder(default = Duration::from_secs(60))]
/// The timeout for connecting to parent
pub timeout: Duration,
/// Node flags
#[builder(default_code = "BitFlags::default()")]
pub flags: BitFlags<NodePolicy>, // The policy for shared messages between nodes.
}
/// A Multi-machine `broker_hooks` builder.
#[derive(Debug)]
pub struct TcpMultiMachineBuilder {
_private: (),
}
impl TcpMultiMachineBuilder {
/// Build a new couple [`TcpMultiMachineLlmpSenderHook`] / [`TcpMultiMachineLlmpReceiverHook`] from a [`NodeDescriptor`].
/// Everything is initialized and ready to be used.
/// Beware, the hooks should run in the same process as the one this function is called.
/// This is because we spawn a tokio runtime underneath.
/// Check `<https://github.com/tokio-rs/tokio/issues/4301>` for more details.
pub fn build<A, I>(
node_descriptor: NodeDescriptor<A>,
) -> Result<
(
TcpMultiMachineLlmpSenderHook<A, I>,
TcpMultiMachineLlmpReceiverHook<A, I>,
),
Error,
>
where
A: Clone + Display + ToSocketAddrs + Send + Sync + 'static,
I: Input + Send + Sync + 'static,
{
// Create the state of the hook. This will be shared with the background server, so we wrap
// it with concurrent-safe objects
let state = Arc::new(RwLock::new(TcpMultiMachineState {
node_descriptor,
parent: None,
children: HashMap::default(),
old_msgs: Vec::new(),
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(),
}));
let rt =
Arc::new(Runtime::new().map_err(|_| Error::unknown("Tokio runtime spawning failed"))?);
unsafe {
TcpMultiMachineState::init::<I>(&state.clone(), &rt.clone())?;
}
Ok((
TcpMultiMachineLlmpSenderHook::new(state.clone(), rt.clone()),
TcpMultiMachineLlmpReceiverHook::new(state, rt),
))
}
}
impl<A> TcpMultiMachineState<A>
where
A: Clone + Display + ToSocketAddrs + Send + Sync + 'static,
{
/// Initializes the Multi-Machine state.
///
/// # Safety
///
/// This should be run **only once**, in the same process as the llmp hooks, and before the hooks
/// are effectively used.
unsafe fn init<I: Input>(
self_mutex: &Arc<RwLock<Self>>,
rt: &Arc<Runtime>,
) -> Result<(), Error> {
let node_descriptor =
rt.block_on(async { self_mutex.read().await.node_descriptor.clone() });
// Try to connect to the parent if we should
rt.block_on(async {
let parent_mutex = self_mutex.clone();
let mut parent_lock = parent_mutex.write().await;
if let Some(parent_addr) = &parent_lock.node_descriptor.parent_addr {
let timeout = current_time() + parent_lock.node_descriptor.timeout;
parent_lock.parent = loop {
debug!("Trying to connect to parent @ {}..", parent_addr);
match TcpStream::connect(parent_addr).await {
Ok(stream) => {
debug!("Connected to parent @ {}", parent_addr);
break Some(stream);
}
Err(e) => {
if current_time() > timeout {
return Err(Error::os_error(e, "Unable to connect to parent"));
}
}
}
time::sleep(Duration::from_secs(1)).await;
};
}
Ok(())
})?;
// Now, setup the background tasks for the children to connect to
if let Some(listening_port) = node_descriptor.node_listening_port {
let bg_state = self_mutex.clone();
let _handle: JoinHandle<Result<(), Error>> = rt.spawn(async move {
let addr = format!("0.0.0.0:{listening_port}");
debug!("Starting background child task on {addr}...");
let listener = TcpListener::bind(addr).await.map_err(|e| {
Error::os_error(e, format!("Error while binding to port {listening_port}"))
})?;
let state = bg_state;
// The main listening loop. Should never fail.
'listening: loop {
debug!("listening for children on {:?}...", listener);
match listener.accept().await {
Ok((mut stream, addr)) => {
debug!("{} joined the children.", addr);
let mut state_guard = state.write().await;
if let Err(e) = state_guard
.send_old_events_to_stream::<I>(&mut stream)
.await
{
error!("Error while send old messages: {:?}.", e);
error!("The loop will resume");
continue 'listening;
}
state_guard.children.insert(NodeId::new(), stream);
debug!(
"[pid {}]{} added the child. nb children: {}",
process::id(),
addr,
state_guard.children.len()
);
}
Err(e) => {
error!("Error while accepting child {:?}.", e);
}
}
}
});
}
Ok(())
}
/// Add an event as past event.
pub fn add_past_msg(&mut self, msg: &[u8]) {
self.old_msgs.push(msg.to_vec());
}
/// The compressor
#[cfg(feature = "llmp_compression")]
pub fn compressor(&mut self) -> &GzipCompressor {
&self.compressor
}
/// Read a [`TcpMultiMachineMsg`] from a stream.
/// Expects a message written by [`TcpMultiMachineState::write_msg`].
/// If there is nothing to read from the stream, return asap with Ok(None).
#[allow(clippy::uninit_vec)]
async fn read_msg<'a, I: Input + 'a>(
stream: &mut TcpStream,
) -> Result<Option<MultiMachineMsg<'a, I>>, Error> {
// 0. Check if we should try to fetch something from the stream
let mut dummy_byte: [u8; 1] = [0u8];
debug!("Starting read msg...");
let n_read = match stream.try_read(&mut dummy_byte) {
Ok(n) => n,
Err(e) if e.kind() == ErrorKind::WouldBlock => {
return Ok(None);
}
Err(e) => return Err(Error::os_error(e, "try read failed")),
};
debug!("msg read.");
if n_read == 0 {
debug!("No dummy byte received...");
return Ok(None); // Nothing to read from this stream
}
debug!("Received dummy byte!");
// we should always read the dummy byte at this point.
assert_eq!(u8::from_le_bytes(dummy_byte), DUMMY_BYTE);
// 1. Read msg size
let mut node_msg_len: [u8; 4] = [0; 4];
stream.read_exact(&mut node_msg_len).await?;
let node_msg_len = u32::from_le_bytes(node_msg_len) as usize;
// 2. Read msg
// do not store msg on the stack to avoid overflow issues
// TODO: optimize with less allocations...
let mut node_msg: Vec<u8> = Vec::with_capacity(node_msg_len);
unsafe {
node_msg.set_len(node_msg_len);
}
stream.read_exact(node_msg.as_mut_slice()).await?;
let node_msg = node_msg.into_boxed_slice();
Ok(Some(MultiMachineMsg::from_llmp_msg(node_msg)))
}
/// Write an [`OwnedTcpMultiMachineMsg`] to a stream.
/// Can be read back using [`TcpMultiMachineState::read_msg`].
async fn write_msg<'a, I: Input>(
stream: &mut TcpStream,
msg: &MultiMachineMsg<'a, I>,
) -> Result<(), Error> {
let serialized_msg = msg.serialize_as_ref();
let msg_len = u32::to_le_bytes(serialized_msg.len() as u32);
// 0. Write the dummy byte
debug!("Sending dummy byte");
stream.write_all(&[DUMMY_BYTE]).await?;
// 1. Write msg size
debug!("Sending msg len");
stream.write_all(&msg_len).await?;
// 2. Write msg
debug!("Sending msg");
stream.write_all(serialized_msg).await?;
Ok(())
}
pub(crate) async fn send_old_events_to_stream<I: Input>(
&mut self,
stream: &mut TcpStream,
) -> Result<(), Error> {
debug!("Send old events to new child...");
for old_msg in &self.old_msgs {
let event_ref: MultiMachineMsg<I> =
MultiMachineMsg::llmp_msg(OwnedRef::Ref(old_msg.as_slice()));
debug!("Sending an old message...");
Self::write_msg(stream, &event_ref).await?;
debug!("Old message sent.");
}
debug!("Sent {} old messages.", self.old_msgs.len());
Ok(())
}
pub(crate) async fn send_interesting_event_to_nodes<'a, I: Input>(
&mut self,
msg: &MultiMachineMsg<'a, I>,
) -> Result<(), Error> {
debug!("Sending interesting events to nodes...");
if self
.node_descriptor
.flags
.intersects(NodePolicy::SendToParent)
{
if let Some(parent) = &mut self.parent {
debug!("Sending to parent...");
if let Err(e) = Self::write_msg(parent, msg).await {
error!("The parent disconnected. We won't try to communicate with it again.");
error!("Error: {:?}", e);
self.parent.take();
}
}
}
if self
.node_descriptor
.flags
.intersects(NodePolicy::SendToChildren)
{
let mut ids_to_remove: Vec<NodeId> = Vec::new();
for (child_id, child_stream) in &mut self.children {
debug!("Sending to child...");
if (Self::write_msg(child_stream, msg).await).is_err() {
// most likely the child disconnected. drop the connection later on and continue.
debug!("The child disconnected. We won't try to communicate with it again.");
ids_to_remove.push(*child_id);
}
}
// Garbage collect disconnected children
for id_to_remove in &ids_to_remove {
debug!("Child {:?} has been garbage collected.", id_to_remove);
self.children.remove(id_to_remove);
}
}
Ok(())
}
/// Flush the message queue from other nodes and add incoming events to the
/// centralized event manager queue.
pub(crate) async fn receive_new_messages_from_nodes<'a, I: Input>(
&mut self,
msgs: &mut Vec<MultiMachineMsg<'a, I>>,
) -> Result<(), Error> {
debug!("Checking for new events from other nodes...");
let mut nb_received = 0usize;
// Our (potential) parent could have something for us
if let Some(parent) = &mut self.parent {
loop {
// Exit if received a lot of inputs at once.
if nb_received > MAX_NB_RECEIVED_AT_ONCE {
return Ok(());
}
debug!("Receiving from parent...");
match Self::read_msg(parent).await {
Ok(Some(msg)) => {
debug!("Received event from parent");
// The parent has something for us, we store it
msgs.push(msg);
nb_received += 1;
}
Ok(None) => {
// nothing from the parent, we continue
debug!("Nothing from parent");
break;
}
Err(Error::OsError(_, _, _)) => {
// most likely the parent disconnected. drop the connection
debug!(
"The parent disconnected. We won't try to communicate with it again."
);
self.parent.take();
break;
}
Err(e) => {
debug!("An error occurred and was not expected.");
return Err(e);
}
}
}
}
// What about the (potential) children?
let mut ids_to_remove: Vec<NodeId> = Vec::new();
debug!(
"[pid {}] Nb children: {}",
process::id(),
self.children.len()
);
for (child_id, child_stream) in &mut self.children {
loop {
// Exit if received a lot of inputs at once.
if nb_received > MAX_NB_RECEIVED_AT_ONCE {
return Ok(());
}
debug!("Receiving from child {:?}...", child_id);
match Self::read_msg(child_stream).await {
Ok(Some(msg)) => {
// The parent has something for us, we store it
debug!("Received event from child!");
msgs.push(msg);
nb_received += 1;
}
Ok(None) => {
// nothing from the parent, we continue
debug!("Nothing from child");
break;
}
Err(Error::OsError(e, _, _)) => {
// most likely the parent disconnected. drop the connection
error!(
"The parent disconnected. We won't try to communicate with it again."
);
error!("Error: {:?}", e);
ids_to_remove.push(*child_id);
break;
}
Err(e) => {
// Other error
debug!("An error occurred and was not expected.");
return Err(e);
}
}
}
}
// Garbage collect disconnected children
for id_to_remove in &ids_to_remove {
debug!("Child {:?} has been garbage collected.", id_to_remove);
self.children.remove(id_to_remove);
}
Ok(())
}
}

View File

@ -205,14 +205,10 @@ where
) -> Result<BrokerEventResult, Error> {
match event {
Event::NewTestcase {
input: _,
client_config: _,
exit_kind: _,
corpus_size,
observers_buf: _,
time,
executions,
forward_id: _,
..
} => {
monitor.client_stats_insert(ClientId(0));
monitor
@ -225,9 +221,7 @@ where
Ok(BrokerEventResult::Handled)
}
Event::UpdateExecStats {
time,
executions,
phantom: _,
time, executions, ..
} => {
// TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(ClientId(0));
@ -238,11 +232,7 @@ where
monitor.display(event.name(), ClientId(0));
Ok(BrokerEventResult::Handled)
}
Event::UpdateUserStats {
name,
value,
phantom: _,
} => {
Event::UpdateUserStats { name, value, .. } => {
monitor.client_stats_insert(ClientId(0));
monitor
.client_stats_mut_for(ClientId(0))
@ -256,7 +246,7 @@ where
time,
executions,
introspection_monitor,
phantom: _,
..
} => {
// TODO: The monitor buffer should be added on client add.
monitor.client_stats_insert(ClientId(0));
@ -284,7 +274,7 @@ where
Event::Log {
severity_level,
message,
phantom: _,
..
} => {
let (_, _) = (message, severity_level);
log::log!((*severity_level).into(), "{message}");

File diff suppressed because it is too large Load Diff

View File

@ -27,7 +27,7 @@ use crate::{
use crate::{monitors::PerfFeature, state::HasClientPerfMonitor};
/// Send a monitor update all 15 (or more) seconds
const STATS_TIMEOUT_DEFAULT: Duration = Duration::from_secs(15);
const STATS_TIMEOUT_DEFAULT: Duration = Duration::from_secs(1);
/// Holds a scheduler
pub trait HasScheduler: UsesState
@ -477,6 +477,8 @@ where
time: current_time(),
executions: *state.executions(),
forward_id: None,
#[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id: None,
},
)?;
} else {
@ -676,6 +678,8 @@ where
time: current_time(),
executions: *state.executions(),
forward_id: None,
#[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id: None,
},
)?;
Ok(idx)

View File

@ -32,6 +32,7 @@ Welcome to `LibAFL`
clippy::similar_names,
clippy::too_many_lines,
clippy::into_iter_without_iter, // broken
clippy::type_complexity,
)]
#![cfg_attr(not(test), warn(
missing_debug_implementations,

View File

@ -1092,7 +1092,7 @@ where
}
let range = rand_range(state, other_size, min(other_size, max_size - size));
let target = state.rand_mut().below(size);
let target = state.rand_mut().below(size); // TODO: fix bug if size is 0
let other_testcase = state.corpus().get_from_all(idx)?.borrow_mut();
// No need to load the input again, it'll still be cached.

View File

@ -333,6 +333,8 @@ where
time: current_time(),
executions: 0,
forward_id: None,
#[cfg(all(unix, feature = "std", feature = "multi_machine"))]
node_id: None,
},
)?;

View File

@ -113,7 +113,7 @@ rand_core = { version = "0.6", optional = true }
nix = { version = "0.29", default-features = false, optional = true, features = ["signal", "socket", "poll"] }
uuid = { version = "1.4", optional = true, features = ["serde", "v4"] }
clap = { version = "4.5", features = ["derive", "wrap_help"], optional = true } # CLI parsing, for libafl_bolts::cli / the `cli` feature
log = "0.4.20"
log = { version = "0.4", features = ["release_max_level_info"] }
pyo3 = { version = "0.18", optional = true, features = ["serde", "macros"] }

View File

@ -124,6 +124,7 @@ where
msg_tag: &mut Tag,
_msg_flags: &mut Flags,
msg: &mut [u8],
_new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
) -> Result<LlmpMsgHookResult, Error> {
match *msg_tag {
_TAG_SIMPLE_U32_V1 => {

View File

@ -30,7 +30,8 @@
clippy::module_name_repetitions,
clippy::ptr_cast_constness,
clippy::negative_feature_names,
clippy::too_many_lines
clippy::too_many_lines,
clippy::if_not_else
)]
#![cfg_attr(not(test), warn(
missing_debug_implementations,

View File

@ -77,6 +77,7 @@ use core::{
};
#[cfg(feature = "std")]
use std::{
boxed::Box,
env,
io::{ErrorKind, Read, Write},
net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs},
@ -86,10 +87,12 @@ use std::{
#[cfg(all(debug_assertions, feature = "llmp_debug", feature = "std"))]
use backtrace::Backtrace;
use log::debug;
#[cfg(all(unix, feature = "std"))]
#[cfg(not(any(target_os = "solaris", target_os = "illumos")))]
use nix::sys::socket::{self, sockopt::ReusePort};
use serde::{Deserialize, Serialize};
#[cfg(feature = "std")]
use tuple_list::tuple_list;
#[cfg(all(unix, not(miri)))]
@ -1091,7 +1094,7 @@ where
}
/// For non zero-copy, we want to get rid of old pages with duplicate messages in the client
/// eventually. This function This function sees if we can deallocate older pages.
/// eventually. This function sees if we can deallocate older pages.
/// The broker would have informed us by setting the safe_to_unmap-flag.
unsafe fn prune_old_pages(&mut self) {
// Exclude the current page by splitting of the last element for this iter
@ -1274,7 +1277,8 @@ where
)));
}
(*msg).message_id.0 = (*page).current_msg_id.load(Ordering::Relaxed) + 1;
let mid = (*page).current_msg_id.load(Ordering::Relaxed) + 1;
(*msg).message_id.0 = mid;
// Make sure all things have been written to the page, and commit the message to the page
(*page)
@ -1283,6 +1287,12 @@ where
self.last_msg_sent = msg;
self.has_unsent_message = false;
debug!(
"[{} - {:#x}] Send message with id {}",
self.id.0, self as *const Self as u64, mid
);
Ok(())
}
@ -1691,6 +1701,14 @@ where
if !(*msg).in_shmem(&mut self.current_recv_shmem) {
return Err(Error::illegal_state("Unexpected message in map (out of map bounds) - buggy client or tampered shared map detected!"));
}
debug!(
"[{} - {:#x}] Received message with ID {}...",
self.id.0,
self as *const Self as u64,
(*msg).message_id.0
);
// Handle special, LLMP internal, messages.
match (*msg).tag {
LLMP_TAG_UNSET => panic!(
@ -2058,6 +2076,88 @@ where
hooks: HT,
}
/// The trait for brokers.
pub trait Broker {
/// Getter to `is_shutting_down`
fn is_shutting_down(&self) -> bool;
/// The hooks run for `on_timeout`
fn on_timeout(&mut self) -> Result<(), Error>;
/// The main thing the `broker` does
fn broker_once(&mut self) -> Result<bool, Error>;
/// Getter to `exit_after`
fn exit_after(&self) -> Option<NonZeroUsize>;
/// Getter to `has_clients`
fn has_clients(&self) -> bool;
/// Send the buffer out
fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error>;
/// Getter to `num_clients_seen`
fn num_clients_seen(&self) -> usize;
/// Getter to `nb_listeners`
fn nb_listeners(&self) -> usize;
}
impl<HT, SP> Broker for LlmpBroker<HT, SP>
where
HT: LlmpHookTuple<SP>,
SP: ShMemProvider,
{
fn is_shutting_down(&self) -> bool {
self.inner.is_shutting_down()
}
fn on_timeout(&mut self) -> Result<(), Error> {
self.hooks.on_timeout_all()
}
fn broker_once(&mut self) -> Result<bool, Error> {
self.broker_once()
}
fn exit_after(&self) -> Option<NonZeroUsize> {
self.inner.exit_cleanly_after
}
fn has_clients(&self) -> bool {
self.inner.has_clients()
}
fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> {
self.inner.llmp_out.send_buf(tag, buf)
}
fn num_clients_seen(&self) -> usize {
self.inner.num_clients_seen
}
fn nb_listeners(&self) -> usize {
self.inner.listeners.len()
}
}
/// A set of brokers.
/// Limitation: the hooks must be the same.
#[cfg(feature = "std")]
#[derive(Default)]
pub struct Brokers {
/// the brokers
llmp_brokers: Vec<Box<dyn Broker>>,
}
#[cfg(feature = "std")]
impl Debug for Brokers {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
Debug::fmt("Brokers", f)?;
Ok(())
}
}
/// A signal handler for the [`LlmpBroker`].
/// On unix, it handles signals
/// On Windows - control signals (e.g., CTRL+C)
@ -2111,6 +2211,7 @@ where
msg_tag: &mut Tag,
msg_flags: &mut Flags,
msg: &mut [u8],
new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
) -> Result<LlmpMsgHookResult, Error>;
/// Hook called whenever there is a timeout.
@ -2132,6 +2233,7 @@ where
msg_tag: &mut Tag,
msg_flags: &mut Flags,
msg: &mut [u8],
new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
) -> Result<LlmpMsgHookResult, Error>;
/// Call all hook callbacks on timeout.
@ -2149,6 +2251,7 @@ where
_msg_tag: &mut Tag,
_msg_flags: &mut Flags,
_msg: &mut [u8],
_new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
) -> Result<LlmpMsgHookResult, Error> {
Ok(LlmpMsgHookResult::ForwardToClients)
}
@ -2171,10 +2274,11 @@ where
msg_tag: &mut Tag,
msg_flags: &mut Flags,
msg: &mut [u8],
new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
) -> Result<LlmpMsgHookResult, Error> {
match self
.0
.on_new_message(inner, client_id, msg_tag, msg_flags, msg)?
.on_new_message(inner, client_id, msg_tag, msg_flags, msg, new_msgs)?
{
LlmpMsgHookResult::Handled => {
// message handled, stop early
@ -2183,7 +2287,7 @@ where
LlmpMsgHookResult::ForwardToClients => {
// message should be forwarded, continue iterating
self.1
.on_new_message_all(inner, client_id, msg_tag, msg_flags, msg)
.on_new_message_all(inner, client_id, msg_tag, msg_flags, msg, new_msgs)
}
}
}
@ -2211,6 +2315,120 @@ where
}
}
#[cfg(feature = "std")]
impl Brokers {
/// The constructor
#[must_use]
pub fn new() -> Self {
Self {
llmp_brokers: Vec::new(),
}
}
/// Add another broker
pub fn add(&mut self, broker: Box<dyn Broker>) {
self.llmp_brokers.push(broker);
}
#[cfg(any(all(unix, not(miri)), all(windows, feature = "std")))]
fn setup_handlers() {
#[cfg(all(unix, not(miri)))]
if let Err(e) = unsafe { setup_signal_handler(ptr::addr_of_mut!(LLMP_SIGHANDLER_STATE)) } {
// We can live without a proper ctrl+c signal handler - Ignore.
log::info!("Failed to setup signal handlers: {e}");
} else {
log::info!("Successfully setup signal handlers");
}
#[cfg(all(windows, feature = "std"))]
if let Err(e) = unsafe { setup_ctrl_handler(ptr::addr_of_mut!(LLMP_SIGHANDLER_STATE)) } {
// We can live without a proper ctrl+c signal handler - Ignore.
log::info!("Failed to setup control handlers: {e}");
} else {
log::info!(
"{}: Broker successfully setup control handlers",
std::process::id().to_string()
);
}
}
/// Loops until the last client quits the last broker,
/// forwarding and handling all incoming messages from clients for each broker.
/// Will call `on_timeout` roughly after `timeout`
/// Panics on error.
/// 5 millis of sleep can't hurt to keep busywait not at 100%
#[cfg(feature = "std")]
pub fn loop_with_timeouts(&mut self, timeout: Duration, sleep_time: Option<Duration>) {
use super::current_milliseconds;
#[cfg(any(all(unix, not(miri)), all(windows, feature = "std")))]
Self::setup_handlers();
let timeout = timeout.as_millis() as u64;
let mut end_time = current_milliseconds() + timeout;
loop {
self.llmp_brokers.retain_mut(|broker| {
if !broker.is_shutting_down() {
if current_milliseconds() > end_time {
broker
.on_timeout()
.expect("An error occurred in broker timeout. Exiting.");
end_time = current_milliseconds() + timeout;
}
if broker
.broker_once()
.expect("An error occurred when brokering. Exiting.")
{
end_time = current_milliseconds() + timeout;
}
if let Some(exit_after_count) = broker.exit_after() {
// log::trace!(
// "Clients connected: {} && > {} - {} >= {}",
// self.has_clients(),
// self.num_clients_seen,
// self.listeners.len(),
// exit_after_count
// );
if !broker.has_clients()
&& (broker.num_clients_seen() - broker.nb_listeners())
>= exit_after_count.into()
{
// No more clients connected, and the amount of clients we were waiting for was previously connected.
// exit cleanly.
return false;
}
}
true
} else {
broker.send_buf(LLMP_TAG_EXITING, &[]).expect(
"Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.",
);
false
}
});
if self.llmp_brokers.is_empty() {
break;
}
#[cfg(feature = "std")]
if let Some(time) = sleep_time {
thread::sleep(time);
}
#[cfg(not(feature = "std"))]
if let Some(time) = sleep_time {
panic!("Cannot sleep on no_std platform (requested {time:?})");
}
}
}
}
impl<HT, SP> LlmpBroker<HT, SP>
where
HT: LlmpHookTuple<SP>,
@ -2414,6 +2632,7 @@ where
/// Broker broadcast to its own page for all others to read
/// Returns `true` if new messages were broker-ed
/// It is supposed that the message is never unmapped.
#[inline]
#[allow(clippy::cast_ptr_alignment)]
unsafe fn handle_new_msgs(&mut self, client_id: ClientId) -> Result<bool, Error> {
@ -2522,9 +2741,6 @@ where
}
// handle all other messages
_ => {
// The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary.
let mut should_forward_msg = true;
let pos = if (client_id.0 as usize) < self.inner.llmp_clients.len()
&& self.inner.llmp_clients[client_id.0 as usize].id == client_id
{
@ -2539,18 +2755,27 @@ where
let map = &mut self.inner.llmp_clients[pos].current_recv_shmem;
let msg_buf = (*msg).try_as_slice_mut(map)?;
if let LlmpMsgHookResult::Handled = self.hooks.on_new_message_all(
// The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary.
let mut new_msgs: Vec<(Tag, Flags, Vec<u8>)> = Vec::new();
if let LlmpMsgHookResult::ForwardToClients = self.hooks.on_new_message_all(
&mut self.inner,
client_id,
&mut (*msg).tag,
&mut (*msg).flags,
msg_buf,
&mut new_msgs,
)? {
should_forward_msg = false;
self.inner.forward_msg(msg)?;
}
if should_forward_msg {
self.inner_mut().forward_msg(msg)?;
debug!("New msg vector: {}", new_msgs.len());
for (new_msg_tag, new_msg_flag, new_msg) in new_msgs {
self.inner.llmp_out.send_buf_with_flags(
new_msg_tag,
new_msg_flag,
new_msg.as_ref(),
)?;
}
}
}
@ -3467,12 +3692,14 @@ where
ErrorKind::ConnectionRefused => {
//connection refused. loop till the broker is up
loop {
match TcpStream::connect((IP_LOCALHOST, port)) {
Ok(stream) => break stream,
Err(_) => {
log::info!("Connection Refused.. Retrying");
}
if let Ok(stream) = TcpStream::connect((IP_LOCALHOST, port)) {
break stream;
}
debug!("Connection Refused. Retrying...");
#[cfg(feature = "std")]
thread::sleep(Duration::from_millis(50));
}
}
_ => return Err(Error::illegal_state(e.to_string())),

View File

@ -53,7 +53,7 @@ impl<'a, T> Truncate for &'a mut [T] {
}
/// Wrap a reference and convert to a [`Box`] on serialize
#[derive(Clone, Debug)]
#[derive(Debug)]
pub enum OwnedRef<'a, T>
where
T: 'a + ?Sized,
@ -66,6 +66,30 @@ where
Owned(Box<T>),
}
/// Special case, &\[u8] is a fat pointer containing the size implicitly.
impl<'a> Clone for OwnedRef<'a, [u8]> {
fn clone(&self) -> Self {
match self {
Self::RefRaw(_, _) => panic!("Cannot clone"),
Self::Ref(slice) => Self::Ref(slice),
Self::Owned(elt) => Self::Owned(elt.clone()),
}
}
}
impl<'a, T> Clone for OwnedRef<'a, T>
where
T: 'a + Sized + Clone,
{
fn clone(&self) -> Self {
match self {
Self::RefRaw(ptr, mrkr) => Self::RefRaw(*ptr, mrkr.clone()),
Self::Ref(slice) => Self::Ref(slice),
Self::Owned(elt) => Self::Owned(elt.clone()),
}
}
}
impl<'a, T> OwnedRef<'a, T>
where
T: 'a + ?Sized,
@ -82,6 +106,21 @@ where
);
Self::RefRaw(ptr, UnsafeMarker::new())
}
/// Returns true if the inner ref is a raw pointer, false otherwise.
#[must_use]
pub fn is_raw(&self) -> bool {
matches!(self, OwnedRef::Ref(_))
}
/// Return the inner value, if owned by the given object
#[must_use]
pub fn into_owned(self) -> Option<Box<T>> {
match self {
Self::Owned(val) => Some(val),
_ => None,
}
}
}
impl<'a, T> OwnedRef<'a, T>
@ -135,6 +174,17 @@ where
}
}
impl<'a> AsRef<[u8]> for OwnedRef<'a, [u8]> {
#[must_use]
fn as_ref(&self) -> &[u8] {
match self {
OwnedRef::RefRaw(r, _) => unsafe { (*r).as_ref().unwrap() },
OwnedRef::Ref(r) => r,
OwnedRef::Owned(v) => v.as_ref(),
}
}
}
impl<'a, T> AsRef<T> for OwnedRef<'a, T>
where
T: Sized,

View File

@ -13,7 +13,7 @@ BOOL APIENTRY DllMain(HANDLE hModule, DWORD ul_reason_for_call,
return TRUE;
}
#define EXTERN extern "C" __declspec(dllexport)
#define EXTERN extern "C" __declspec(dllexport)
#else
#define EXTERN
extern "C" {

View File

@ -35,10 +35,12 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
int __libafl_raw_mprotect(void *addr, size_t len, int prot);
void* mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
int munmap(void *addr, size_t length);
void *mremap(void *old_address, size_t old_size, size_t new_size, int flags, ... /* void *new_address */);
int mprotect(void *addr, size_t len, int prot);
void *mmap(void *addr, size_t length, int prot, int flags, int fd,
off_t offset);
int munmap(void *addr, size_t length);
void *mremap(void *old_address, size_t old_size, size_t new_size, int flags,
... /* void *new_address */);
int mprotect(void *addr, size_t len, int prot);
#ifdef __x86_64__
@ -197,9 +199,9 @@ __attribute__((constructor)) void __libafl_hotpatch(void) {
HOTPATCH(mmap)
HOTPATCH(munmap)
HOTPATCH(mprotect)
HOTPATCH(write)
HOTPATCH(_exit)
#undef HOTPATCH

View File

@ -3,34 +3,37 @@
#include <sys/mman.h>
#include <sys/syscall.h>
void* __libafl_raw_mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset) {
return (void*)syscall(SYS_mmap, addr, length, prot, flags, fd, offset);
void *__libafl_raw_mmap(void *addr, size_t length, int prot, int flags, int fd,
off_t offset) {
return (void *)syscall(SYS_mmap, addr, length, prot, flags, fd, offset);
}
int __libafl_raw_munmap(void *addr, size_t length) {
return syscall(SYS_munmap, addr, length);
return syscall(SYS_munmap, addr, length);
}
void *__libafl_raw_mremap(void *old_address, size_t old_size, size_t new_size, int flags, void *new_address) {
return (void*)syscall(SYS_mremap, old_address, old_size, new_size, flags, new_address);
void *__libafl_raw_mremap(void *old_address, size_t old_size, size_t new_size,
int flags, void *new_address) {
return (void *)syscall(SYS_mremap, old_address, old_size, new_size, flags,
new_address);
}
int __libafl_raw_mprotect(void *addr, size_t len, int prot) {
return syscall(SYS_mprotect, addr, len, prot);
return syscall(SYS_mprotect, addr, len, prot);
}
int __libafl_raw_madvise(void *addr, size_t length, int advice) {
return syscall(SYS_madvise, addr, length, advice);
return syscall(SYS_madvise, addr, length, advice);
}
ssize_t __libafl_raw_write(int fd, const void *buf, size_t count) {
return syscall(SYS_write, fd, buf, count);
return syscall(SYS_write, fd, buf, count);
}
ssize_t __libafl_raw_read(int fd, void *buf, size_t count) {
return syscall(SYS_read, fd, buf, count);
return syscall(SYS_read, fd, buf, count);
}
void __libafl_raw_exit_group(int status) {
syscall(SYS_exit_group, status);
syscall(SYS_exit_group, status);
}

View File

@ -188,6 +188,7 @@ async fn main() -> io::Result<()> {
r".*AFLplusplus.*",
r".*Little-CMS.*",
r".*cms_transform_fuzzer.cc.*",
r".*sqlite3.*",
])
.expect("Could not create the regex set from the given regex");

View File

@ -0,0 +1,7 @@
[package]
name = "multi_machine_generator"
authors = ["Romain Malmain <romain.malmain@pm.me>"]
edition = "2021"
[dependencies]
petgraph = "0.6"

View File

@ -0,0 +1,75 @@
use std::net::SocketAddr;
use petgraph::{Direction, Graph};
use petgraph::graph::NodeIndex;
/// A node of the network
#[derive(Debug, Clone)]
pub struct MultiMachineNode {
addr: SocketAddr
}
/// The tree
pub struct MultiMachineTree {
pub graph: Graph<MultiMachineNode, ()>
}
impl MultiMachineNode {
pub fn new(addr: SocketAddr) -> Self {
Self {
addr
}
}
}
impl MultiMachineTree {
/// Generate a multi-machine tree.
///
///
/// - machines: machines to add.
/// - max_children_per_parent: each parent will have at most this amount of children
pub fn generate(machines: &[SocketAddr], max_children_per_parent: u64) -> Self {
let mut graph = Graph::<MultiMachineNode, ()>::new();
let mut machines = Vec::from(machines);
let root = if let Some(root) = machines.pop() {
graph.add_node(MultiMachineNode::new(root))
} else {
return Self {
graph
};
};
let mut graph = Self { graph
};
let mut populate_idx = 0u64; // round-robin population to avoid congestion
let mut nodes_to_populate_now: Vec<NodeIndex> = vec![root]; // current nodes we are working on
let mut nodes_to_populate_later: Vec<NodeIndex> = Vec::new();
// place all the machines in the graph
while let Some(machine) = machines.pop() {
if graph.nb_children(nodes_to_populate_now[populate_idx as usize]) == max_children_per_parent {
nodes_to_populate_now = nodes_to_populate_later.drain(..).collect();
populate_idx = 0; // should be useless
}
let new_child = graph.add_child(nodes_to_populate_now[populate_idx as usize], MultiMachineNode::new(machine));
nodes_to_populate_later.push(new_child);
populate_idx = (populate_idx + 1) % nodes_to_populate_now.len() as u64;
}
graph
}
fn add_child(&mut self, parent: NodeIndex, child: MultiMachineNode) -> NodeIndex {
let child_idx = self.graph.add_node(child);
self.graph.add_edge(child_idx, parent, ());
child_idx
}
fn nb_children(&self, node: NodeIndex) -> u64 {
self.graph.neighbors_directed(node, Direction::Incoming).count() as u64
}
}

View File

@ -0,0 +1,45 @@
//! Multi Machine Generator
//!
//! Generates a ready-to-run multi-machine configuration, as a balanced tree.
//! A simple algorithm will first create such a tree, and associate IPs to them.
//! It will finally output a set of commands to run to have each fuzzer communicating correctly with the other machines of the network.
//!
//! We suppose everyone is on the same network and the machines have the fuzzer ready to run on each machine.
use std::fs;
use std::net::SocketAddr;
use std::str::FromStr;
use petgraph::dot::Dot;
use crate::graph::MultiMachineTree;
pub mod graph;
fn main() {
let machines = [
SocketAddr::from_str("0.0.0.1:50000").unwrap(),
SocketAddr::from_str("0.0.0.2:50000").unwrap(),
SocketAddr::from_str("0.0.0.3:50000").unwrap(),
SocketAddr::from_str("0.0.0.4:50000").unwrap(),
SocketAddr::from_str("0.0.0.5:50000").unwrap(),
SocketAddr::from_str("0.0.0.6:50000").unwrap(),
SocketAddr::from_str("0.0.0.7:50000").unwrap(),
SocketAddr::from_str("0.0.0.8:50000").unwrap(),
SocketAddr::from_str("0.0.0.9:50000").unwrap(),
SocketAddr::from_str("0.0.0.10:50000").unwrap(),
SocketAddr::from_str("0.0.0.11:50000").unwrap(),
SocketAddr::from_str("0.0.0.12:50000").unwrap(),
SocketAddr::from_str("0.0.0.13:50000").unwrap(),
SocketAddr::from_str("0.0.0.14:50000").unwrap(),
SocketAddr::from_str("0.0.0.15:50000").unwrap(),
SocketAddr::from_str("0.0.0.16:50000").unwrap(),
SocketAddr::from_str("0.0.0.17:50000").unwrap(),
SocketAddr::from_str("0.0.0.18:50000").unwrap(),
];
let multi_machine_graph = MultiMachineTree::generate(&machines, 3);
let dot = Dot::new(&multi_machine_graph.graph);
fs::write("multi_machine.dot", format!("{:?}", dot)).unwrap();
}