diff --git a/fuzzers/libfuzzer_libpng_tcp_manager/.gitignore b/fuzzers/libfuzzer_libpng_tcp_manager/.gitignore new file mode 100644 index 0000000000..a977a2ca5b --- /dev/null +++ b/fuzzers/libfuzzer_libpng_tcp_manager/.gitignore @@ -0,0 +1 @@ +libpng-* \ No newline at end of file diff --git a/fuzzers/libfuzzer_libpng_tcp_manager/Cargo.toml b/fuzzers/libfuzzer_libpng_tcp_manager/Cargo.toml new file mode 100644 index 0000000000..be33589158 --- /dev/null +++ b/fuzzers/libfuzzer_libpng_tcp_manager/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "libfuzzer_libpng_tcp_manager" +version = "0.10.1" +authors = ["Andrea Fioraldi ", "Dominik Maier "] +edition = "2021" + +[features] +default = ["std"] +std = [] +# Forces a crash +crash = [] + +[profile.release] +lto = true +codegen-units = 1 +opt-level = 3 +debug = true + +[build-dependencies] +cc = { version = "1.0", features = ["parallel"] } +which = { version = "4.0.2" } + +[dependencies] +libafl = { path = "../../libafl/", features = ["default", "tcp_manager"] } +# libafl = { path = "../../libafl/", features = ["default"] } +libafl_targets = { path = "../../libafl_targets/", features = ["sancov_pcguard_hitcounts", "libfuzzer", "sancov_cmplog"] } +# TODO Include it only when building cc +libafl_cc = { path = "../../libafl_cc/" } +mimalloc = { version = "*", default-features = false } + +[lib] +name = "libfuzzer_libpng" +crate-type = ["staticlib"] diff --git a/fuzzers/libfuzzer_libpng_tcp_manager/Makefile.toml b/fuzzers/libfuzzer_libpng_tcp_manager/Makefile.toml new file mode 100644 index 0000000000..84486be845 --- /dev/null +++ b/fuzzers/libfuzzer_libpng_tcp_manager/Makefile.toml @@ -0,0 +1,198 @@ +# Variables +[env] +FUZZER_NAME='fuzzer_libpng' +PROJECT_DIR = { script = ["pwd"] } +CARGO_TARGET_DIR = { value = "${PROJECT_DIR}/target", condition = { env_not_set = ["CARGO_TARGET_DIR"] } } +LIBAFL_CC = '${CARGO_TARGET_DIR}/release/libafl_cc' +LIBAFL_CXX = '${CARGO_TARGET_DIR}/release/libafl_cxx' +FUZZER = '${CARGO_TARGET_DIR}/release/${FUZZER_NAME}' + +[tasks.unsupported] +script_runner="@shell" +script=''' +echo "Cargo-make not integrated yet on this" +''' + +# libpng +[tasks.libpng] +linux_alias = "libpng_unix" +mac_alias = "libpng_unix" +windows_alias = "unsupported" + +[tasks.libpng_unix] +condition = { files_not_exist = ["./libpng-1.6.37"]} +script_runner="@shell" +script=''' +curl https://deac-fra.dl.sourceforge.net/project/libpng/libpng16/1.6.37/libpng-1.6.37.tar.xz --output libpng-1.6.37.tar.xz +tar -xvf libpng-1.6.37.tar.xz +''' + +# Compilers +[tasks.cxx] +linux_alias = "cxx_unix" +mac_alias = "cxx_unix" +windows_alias = "unsupported" + +[tasks.cxx_unix] +command = "cargo" +args = ["build" , "--release"] + +[tasks.cc] +linux_alias = "cc_unix" +mac_alias = "cc_unix" +windows_alias = "unsupported" + +[tasks.cc_unix] +command = "cargo" +args = ["build" , "--release"] + +[tasks.crash_cxx] +linux_alias = "crash_cxx_unix" +mac_alias = "crash_cxx_unix" +windows_alias = "unsupported" + +[tasks.crash_cxx_unix] +command = "cargo" +args = ["build" , "--release", "--features=crash"] + +[tasks.crash_cc] +linux_alias = "crash_cc_unix" +mac_alias = "crash_cc_unix" +windows_alias = "unsupported" + +[tasks.crash_cc_unix] +command = "cargo" +args = ["build" , "--release", "--features=crash"] + +# Library +[tasks.lib] +linux_alias = "lib_unix" +mac_alias = "lib_unix" +windows_alias = "unsupported" + +[tasks.lib_unix] +script_runner="@shell" +script=''' +cd libpng-1.6.37 && ./configure --enable-shared=no --with-pic=yes --enable-hardware-optimizations=yes +cd "${PROJECT_DIR}" +make -C libpng-1.6.37 CC="${CARGO_TARGET_DIR}/release/libafl_cc" CXX="${CARGO_TARGET_DIR}/release/libafl_cxx" +''' +dependencies = [ "libpng", "cxx", "cc" ] + +# Library +[tasks.crash_lib] +linux_alias = "crash_lib_unix" +mac_alias = "crash_lib_unix" +windows_alias = "unsupported" + +[tasks.crash_lib_unix] +script_runner="@shell" +script=''' +cd libpng-1.6.37 && ./configure --enable-shared=no --with-pic=yes --enable-hardware-optimizations=yes +cd "${PROJECT_DIR}" +make -C libpng-1.6.37 CC="${CARGO_TARGET_DIR}/release/libafl_cc" CXX="${CARGO_TARGET_DIR}/release/libafl_cxx" +''' +dependencies = [ "libpng", "crash_cxx", "crash_cc" ] + +# Harness +[tasks.fuzzer] +linux_alias = "fuzzer_unix" +mac_alias = "fuzzer_unix" +windows_alias = "unsupported" + +[tasks.fuzzer_unix] +command = "${CARGO_TARGET_DIR}/release/libafl_cxx" +args = ["${PROJECT_DIR}/harness.cc", "${PROJECT_DIR}/libpng-1.6.37/.libs/libpng16.a", "-I", "${PROJECT_DIR}/libpng-1.6.37/", "-o", "${FUZZER_NAME}", "-lm", "-lz"] +dependencies = [ "lib", "cxx", "cc" ] + +# Crashing Harness +[tasks.fuzzer_crash] +linux_alias = "fuzzer_crash_unix" +mac_alias = "fuzzer_crash_unix" +windows_alias = "unsupported" + +[tasks.fuzzer_crash_unix] +command = "${CARGO_TARGET_DIR}/release/libafl_cxx" +args = ["${PROJECT_DIR}/harness.cc", "${PROJECT_DIR}/libpng-1.6.37/.libs/libpng16.a", "-I", "${PROJECT_DIR}/libpng-1.6.37/", "-o", "${FUZZER_NAME}_crash", "-lm", "-lz"] +dependencies = [ "crash_lib", "crash_cxx", "crash_cc" ] + +# Run the fuzzer +[tasks.run] +linux_alias = "run_unix" +mac_alias = "run_unix" +windows_alias = "unsupported" + +[tasks.run_unix] +script_runner = "@shell" +script=''' +./${FUZZER_NAME} & +sleep 0.2 +./${FUZZER_NAME} 2>/dev/null +''' +dependencies = [ "fuzzer" ] + + +# Run the fuzzer with a crash +[tasks.crash] +linux_alias = "crash_unix" +mac_alias = "crash_unix" +windows_alias = "unsupported" + +[tasks.crash_unix] +script_runner = "@shell" +script=''' +./${FUZZER_NAME}_crash & +sleep 0.2 +./${FUZZER_NAME}_crash 2>/dev/null +''' +dependencies = [ "fuzzer_crash" ] + + + +# Test +[tasks.test] +linux_alias = "test_unix" +mac_alias = "test_mac" +windows_alias = "unsupported" + +[tasks.test_unix] +script_runner = "@shell" +script=''' +rm -rf libafl_unix_shmem_server || true +(timeout 11s ./${FUZZER_NAME} >fuzz_stdout.log 2>/dev/null || true) & +sleep 0.2 +timeout 10s ./${FUZZER_NAME} >/dev/null 2>/dev/null || true +if [ -z "$(grep "corpus: 30" fuzz_stdout.log)" ]; then + echo "Fuzzer does not generate any testcases or any crashes" + exit 1 +else + echo "Fuzzer is working" +fi +''' +dependencies = [ "fuzzer" ] + +[tasks.test_mac] +script_runner = "@shell" +script=''' +rm -rf libafl_unix_shmem_server || true +(timeout 11s ./${FUZZER_NAME} >fuzz_stdout.log 2>/dev/null || true) & +sleep 0.2 +timeout 10s ./${FUZZER_NAME} >/dev/null 2>/dev/null || true +''' +dependencies = [ "fuzzer" ] + +# Clean up +[tasks.clean] +linux_alias = "clean_unix" +mac_alias = "clean_unix" +windows_alias = "unsupported" + +[tasks.clean_unix] +# Disable default `clean` definition +clear = true +script_runner="@shell" +script=''' +rm -f ./${FUZZER_NAME} +make -C libpng-1.6.37 clean +cargo clean +''' diff --git a/fuzzers/libfuzzer_libpng_tcp_manager/README.md b/fuzzers/libfuzzer_libpng_tcp_manager/README.md new file mode 100644 index 0000000000..51d39c8f6c --- /dev/null +++ b/fuzzers/libfuzzer_libpng_tcp_manager/README.md @@ -0,0 +1,81 @@ +# Libfuzzer for libpng + +This folder contains an example fuzzer for libpng, using LLMP for fast multi-process fuzzing and crash detection. + +In contrast to other fuzzer examples, this setup uses `fuzz_loop_for`, to occasionally respawn the fuzzer executor. +While this costs performance, it can be useful for targets with memory leaks or other instabilities. +If your target is really instable, however, consider exchanging the `InProcessExecutor` for a `ForkserverExecutor` instead. + +It also uses the `introspection` feature, printing fuzzer stats during execution. + +To show off crash detection, we added a `ud2` instruction to the harness, edit harness.cc if you want a non-crashing example. +It has been tested on Linux. + +## Build + +To build this example, run + +```bash +cargo build --release +``` + +This will build the library with the fuzzer (src/lib.rs) with the libfuzzer compatibility layer and the SanitizerCoverage runtime functions for coverage feedback. +In addition, it will also build two C and C++ compiler wrappers (bin/libafl_c(libafl_c/xx).rs) that you must use to compile the target. + +The compiler wrappers, `libafl_cc` and `libafl_cxx`, will end up in `./target/release/` (or `./target/debug`, in case you did not build with the `--release` flag). + +Then download libpng, and unpack the archive: +```bash +wget https://deac-fra.dl.sourceforge.net/project/libpng/libpng16/1.6.37/libpng-1.6.37.tar.xz +tar -xvf libpng-1.6.37.tar.xz +``` + +Now compile libpng, using the libafl_cc compiler wrapper: + +```bash +cd libpng-1.6.37 +./configure --enable-shared=no --with-pic=yes --enable-hardware-optimizations=yes +make CC="$(pwd)/../target/release/libafl_cc" CXX="$(pwd)/../target/release/libafl_cxx" -j `nproc` +``` + +You can find the static lib at `libpng-1.6.37/.libs/libpng16.a`. + +Now, we have to build the libfuzzer harness and link all together to create our fuzzer binary. + +``` +cd .. +./target/release/libafl_cxx ./harness.cc libpng-1.6.37/.libs/libpng16.a -I libpng-1.6.37/ -o fuzzer_libpng -lz -lm +``` + +Afterward, the fuzzer will be ready to run. +Note that, unless you use the `launcher`, you will have to run the binary multiple times to actually start the fuzz process, see `Run` in the following. +This allows you to run multiple different builds of the same fuzzer alongside, for example, with and without ASAN (`-fsanitize=address`) or with different mutators. + +## Run + +The first time you run the binary, the broker will open a tcp port (currently on port `1337`), waiting for fuzzer clients to connect. This port is local and only used for the initial handshake. All further communication happens via shared map, to be independent of the kernel. Currently, you must run the clients from the libfuzzer_libpng directory for them to be able to access the PNG corpus. + +``` +./fuzzer_libpng + +[libafl/src/bolts/llmp.rs:407] "We're the broker" = "We\'re the broker" +Doing broker things. Run this tool again to start fuzzing in a client. +``` + +And after running the above again in a separate terminal: + +``` +[libafl/src/bolts/llmp.rs:1464] "New connection" = "New connection" +[libafl/src/bolts/llmp.rs:1464] addr = 127.0.0.1:33500 +[libafl/src/bolts/llmp.rs:1464] stream.peer_addr().unwrap() = 127.0.0.1:33500 +[LOG Debug]: Loaded 4 initial testcases. +[New Testcase #2] clients: 3, corpus: 6, objectives: 0, executions: 5, exec/sec: 0 +< fuzzing stats > +``` + +As this example uses in-process fuzzing, we added a Restarting Event Manager (`setup_restarting_mgr`). +This means each client will start itself again to listen for crashes and timeouts. +By restarting the actual fuzzer, it can recover from these exit conditions. + +In any real-world scenario, you should use `taskset` to pin each client to an empty CPU core, the lib does not pick an empty core automatically (yet). + diff --git a/fuzzers/libfuzzer_libpng_tcp_manager/corpus/not_kitty.png b/fuzzers/libfuzzer_libpng_tcp_manager/corpus/not_kitty.png new file mode 100644 index 0000000000..eff7c1707b Binary files /dev/null and b/fuzzers/libfuzzer_libpng_tcp_manager/corpus/not_kitty.png differ diff --git a/fuzzers/libfuzzer_libpng_tcp_manager/corpus/not_kitty_alpha.png b/fuzzers/libfuzzer_libpng_tcp_manager/corpus/not_kitty_alpha.png new file mode 100644 index 0000000000..2fb8da2c8f Binary files /dev/null and b/fuzzers/libfuzzer_libpng_tcp_manager/corpus/not_kitty_alpha.png differ diff --git a/fuzzers/libfuzzer_libpng_tcp_manager/corpus/not_kitty_gamma.png b/fuzzers/libfuzzer_libpng_tcp_manager/corpus/not_kitty_gamma.png new file mode 100644 index 0000000000..939d9d29a9 Binary files /dev/null and b/fuzzers/libfuzzer_libpng_tcp_manager/corpus/not_kitty_gamma.png differ diff --git a/fuzzers/libfuzzer_libpng_tcp_manager/corpus/not_kitty_icc.png b/fuzzers/libfuzzer_libpng_tcp_manager/corpus/not_kitty_icc.png new file mode 100644 index 0000000000..f0c7804d99 Binary files /dev/null and b/fuzzers/libfuzzer_libpng_tcp_manager/corpus/not_kitty_icc.png differ diff --git a/fuzzers/libfuzzer_libpng_tcp_manager/harness.cc b/fuzzers/libfuzzer_libpng_tcp_manager/harness.cc new file mode 100644 index 0000000000..5c36517376 --- /dev/null +++ b/fuzzers/libfuzzer_libpng_tcp_manager/harness.cc @@ -0,0 +1,191 @@ +// libpng_read_fuzzer.cc +// Copyright 2017-2018 Glenn Randers-Pehrson +// Copyright 2015 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that may +// be found in the LICENSE file https://cs.chromium.org/chromium/src/LICENSE + +// Last changed in libpng 1.6.35 [July 15, 2018] + +// The modifications in 2017 by Glenn Randers-Pehrson include +// 1. addition of a PNG_CLEANUP macro, +// 2. setting the option to ignore ADLER32 checksums, +// 3. adding "#include " which is needed on some platforms +// to provide memcpy(). +// 4. adding read_end_info() and creating an end_info structure. +// 5. adding calls to png_set_*() transforms commonly used by browsers. + +#include +#include +#include + +#include + +#define PNG_INTERNAL +#include "png.h" + +#define PNG_CLEANUP \ + if (png_handler.png_ptr) { \ + if (png_handler.row_ptr) { \ + png_free(png_handler.png_ptr, png_handler.row_ptr); \ + } \ + if (png_handler.end_info_ptr) { \ + png_destroy_read_struct(&png_handler.png_ptr, &png_handler.info_ptr, \ + &png_handler.end_info_ptr); \ + } else if (png_handler.info_ptr) { \ + png_destroy_read_struct(&png_handler.png_ptr, &png_handler.info_ptr, \ + nullptr); \ + } else { \ + png_destroy_read_struct(&png_handler.png_ptr, nullptr, nullptr); \ + } \ + png_handler.png_ptr = nullptr; \ + png_handler.row_ptr = nullptr; \ + png_handler.info_ptr = nullptr; \ + png_handler.end_info_ptr = nullptr; \ + } + +struct BufState { + const uint8_t *data; + size_t bytes_left; +}; + +struct PngObjectHandler { + png_infop info_ptr = nullptr; + png_structp png_ptr = nullptr; + png_infop end_info_ptr = nullptr; + png_voidp row_ptr = nullptr; + BufState *buf_state = nullptr; + + ~PngObjectHandler() { + if (row_ptr) { png_free(png_ptr, row_ptr); } + if (end_info_ptr) { + png_destroy_read_struct(&png_ptr, &info_ptr, &end_info_ptr); + } else if (info_ptr) { + png_destroy_read_struct(&png_ptr, &info_ptr, nullptr); + } else { + png_destroy_read_struct(&png_ptr, nullptr, nullptr); + } + delete buf_state; + } +}; + +void user_read_data(png_structp png_ptr, png_bytep data, size_t length) { + BufState *buf_state = static_cast(png_get_io_ptr(png_ptr)); + if (length > buf_state->bytes_left) { png_error(png_ptr, "read error"); } + memcpy(data, buf_state->data, length); + buf_state->bytes_left -= length; + buf_state->data += length; +} + +static const int kPngHeaderSize = 8; + +// Entry point for LibFuzzer. +// Roughly follows the libpng book example: +// http://www.libpng.org/pub/png/book/chapter13.html +extern "C" int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { + if (size < kPngHeaderSize) { return 0; } + + std::vector v(data, data + size); + if (png_sig_cmp(v.data(), 0, kPngHeaderSize)) { + // not a PNG. + return 0; + } + + PngObjectHandler png_handler; + png_handler.png_ptr = nullptr; + png_handler.row_ptr = nullptr; + png_handler.info_ptr = nullptr; + png_handler.end_info_ptr = nullptr; + + png_handler.png_ptr = + png_create_read_struct(PNG_LIBPNG_VER_STRING, nullptr, nullptr, nullptr); + if (!png_handler.png_ptr) { return 0; } + + png_handler.info_ptr = png_create_info_struct(png_handler.png_ptr); + if (!png_handler.info_ptr) { + PNG_CLEANUP + return 0; + } + + png_handler.end_info_ptr = png_create_info_struct(png_handler.png_ptr); + if (!png_handler.end_info_ptr) { + PNG_CLEANUP + return 0; + } + + png_set_crc_action(png_handler.png_ptr, PNG_CRC_QUIET_USE, PNG_CRC_QUIET_USE); +#ifdef PNG_IGNORE_ADLER32 + png_set_option(png_handler.png_ptr, PNG_IGNORE_ADLER32, PNG_OPTION_ON); +#endif + + // Setting up reading from buffer. + png_handler.buf_state = new BufState(); + png_handler.buf_state->data = data + kPngHeaderSize; + png_handler.buf_state->bytes_left = size - kPngHeaderSize; + png_set_read_fn(png_handler.png_ptr, png_handler.buf_state, user_read_data); + png_set_sig_bytes(png_handler.png_ptr, kPngHeaderSize); + + if (setjmp(png_jmpbuf(png_handler.png_ptr))) { + PNG_CLEANUP + return 0; + } + + // Reading. + png_read_info(png_handler.png_ptr, png_handler.info_ptr); + + // reset error handler to put png_deleter into scope. + if (setjmp(png_jmpbuf(png_handler.png_ptr))) { + PNG_CLEANUP + return 0; + } + + png_uint_32 width, height; + int bit_depth, color_type, interlace_type, compression_type; + int filter_type; + + if (!png_get_IHDR(png_handler.png_ptr, png_handler.info_ptr, &width, &height, + &bit_depth, &color_type, &interlace_type, &compression_type, + &filter_type)) { + PNG_CLEANUP + return 0; + } + + // This is going to be too slow. + if (width && height > 100000000 / width) { + PNG_CLEANUP +#ifdef HAS_DUMMY_CRASH + #ifdef __aarch64__ + asm volatile(".word 0xf7f0a000\n"); + #else + asm("ud2"); + #endif +#endif + return 0; + } + + // Set several transforms that browsers typically use: + png_set_gray_to_rgb(png_handler.png_ptr); + png_set_expand(png_handler.png_ptr); + png_set_packing(png_handler.png_ptr); + png_set_scale_16(png_handler.png_ptr); + png_set_tRNS_to_alpha(png_handler.png_ptr); + + int passes = png_set_interlace_handling(png_handler.png_ptr); + + png_read_update_info(png_handler.png_ptr, png_handler.info_ptr); + + png_handler.row_ptr = + png_malloc(png_handler.png_ptr, + png_get_rowbytes(png_handler.png_ptr, png_handler.info_ptr)); + + for (int pass = 0; pass < passes; ++pass) { + for (png_uint_32 y = 0; y < height; ++y) { + png_read_row(png_handler.png_ptr, + static_cast(png_handler.row_ptr), nullptr); + } + } + + png_read_end(png_handler.png_ptr, png_handler.end_info_ptr); + + PNG_CLEANUP + return 0; +} diff --git a/fuzzers/libfuzzer_libpng_tcp_manager/src/bin/libafl_cc.rs b/fuzzers/libfuzzer_libpng_tcp_manager/src/bin/libafl_cc.rs new file mode 100644 index 0000000000..69f3766586 --- /dev/null +++ b/fuzzers/libfuzzer_libpng_tcp_manager/src/bin/libafl_cc.rs @@ -0,0 +1,36 @@ +use std::env; + +use libafl_cc::{ClangWrapper, CompilerWrapper}; + +pub fn main() { + let args: Vec = env::args().collect(); + if args.len() > 1 { + let mut dir = env::current_exe().unwrap(); + let wrapper_name = dir.file_name().unwrap().to_str().unwrap(); + + let is_cpp = match wrapper_name[wrapper_name.len()-2..].to_lowercase().as_str() { + "cc" => false, + "++" | "pp" | "xx" => true, + _ => panic!("Could not figure out if c or c++ wrapper was called. Expected {dir:?} to end with c or cxx"), + }; + + dir.pop(); + + let mut cc = ClangWrapper::new(); + if let Some(code) = cc + .cpp(is_cpp) + // silence the compiler wrapper output, needed for some configure scripts. + .silence(true) + .parse_args(&args) + .expect("Failed to parse the command line") + .link_staticlib(&dir, "libfuzzer_libpng") + .add_arg("-fsanitize-coverage=trace-pc-guard") + .run() + .expect("Failed to run the wrapped compiler") + { + std::process::exit(code); + } + } else { + panic!("LibAFL CC: No Arguments given"); + } +} diff --git a/fuzzers/libfuzzer_libpng_tcp_manager/src/bin/libafl_cxx.rs b/fuzzers/libfuzzer_libpng_tcp_manager/src/bin/libafl_cxx.rs new file mode 100644 index 0000000000..dabd22971a --- /dev/null +++ b/fuzzers/libfuzzer_libpng_tcp_manager/src/bin/libafl_cxx.rs @@ -0,0 +1,5 @@ +pub mod libafl_cc; + +fn main() { + libafl_cc::main(); +} diff --git a/fuzzers/libfuzzer_libpng_tcp_manager/src/lib.rs b/fuzzers/libfuzzer_libpng_tcp_manager/src/lib.rs new file mode 100644 index 0000000000..f4ab8894cc --- /dev/null +++ b/fuzzers/libfuzzer_libpng_tcp_manager/src/lib.rs @@ -0,0 +1,220 @@ +//! A libfuzzer-like fuzzer with llmp-multithreading support and restarts +//! The example harness is built for libpng. +use mimalloc::MiMalloc; +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + +use core::time::Duration; +#[cfg(feature = "crash")] +use std::ptr; +use std::{env, path::PathBuf}; + +use libafl::{ + bolts::{ + current_nanos, + rands::StdRand, + tuples::{tuple_list, Merge}, + AsSlice, + }, + corpus::{Corpus, InMemoryCorpus, OnDiskCorpus}, + events::tcp::setup_restarting_mgr_tcp, + events::{EventConfig, EventRestarter}, + executors::{inprocess::InProcessExecutor, ExitKind, TimeoutExecutor}, + feedback_or, feedback_or_fast, + feedbacks::{CrashFeedback, MaxMapFeedback, TimeFeedback, TimeoutFeedback}, + fuzzer::{Fuzzer, StdFuzzer}, + inputs::{BytesInput, HasTargetBytes}, + monitors::MultiMonitor, + mutators::{ + scheduled::{havoc_mutations, tokens_mutations, StdScheduledMutator}, + token_mutations::Tokens, + }, + observers::{HitcountsMapObserver, StdMapObserver, TimeObserver}, + schedulers::{ + powersched::PowerSchedule, IndexesLenTimeMinimizerScheduler, StdWeightedScheduler, + }, + stages::{calibrate::CalibrationStage, power::StdPowerMutationalStage}, + state::{HasCorpus, HasMetadata, StdState}, + Error, +}; +use libafl_targets::{libfuzzer_initialize, libfuzzer_test_one_input, EDGES_MAP, MAX_EDGES_NUM}; + +/// The main fn, `no_mangle` as it is a C main +#[no_mangle] +pub extern "C" fn libafl_main() { + // Registry the metadata types used in this fuzzer + // Needed only on no_std + //RegistryBuilder::register::(); + + println!( + "Workdir: {:?}", + env::current_dir().unwrap().to_string_lossy().to_string() + ); + fuzz( + &[PathBuf::from("./corpus")], + PathBuf::from("./crashes"), + 1337, + ) + .expect("An error occurred while fuzzing"); +} + +/// The actual fuzzer +fn fuzz(corpus_dirs: &[PathBuf], objective_dir: PathBuf, broker_port: u16) -> Result<(), Error> { + // 'While the stats are state, they are usually used in the broker - which is likely never restarted + let monitor = MultiMonitor::new(|s| println!("{s}")); + + // The restarting state will spawn the same process again as child, then restarted it each time it crashes. + let (state, mut restarting_mgr) = + match setup_restarting_mgr_tcp(monitor, broker_port, EventConfig::AlwaysUnique) { + Ok(res) => res, + Err(err) => match err { + Error::ShuttingDown => { + return Ok(()); + } + _ => { + panic!("Failed to setup the restarter: {err}"); + } + }, + }; + + // Create an observation channel using the coverage map + let edges_observer = unsafe { + HitcountsMapObserver::new(StdMapObserver::from_mut_ptr( + "edges", + EDGES_MAP.as_mut_ptr(), + MAX_EDGES_NUM, + )) + }; + + // Create an observation channel to keep track of the execution time + let time_observer = TimeObserver::new("time"); + + let map_feedback = MaxMapFeedback::tracking(&edges_observer, true, false); + + let calibration = CalibrationStage::new(&map_feedback); + + // Feedback to rate the interestingness of an input + // This one is composed by two Feedbacks in OR + let mut feedback = feedback_or!( + // New maximization map feedback linked to the edges observer and the feedback state + map_feedback, + // Time feedback, this one does not need a feedback state + TimeFeedback::with_observer(&time_observer) + ); + + // A feedback to choose if an input is a solution or not + let mut objective = feedback_or_fast!(CrashFeedback::new(), TimeoutFeedback::new()); + + // If not restarting, create a State from scratch + let mut state = state.unwrap_or_else(|| { + StdState::new( + // RNG + StdRand::with_seed(current_nanos()), + // Corpus that will be evolved, we keep it in memory for performance + InMemoryCorpus::new(), + // Corpus in which we store solutions (crashes in this example), + // on disk so the user can get them after stopping the fuzzer + OnDiskCorpus::new(objective_dir).unwrap(), + // States of the feedbacks. + // The feedbacks can report the data that should persist in the State. + &mut feedback, + // Same for objective feedbacks + &mut objective, + ) + .unwrap() + }); + + println!("We're a client, let's fuzz :)"); + + // Create a PNG dictionary if not existing + if state.metadata_map().get::().is_none() { + state.add_metadata(Tokens::from([ + vec![137, 80, 78, 71, 13, 10, 26, 10], // PNG header + "IHDR".as_bytes().to_vec(), + "IDAT".as_bytes().to_vec(), + "PLTE".as_bytes().to_vec(), + "IEND".as_bytes().to_vec(), + ])); + } + + // Setup a basic mutator with a mutational stage + + let mutator = StdScheduledMutator::new(havoc_mutations().merge(tokens_mutations())); + + let power = StdPowerMutationalStage::new(mutator); + + let mut stages = tuple_list!(calibration, power); + + // A minimization+queue policy to get testcasess from the corpus + let scheduler = IndexesLenTimeMinimizerScheduler::new(StdWeightedScheduler::with_schedule( + &mut state, + &edges_observer, + Some(PowerSchedule::FAST), + )); + + // A fuzzer with feedbacks and a corpus scheduler + let mut fuzzer = StdFuzzer::new(scheduler, feedback, objective); + + // The wrapped harness function, calling out to the LLVM-style harness + let mut harness = |input: &BytesInput| { + let target = input.target_bytes(); + let buf = target.as_slice(); + #[cfg(feature = "crash")] + if buf.len() > 4 && buf[4] == 0 { + unsafe { + eprintln!("Crashing (for testing purposes)"); + let addr = ptr::null_mut(); + *addr = 1; + } + } + libfuzzer_test_one_input(buf); + ExitKind::Ok + }; + + // Create the executor for an in-process function with one observer for edge coverage and one for the execution time + let mut executor = TimeoutExecutor::new( + InProcessExecutor::new( + &mut harness, + tuple_list!(edges_observer, time_observer), + &mut fuzzer, + &mut state, + &mut restarting_mgr, + )?, + // 10 seconds timeout + Duration::new(10, 0), + ); + + // The actual target run starts here. + // Call LLVMFUzzerInitialize() if present. + let args: Vec = env::args().collect(); + if libfuzzer_initialize(&args) == -1 { + println!("Warning: LLVMFuzzerInitialize failed with -1"); + } + + // In case the corpus is empty (on first run), reset + if state.must_load_initial_inputs() { + state + .load_initial_inputs(&mut fuzzer, &mut executor, &mut restarting_mgr, corpus_dirs) + .unwrap_or_else(|_| panic!("Failed to load initial corpus at {:?}", &corpus_dirs)); + println!("We imported {} inputs from disk.", state.corpus().count()); + } + + // This fuzzer restarts after 1 mio `fuzz_one` executions. + // Each fuzz_one will internally do many executions of the target. + // If your target is very instable, setting a low count here may help. + // However, you will lose a lot of performance that way. + let iters = 1_000_000; + fuzzer.fuzz_loop_for( + &mut stages, + &mut executor, + &mut state, + &mut restarting_mgr, + iters, + )?; + + // It's important, that we store the state before restarting! + // Else, the parent will not respawn a new child and quit. + restarting_mgr.on_restart(&mut state)?; + + Ok(()) +} diff --git a/libafl/Cargo.toml b/libafl/Cargo.toml index 265df683ba..7f22ee637d 100644 --- a/libafl/Cargo.toml +++ b/libafl/Cargo.toml @@ -33,6 +33,7 @@ corpus_btreemap = [] # Switches from HashMap to BTreeMap for CorpusId gzip = ["miniz_oxide"] # Enables gzip compression in certain parts of the lib regex = ["std", "dep:regex"] # enables the NaiveTokenizer and StacktraceObserver casr = ["libcasr", "std", "regex"] # enables deduplication based on libcasr for StacktraceObserver +tcp_manager = ["tokio", "std"] # A simple EventManager proxying everything via TCP # features hiding dependencies licensed under GPL gpl = [] @@ -85,15 +86,16 @@ uuid = { version = "1.1.2", optional = true, features = ["serde", "v4"] } byteorder = { version = "1.4", optional = true } once_cell = { version = "1.13", optional = true } libm = "0.2.2" -tui = { version = "0.19", default-features = false, features = ['crossterm'], optional = true } +tui = { version = "0.19", default-features = false, features = ['crossterm'], optional = true } # Commandline rendering, for TUI Monitor crossterm = { version = "0.26", optional = true } -clap = {version = "4.0", features = ["derive", "wrap_help"], optional = true} +clap = {version = "4.0", features = ["derive", "wrap_help"], optional = true} # CLI parsing, for bolts::cli / the `cli` feature -prometheus-client = { version= "0.19", optional = true} +prometheus-client = { version= "0.19", optional = true} # For the prometheus monitor tide = { version = "0.16.0", optional = true } async-std = { version = "1.12.0", features = ["attributes"], optional = true } futures = { version = "0.3.24", optional = true } log = "0.4.18" +tokio = { version = "1.28.1", optional = true, features = ["sync", "net", "rt", "io-util", "macros"] } # only used for TCP Event Manager right now wait-timeout = { version = "0.2", optional = true } # used by CommandExecutor to wait for child process diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs index 83e0e26f07..f5bb415092 100644 --- a/libafl/src/events/mod.rs +++ b/libafl/src/events/mod.rs @@ -1,10 +1,13 @@ -//! Eventmanager manages all events that go to other instances of the fuzzer. +//! An [`EventManager`] manages all events that go to other instances of the fuzzer. +//! The messages are commonly information about new Testcases as well as stats and other [`Event`]s. pub mod simple; pub use simple::*; pub mod centralized; pub use centralized::*; pub mod llmp; +#[cfg(feature = "tcp_manager")] +pub mod tcp; use alloc::{boxed::Box, string::String, vec::Vec}; #[cfg(all(unix, feature = "std"))] use core::ffi::c_void; @@ -110,7 +113,7 @@ impl Handler for ShutdownSignalData { } /// A per-fuzzer unique `ID`, usually starting with `0` and increasing -/// by `1` in multiprocessed `EventManager`s, such as [`self::llmp::LlmpEventManager`]. +/// by `1` in multiprocessed [`EventManager`]s, such as [`self::llmp::LlmpEventManager`]. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] #[repr(transparent)] pub struct EventManagerId( diff --git a/libafl/src/events/tcp.rs b/libafl/src/events/tcp.rs new file mode 100644 index 0000000000..04fd461074 --- /dev/null +++ b/libafl/src/events/tcp.rs @@ -0,0 +1,1117 @@ +//! TCP-backed event manager for scalable multi-processed fuzzing + +use alloc::{ + boxed::Box, + string::{String, ToString}, + vec::Vec, +}; +use core::{ + marker::PhantomData, + num::NonZeroUsize, + sync::atomic::{compiler_fence, Ordering}, +}; +use std::{ + io::{ErrorKind, Read, Write}, + net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}, +}; + +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + sync::{broadcast, mpsc}, + task::spawn, +}; +#[cfg(feature = "std")] +use typed_builder::TypedBuilder; + +use super::{CustomBufEventResult, CustomBufHandlerFn}; +#[cfg(feature = "std")] +use crate::bolts::core_affinity::CoreId; +#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] +use crate::bolts::os::startable_self; +#[cfg(all(unix, feature = "std", not(miri)))] +use crate::bolts::os::unix_signals::setup_signal_handler; +#[cfg(all(feature = "std", feature = "fork", unix))] +use crate::bolts::os::{fork, ForkResult}; +#[cfg(feature = "std")] +use crate::bolts::{shmem::StdShMemProvider, staterestore::StateRestorer}; +#[cfg(all(unix, feature = "std"))] +use crate::events::{shutdown_handler, SHUTDOWN_SIGHANDLER_DATA}; +use crate::{ + bolts::{shmem::ShMemProvider, ClientId}, + events::{ + BrokerEventResult, Event, EventConfig, EventFirer, EventManager, EventManagerId, + EventProcessor, EventRestarter, HasCustomBufHandlers, HasEventManagerId, ProgressReporter, + }, + executors::{Executor, HasObservers}, + fuzzer::{EvaluatorObservers, ExecutionProcessor}, + inputs::{Input, UsesInput}, + monitors::Monitor, + state::{HasClientPerfMonitor, HasExecutions, HasMetadata, UsesState}, + Error, +}; + +/// Tries to create (synchronously) a [`TcpListener`] that is `nonblocking` (for later use in tokio). +/// Will error if the port is already in use (or other errors occur) +fn create_nonblocking_listener(addr: A) -> Result { + let listener = std::net::TcpListener::bind(addr)?; + listener.set_nonblocking(true)?; + Ok(listener) +} + +/// An TCP-backed event manager for simple multi-processed fuzzing +#[derive(Debug)] +pub struct TcpEventBroker +where + I: Input, + MT: Monitor, + //CE: CustomEvent, +{ + monitor: MT, + /// A `nonblocking` [`TcpListener`] that we will `take` and convert to a Tokio listener in [`Self::broker_loop()`]. + listener: Option, + /// Amount of all clients ever, after which (when all are disconnected) this broker should quit. + exit_cleanly_after: Option, + phantom: PhantomData, +} + +impl TcpEventBroker +where + I: Input, + MT: Monitor, +{ + /// Create a TCP broker, listening on the given address. + pub fn new(addr: A, monitor: MT) -> Result { + Ok(Self::with_listener( + create_nonblocking_listener(addr)?, + monitor, + )) + } + + /// Create a TCP broker, with a listener that needs to already be bound to an address. + pub fn with_listener(listener: TcpListener, monitor: MT) -> Self { + Self { + listener: Some(listener), + monitor, + phantom: PhantomData, + exit_cleanly_after: None, + } + } + + /// Exit the broker process cleanly after at least `n` clients attached and all of them disconnected again + pub fn set_exit_cleanly_after(&mut self, n_clients: NonZeroUsize) { + self.exit_cleanly_after = Some(n_clients); + } + + /// Run in the broker until all clients exit + #[tokio::main(flavor = "current_thread")] + pub async fn broker_loop(&mut self) -> Result<(), Error> { + let (tx_bc, rx) = broadcast::channel(128); + let (tx, mut rx_mpsc) = mpsc::channel(128); + + let exit_cleanly_after = self.exit_cleanly_after; + + let listener = self + .listener + .take() + .ok_or_else(|| Error::illegal_state("Listener has already been used / was none"))?; + let listener = tokio::net::TcpListener::from_std(listener)?; + + let tokio_broker = spawn(async move { + let mut recv_handles = vec![]; + + loop { + if let Some(max_clients) = exit_cleanly_after { + if max_clients.get() <= recv_handles.len() { + // we waited fro all the clients we wanted to see attached. Now wait for them to close their tcp connections. + break; + } + } + + //println!("loop"); + // Asynchronously wait for an inbound socket. + let (socket, _) = listener.accept().await.expect("test"); + let (mut read, mut write) = tokio::io::split(socket); + // ClientIds for this broker start at 0. + let this_client_id = ClientId(recv_handles.len().try_into().unwrap()); + let this_client_id_bytes = this_client_id.0.to_le_bytes(); + + // Send the client id for this node; + write.write_all(&this_client_id_bytes).await.unwrap(); + + let tx_inner = tx.clone(); + let mut rx_inner = rx.resubscribe(); + // Keep all handles around. + recv_handles.push(spawn(async move { + // In a loop, read data from the socket and write the data back. + loop { + let mut len_buf = [0; 4]; + + read.read_exact(&mut len_buf).await.expect("Socket closed?"); + + let mut len = u32::from_le_bytes(len_buf); + // we forward the sender id as well, so we add 4 bytes to the message length + len += 4; + + #[cfg(feature = "tcp_debug")] + println!("len +4 = {len:?}"); + + let mut buf = vec![0; len as usize]; + + read.read_exact(&mut buf) + .await + .expect("failed to read data from socket"); + + #[cfg(feature = "tcp_debug")] + println!("len: {len:?} - {buf:?}"); + tx_inner.send(buf).await.expect("Could not send"); + } + })); + // The forwarding end. No need to keep a handle to this (TODO: unless they don't quit/get stuck?) + spawn(async move { + // In a loop, read data from the socket and write the data back. + loop { + let buf: Vec = rx_inner.recv().await.expect("Could not receive"); + + #[cfg(feature = "tcp_debug")] + println!("{buf:?}"); + + if buf.len() <= 4 { + eprintln!("We got no contents (or only the length) in a broadcast"); + continue; + } + + if buf[..4] == this_client_id_bytes { + #[cfg(feature = "tcp_debug")] + eprintln!( + "Not forwarding message from this very client ({this_client_id:?})." + ); + continue; + } + + // subtract 4 since the client_id isn't part of the actual message. + let len = u32::try_from(buf.len() - 4).unwrap(); + let len_buf: [u8; 4] = len.to_le_bytes(); + + // Write message length + write.write_all(&len_buf).await.expect("Writing failed"); + // Write the rest + write.write_all(&buf).await.expect("Socket closed?"); + } + }); + } + println!("joining handles.."); + // wait for all clients to exit/error out + for recv_handle in recv_handles { + drop(recv_handle.await); + } + }); + + loop { + let buf = rx_mpsc.recv().await.expect("Could not receive"); + + // read client ID. + let mut client_id_buf = [0_u8; 4]; + client_id_buf.copy_from_slice(&buf[..4]); + + let client_id = ClientId(u32::from_le_bytes(client_id_buf)); + + // cut off the ID. + let event_bytes = &buf[4..]; + + let event: Event = postcard::from_bytes(event_bytes).unwrap(); + match Self::handle_in_broker(&mut self.monitor, client_id, &event).unwrap() { + BrokerEventResult::Forward => { + tx_bc.send(buf).expect("Could not send"); + } + BrokerEventResult::Handled => (), + } + + if tokio_broker.is_finished() { + tokio_broker.await.unwrap(); + break; + } + } + + #[cfg(feature = "tcp_debug")] + println!("The last client quit. Exiting."); + + Err(Error::shutting_down()) + } + + /// Handle arriving events in the broker + #[allow(clippy::unnecessary_wraps)] + fn handle_in_broker( + monitor: &mut MT, + client_id: ClientId, + event: &Event, + ) -> Result { + match &event { + Event::NewTestcase { + input: _, + client_config: _, + exit_kind: _, + corpus_size, + observers_buf: _, + time, + executions, + forward_id, + } => { + let id = if let Some(id) = *forward_id { + id + } else { + client_id + }; + let client = monitor.client_stats_mut_for(id); + client.update_corpus_size(*corpus_size as u64); + client.update_executions(*executions as u64, *time); + monitor.display(event.name().to_string(), id); + Ok(BrokerEventResult::Forward) + } + Event::UpdateExecStats { + time, + executions, + phantom: _, + } => { + // TODO: The monitor buffer should be added on client add. + let client = monitor.client_stats_mut_for(client_id); + client.update_executions(*executions as u64, *time); + monitor.display(event.name().to_string(), client_id); + Ok(BrokerEventResult::Handled) + } + Event::UpdateUserStats { + name, + value, + phantom: _, + } => { + let client = monitor.client_stats_mut_for(client_id); + client.update_user_stats(name.clone(), value.clone()); + monitor.display(event.name().to_string(), client_id); + Ok(BrokerEventResult::Handled) + } + #[cfg(feature = "introspection")] + Event::UpdatePerfMonitor { + time, + executions, + introspection_monitor, + phantom: _, + } => { + // TODO: The monitor buffer should be added on client add. + + // Get the client for the staterestorer ID + let client = monitor.client_stats_mut_for(client_id); + + // Update the normal monitor for this client + client.update_executions(*executions as u64, *time); + + // Update the performance monitor for this client + client.update_introspection_monitor((**introspection_monitor).clone()); + + // Display the monitor via `.display` only on core #1 + monitor.display(event.name().to_string(), client_id); + + // Correctly handled the event + Ok(BrokerEventResult::Handled) + } + Event::Objective { objective_size } => { + let client = monitor.client_stats_mut_for(client_id); + client.update_objective_size(*objective_size as u64); + monitor.display(event.name().to_string(), client_id); + Ok(BrokerEventResult::Handled) + } + Event::Log { + severity_level, + message, + phantom: _, + } => { + let (_, _) = (severity_level, message); + // TODO rely on Monitor + log::log!((*severity_level).into(), "{message}"); + Ok(BrokerEventResult::Handled) + } + Event::CustomBuf { .. } => Ok(BrokerEventResult::Forward), + //_ => Ok(BrokerEventResult::Forward), + } + } +} + +/// An [`EventManager`] that forwards all events to other attached fuzzers on shared maps or via tcp, +/// using low-level message passing, [`crate::bolts::tcp`]. +pub struct TcpEventManager +where + S: UsesInput, +{ + /// The TCP stream for inter process communication + tcp: TcpStream, + /// Our `CientId` + client_id: ClientId, + /// The custom buf handler + custom_buf_handlers: Vec>>, + #[cfg(feature = "tcp_compression")] + compressor: GzipCompressor, + /// The configuration defines this specific fuzzer. + /// A node will not re-use the observer values sent over TCP + /// from nodes with other configurations. + configuration: EventConfig, + phantom: PhantomData, +} + +impl core::fmt::Debug for TcpEventManager +where + S: UsesInput, +{ + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let mut debug_struct = f.debug_struct("TcpEventManager"); + let debug = debug_struct.field("tcp", &self.tcp); + //.field("custom_buf_handlers", &self.custom_buf_handlers) + #[cfg(feature = "tcp_compression")] + let debug = debug.field("compressor", &self.compressor); + debug + .field("configuration", &self.configuration) + .field("phantom", &self.phantom) + .finish_non_exhaustive() + } +} + +impl Drop for TcpEventManager +where + S: UsesInput, +{ + /// TCP clients will have to wait until their pages are mapped by somebody. + fn drop(&mut self) { + self.await_restart_safe(); + } +} + +impl TcpEventManager +where + S: UsesInput + HasExecutions + HasClientPerfMonitor, +{ + /// Create a manager from a raw TCP client + pub fn new(addr: &A, configuration: EventConfig) -> Result { + let mut tcp = TcpStream::connect(addr)?; + + let mut our_client_id_buf = [0_u8; 4]; + tcp.read_exact(&mut our_client_id_buf).unwrap(); + let client_id = ClientId(u32::from_le_bytes(our_client_id_buf)); + + println!("Our client id: {client_id:?}"); + + Ok(Self { + tcp, + client_id, + #[cfg(feature = "tcp_compression")] + compressor: GzipCompressor::new(COMPRESS_THRESHOLD), + configuration, + phantom: PhantomData, + custom_buf_handlers: vec![], + }) + } + + /// Create an TCP event manager on a port + /// + /// If the port is not yet bound, it will act as a broker; otherwise, it + /// will act as a client. + #[cfg(feature = "std")] + pub fn on_port(port: u16, configuration: EventConfig) -> Result { + Self::new(&("127.0.0.1", port), configuration) + } + + // Handle arriving events in the client + #[allow(clippy::unused_self)] + fn handle_in_client( + &mut self, + fuzzer: &mut Z, + executor: &mut E, + state: &mut S, + client_id: ClientId, + event: Event, + ) -> Result<(), Error> + where + E: Executor + HasObservers, + for<'a> E::Observers: Deserialize<'a>, + Z: ExecutionProcessor + EvaluatorObservers, + { + match event { + Event::NewTestcase { + input, + client_config, + exit_kind, + corpus_size: _, + observers_buf, + time: _, + executions: _, + forward_id, + } => { + log::info!("Received new Testcase from {client_id:?} ({client_config:?}, forward {forward_id:?})"); + + let _res = if client_config.match_with(&self.configuration) + && observers_buf.is_some() + { + let observers: E::Observers = + postcard::from_bytes(observers_buf.as_ref().unwrap())?; + fuzzer.process_execution(state, self, input, &observers, &exit_kind, false)? + } else { + fuzzer.evaluate_input_with_observers::( + state, executor, self, input, false, + )? + }; + if let Some(item) = _res.1 { + log::info!("Added received Testcase as item #{item}"); + } + Ok(()) + } + Event::CustomBuf { tag, buf } => { + for handler in &mut self.custom_buf_handlers { + if handler(state, &tag, &buf)? == CustomBufEventResult::Handled { + break; + } + } + Ok(()) + } + _ => Err(Error::unknown(format!( + "Received illegal message that message should not have arrived: {:?}.", + event.name() + ))), + } + } +} + +impl TcpEventManager +where + S: UsesInput, +{ + /// Send information that this client is exiting. + /// The other side may free up all allocated memory. + /// We are no longer allowed to send anything afterwards. + pub fn send_exiting(&mut self) -> Result<(), Error> { + //TODO: Should not be needed since TCP does that for us + //self.tcp.sender.send_exiting() + Ok(()) + } +} + +impl UsesState for TcpEventManager +where + S: UsesInput, +{ + type State = S; +} + +impl EventFirer for TcpEventManager +where + S: UsesInput, +{ + #[cfg(feature = "tcp_compression")] + fn fire( + &mut self, + _state: &mut Self::State, + event: Event<::Input>, + ) -> Result<(), Error> { + let serialized = postcard::to_allocvec(&event)?; + let flags = TCP_FLAG_INITIALIZED; + + match self.compressor.compress(&serialized)? { + Some(comp_buf) => { + self.tcp.send_buf_with_flags( + TCP_TAG_EVENT_TO_BOTH, + flags | TCP_FLAG_COMPRESSED, + &comp_buf, + )?; + } + None => { + self.tcp.send_buf(TCP_TAG_EVENT_TO_BOTH, &serialized)?; + } + } + Ok(()) + } + + #[cfg(not(feature = "tcp_compression"))] + fn fire( + &mut self, + _state: &mut Self::State, + event: Event<::Input>, + ) -> Result<(), Error> { + let serialized = postcard::to_allocvec(&event)?; + let size = u32::try_from(serialized.len()).unwrap(); + self.tcp.write_all(&size.to_le_bytes())?; + self.tcp.write_all(&self.client_id.0.to_le_bytes())?; + self.tcp.write_all(&serialized)?; + Ok(()) + } + + fn configuration(&self) -> EventConfig { + self.configuration + } +} + +impl EventRestarter for TcpEventManager +where + S: UsesInput, +{ + /// The TCP client needs to wait until a broker has mapped all pages before shutting down. + /// Otherwise, the OS may already have removed the shared maps. + fn await_restart_safe(&mut self) { + // wait until we can drop the message safely. + //self.tcp.await_safe_to_unmap_blocking(); + } +} + +impl EventProcessor for TcpEventManager +where + S: UsesInput + HasClientPerfMonitor + HasExecutions, + E: HasObservers + Executor, + for<'a> E::Observers: Deserialize<'a>, + Z: EvaluatorObservers + ExecutionProcessor, +{ + fn process( + &mut self, + fuzzer: &mut Z, + state: &mut Self::State, + executor: &mut E, + ) -> Result { + // TODO: Get around local event copy by moving handle_in_client + let self_id = self.client_id; + let mut len_buf = [0_u8; 4]; + let mut count = 0; + + self.tcp.set_nonblocking(true).expect("set to non-blocking"); + + // read all pending messages + loop { + match self.tcp.read_exact(&mut len_buf) { + Ok(()) => { + self.tcp.set_nonblocking(false).expect("set to blocking"); + let len = u32::from_le_bytes(len_buf); + let mut buf = vec![0_u8; len as usize + 4_usize]; + self.tcp.read_exact(&mut buf).unwrap(); + + let mut client_id_buf = [0_u8; 4]; + client_id_buf.copy_from_slice(&buf[..4]); + + let other_client_id = ClientId(u32::from_le_bytes(client_id_buf)); + + self.tcp.set_nonblocking(true).expect("set to non-blocking"); + if self_id == other_client_id { + panic!("Own ID should never have been sent by the broker"); + } else { + println!("{self_id:?} (from {other_client_id:?}) Received: {buf:?}"); + + let event = postcard::from_bytes(&buf[4..])?; + self.handle_in_client(fuzzer, executor, state, other_client_id, event)?; + count += 1; + } + } + + Err(e) if e.kind() == ErrorKind::WouldBlock => { + // no new data on the socket + break; + } + Err(e) => { + panic!("Unexpected error {e:?}"); + } + } + } + self.tcp.set_nonblocking(false).expect("set to blocking"); + + Ok(count) + } +} + +impl EventManager for TcpEventManager +where + E: HasObservers + Executor, + for<'a> E::Observers: Deserialize<'a>, + S: UsesInput + HasExecutions + HasClientPerfMonitor + HasMetadata, + Z: EvaluatorObservers + ExecutionProcessor, +{ +} + +impl HasCustomBufHandlers for TcpEventManager +where + S: UsesInput, +{ + fn add_custom_buf_handler( + &mut self, + handler: Box Result>, + ) { + self.custom_buf_handlers.push(handler); + } +} + +impl ProgressReporter for TcpEventManager where + S: UsesInput + HasExecutions + HasClientPerfMonitor + HasMetadata +{ +} + +impl HasEventManagerId for TcpEventManager +where + S: UsesInput, +{ + /// Gets the id assigned to this staterestorer. + fn mgr_id(&self) -> EventManagerId { + EventManagerId(self.client_id.0 as usize) + } +} + +/// A manager that can restart on the fly, storing states in-between (in `on_restart`) +#[cfg(feature = "std")] +#[derive(Debug)] +pub struct TcpRestartingEventManager +where + S: UsesInput, + SP: ShMemProvider + 'static, + //CE: CustomEvent, +{ + /// The embedded TCP event manager + tcp_mgr: TcpEventManager, + /// The staterestorer to serialize the state for the next runner + staterestorer: StateRestorer, + /// Decide if the state restorer must save the serialized state + save_state: bool, +} + +#[cfg(feature = "std")] +impl UsesState for TcpRestartingEventManager +where + S: UsesInput, + SP: ShMemProvider + 'static, +{ + type State = S; +} + +#[cfg(feature = "std")] +impl ProgressReporter for TcpRestartingEventManager +where + S: UsesInput + HasExecutions + HasClientPerfMonitor + HasMetadata + Serialize, + SP: ShMemProvider, +{ +} + +#[cfg(feature = "std")] +impl EventFirer for TcpRestartingEventManager +where + SP: ShMemProvider, + S: UsesInput, + //CE: CustomEvent, +{ + fn fire( + &mut self, + state: &mut Self::State, + event: Event<::Input>, + ) -> Result<(), Error> { + // Check if we are going to crash in the event, in which case we store our current state for the next runner + self.tcp_mgr.fire(state, event) + } + + fn configuration(&self) -> EventConfig { + self.tcp_mgr.configuration() + } +} + +#[cfg(feature = "std")] +impl EventRestarter for TcpRestartingEventManager +where + S: UsesInput + HasExecutions + HasClientPerfMonitor + Serialize, + SP: ShMemProvider, + //CE: CustomEvent, +{ + /// The tcp client needs to wait until a broker mapped all pages, before shutting down. + /// Otherwise, the OS may already have removed the shared maps, + #[inline] + fn await_restart_safe(&mut self) { + self.tcp_mgr.await_restart_safe(); + } + + /// Reset the single page (we reuse it over and over from pos 0), then send the current state to the next runner. + fn on_restart(&mut self, state: &mut S) -> Result<(), Error> { + // First, reset the page to 0 so the next iteration can read read from the beginning of this page + self.staterestorer.reset(); + self.staterestorer + .save(&if self.save_state { Some(state) } else { None }) + } + + fn send_exiting(&mut self) -> Result<(), Error> { + self.staterestorer.send_exiting(); + // Also inform the broker that we are about to exit. + // This way, the broker can clean up the pages, and eventually exit. + self.tcp_mgr.send_exiting() + } +} + +#[cfg(feature = "std")] +impl EventProcessor for TcpRestartingEventManager +where + E: HasObservers + Executor, Z>, + for<'a> E::Observers: Deserialize<'a>, + S: UsesInput + HasExecutions + HasClientPerfMonitor, + SP: ShMemProvider + 'static, + Z: EvaluatorObservers + ExecutionProcessor, //CE: CustomEvent, +{ + fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result { + self.tcp_mgr.process(fuzzer, state, executor) + } +} + +#[cfg(feature = "std")] +impl EventManager for TcpRestartingEventManager +where + E: HasObservers + Executor, Z>, + for<'a> E::Observers: Deserialize<'a>, + S: UsesInput + HasExecutions + HasClientPerfMonitor + HasMetadata + Serialize, + SP: ShMemProvider + 'static, + Z: EvaluatorObservers + ExecutionProcessor, //CE: CustomEvent, +{ +} + +#[cfg(feature = "std")] +impl HasEventManagerId for TcpRestartingEventManager +where + S: UsesInput + Serialize, + SP: ShMemProvider + 'static, +{ + fn mgr_id(&self) -> EventManagerId { + self.tcp_mgr.mgr_id() + } +} + +/// The tcp connection from the actual fuzzer to the process supervising it +const _ENV_FUZZER_SENDER: &str = "_AFL_ENV_FUZZER_SENDER"; +const _ENV_FUZZER_RECEIVER: &str = "_AFL_ENV_FUZZER_RECEIVER"; +/// The tcp (2 way) connection from a fuzzer to the broker (broadcasting all other fuzzer messages) +const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT"; + +#[cfg(feature = "std")] +impl TcpRestartingEventManager +where + S: UsesInput, + SP: ShMemProvider + 'static, + //CE: CustomEvent, +{ + /// Create a new runner, the executed child doing the actual fuzzing. + pub fn new(tcp_mgr: TcpEventManager, staterestorer: StateRestorer) -> Self { + Self { + tcp_mgr, + staterestorer, + save_state: true, + } + } + + /// Create a new runner specifying if it must save the serialized state on restart. + pub fn with_save_state( + tcp_mgr: TcpEventManager, + staterestorer: StateRestorer, + save_state: bool, + ) -> Self { + Self { + tcp_mgr, + staterestorer, + save_state, + } + } + + /// Get the staterestorer + pub fn staterestorer(&self) -> &StateRestorer { + &self.staterestorer + } + + /// Get the staterestorer (mutable) + pub fn staterestorer_mut(&mut self) -> &mut StateRestorer { + &mut self.staterestorer + } +} + +/// The kind of manager we're creating right now +#[cfg(feature = "std")] +#[derive(Debug, Clone, Copy)] +pub enum ManagerKind { + /// Any kind will do + Any, + /// A client, getting messages from a local broker. + Client { + /// The CPU core ID of this client + cpu_core: Option, + }, + /// A [`tcp::TcpBroker`], forwarding the packets of local clients. + Broker, +} + +/// Sets up a restarting fuzzer, using the [`StdShMemProvider`], and standard features. +/// The restarting mgr is a combination of restarter and runner, that can be used on systems with and without `fork` support. +/// The restarter will spawn a new process each time the child crashes or timeouts. +#[cfg(feature = "std")] +#[allow(clippy::type_complexity)] +pub fn setup_restarting_mgr_tcp( + monitor: MT, + broker_port: u16, + configuration: EventConfig, +) -> Result<(Option, TcpRestartingEventManager), Error> +where + MT: Monitor + Clone, + S: DeserializeOwned + UsesInput + HasClientPerfMonitor + HasExecutions, +{ + RestartingMgr::builder() + .shmem_provider(StdShMemProvider::new()?) + .monitor(Some(monitor)) + .broker_port(broker_port) + .configuration(configuration) + .build() + .launch() +} + +/// Provides a `builder` which can be used to build a [`RestartingMgr`], which is a combination of a +/// `restarter` and `runner`, that can be used on systems both with and without `fork` support. The +/// `restarter` will start a new process each time the child crashes or times out. +#[cfg(feature = "std")] +#[allow(clippy::default_trait_access)] +#[derive(TypedBuilder, Debug)] +pub struct RestartingMgr +where + S: UsesInput + DeserializeOwned, + SP: ShMemProvider + 'static, + MT: Monitor, + //CE: CustomEvent, +{ + /// The shared memory provider to use for the broker or client spawned by the restarting + /// manager. + shmem_provider: SP, + /// The configuration + configuration: EventConfig, + /// The monitor to use + #[builder(default = None)] + monitor: Option, + /// The broker port to use + #[builder(default = 1337_u16)] + broker_port: u16, + /// The address to connect to + #[builder(default = None)] + remote_broker_addr: Option, + /// The type of manager to build + #[builder(default = ManagerKind::Any)] + kind: ManagerKind, + /// The amount of external clients that should have connected (not counting our own tcp client) + /// before this broker quits _after the last client exited_. + /// If `None`, the broker will never quit when the last client exits, but run forever. + /// + /// So, if this value is `Some(2)`, the broker will not exit after client 1 connected and disconnected, + /// but it will quit after client 2 connected and disconnected. + #[builder(default = None)] + exit_cleanly_after: Option, + /// Tell the manager to serialize or not the state on restart + #[builder(default = true)] + serialize_state: bool, + #[builder(setter(skip), default = PhantomData)] + phantom_data: PhantomData, +} + +#[cfg(feature = "std")] +#[allow(clippy::type_complexity, clippy::too_many_lines)] +impl RestartingMgr +where + SP: ShMemProvider, + S: UsesInput + HasExecutions + HasClientPerfMonitor + DeserializeOwned, + MT: Monitor + Clone, +{ + /// Launch the restarting manager + pub fn launch(&mut self) -> Result<(Option, TcpRestartingEventManager), Error> { + // We start ourself as child process to actually fuzz + let (staterestorer, _new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER) + .is_err() + { + let broker_things = |mut broker: TcpEventBroker, _remote_broker_addr| { + if let Some(exit_cleanly_after) = self.exit_cleanly_after { + broker.set_exit_cleanly_after(exit_cleanly_after); + } + + broker.broker_loop() + }; + + // We get here if we are on Unix, or we are a broker on Windows (or without forks). + let (_mgr, core_id) = match self.kind { + ManagerKind::Any => { + let connection = create_nonblocking_listener(("127.0.0.1", self.broker_port)); + match connection { + Ok(listener) => { + let event_broker = TcpEventBroker::::with_listener( + listener, + self.monitor.take().unwrap(), + ); + + // Yep, broker. Just loop here. + log::info!( + "Doing broker things. Run this tool again to start fuzzing in a client." + ); + + broker_things(event_broker, self.remote_broker_addr)?; + + return Err(Error::shutting_down()); + } + Err(Error::File(_, _)) => { + // port was likely already bound + let mgr = TcpEventManager::::new( + &("127.0.0.1", self.broker_port), + self.configuration, + )?; + (mgr, None) + } + Err(e) => { + return Err(e); + } + } + } + ManagerKind::Broker => { + let event_broker = TcpEventBroker::::new( + format!("127.0.0.1:{}", self.broker_port), + self.monitor.take().unwrap(), + )?; + + broker_things(event_broker, self.remote_broker_addr)?; + unreachable!("The broker may never return normally, only on errors or when shutting down."); + } + ManagerKind::Client { cpu_core } => { + // We are a client + let mgr = TcpEventManager::::on_port(self.broker_port, self.configuration)?; + + (mgr, cpu_core) + } + }; + + if let Some(core_id) = core_id { + let core_id: CoreId = core_id; + log::info!("Setting core affinity to {core_id:?}"); + core_id.set_affinity()?; + } + + // We are the fuzzer respawner in a tcp client + //mgr.to_env(_ENV_FUZZER_BROKER_CLIENT_INITIAL); + + // First, create a channel from the current fuzzer to the next to store state between restarts. + #[cfg(unix)] + let mut staterestorer: StateRestorer = + StateRestorer::new(self.shmem_provider.new_shmem(256 * 1024 * 1024)?); + + #[cfg(not(unix))] + let staterestorer: StateRestorer = + StateRestorer::new(self.shmem_provider.new_shmem(256 * 1024 * 1024)?); + // Store the information to a map. + staterestorer.write_to_env(_ENV_FUZZER_SENDER)?; + + #[cfg(unix)] + unsafe { + let data = &mut SHUTDOWN_SIGHANDLER_DATA; + // Write the pointer to staterestorer so we can release its shmem later + core::ptr::write_volatile( + &mut data.staterestorer_ptr, + &mut staterestorer as *mut _ as *mut std::ffi::c_void, + ); + data.allocator_pid = std::process::id() as usize; + data.shutdown_handler = shutdown_handler:: as *const std::ffi::c_void; + } + + // We setup signal handlers to clean up shmem segments used by state restorer + #[cfg(all(unix, not(miri)))] + if let Err(_e) = unsafe { setup_signal_handler(&mut SHUTDOWN_SIGHANDLER_DATA) } { + // We can live without a proper ctrl+c signal handler. Print and ignore. + log::error!("Failed to setup signal handlers: {_e}"); + } + + let mut ctr: u64 = 0; + // Client->parent loop + loop { + log::info!("Spawning next client (id {ctr})"); + + // On Unix, we fork (when fork feature is enabled) + #[cfg(all(unix, feature = "fork"))] + let child_status = { + self.shmem_provider.pre_fork()?; + match unsafe { fork() }? { + ForkResult::Parent(handle) => { + self.shmem_provider.post_fork(false)?; + handle.status() + } + ForkResult::Child => { + self.shmem_provider.post_fork(true)?; + break (staterestorer, self.shmem_provider.clone(), core_id); + } + } + }; + + // On Windows (or in any case without fork), we spawn ourself again + #[cfg(any(windows, not(feature = "fork")))] + let child_status = startable_self()?.status()?; + #[cfg(all(unix, not(feature = "fork")))] + let child_status = child_status.code().unwrap_or_default(); + + compiler_fence(Ordering::SeqCst); + + #[allow(clippy::manual_assert)] + if !staterestorer.has_content() && self.serialize_state { + #[cfg(unix)] + if child_status == 137 { + // Out of Memory, see https://tldp.org/LDP/abs/html/exitcodes.html + // and https://github.com/AFLplusplus/LibAFL/issues/32 for discussion. + panic!("Fuzzer-respawner: The fuzzed target crashed with an out of memory error! Fix your harness, or switch to another executor (for example, a forkserver)."); + } + + // Storing state in the last round did not work + panic!("Fuzzer-respawner: Storing state in crashed fuzzer instance did not work, no point to spawn the next client! This can happen if the child calls `exit()`, in that case make sure it uses `abort()`, if it got killed unrecoverable (OOM), or if there is a bug in the fuzzer itself. (Child exited with: {child_status})"); + } + + if staterestorer.wants_to_exit() { + return Err(Error::shutting_down()); + } + + ctr = ctr.wrapping_add(1); + } + } else { + // We are the newly started fuzzing instance (i.e. on Windows), first, connect to our own restore map. + // We get here *only on Windows*, if we were started by a restarting fuzzer. + // A staterestorer and a receiver for single communication + ( + StateRestorer::from_env(&mut self.shmem_provider, _ENV_FUZZER_SENDER)?, + self.shmem_provider.clone(), + None, + ) + }; + + if let Some(core_id) = core_id { + let core_id: CoreId = core_id; + core_id.set_affinity()?; + } + + // If we're restarting, deserialize the old state. + let (state, mut mgr) = if let Some(state_opt) = staterestorer.restore()? { + ( + state_opt, + TcpRestartingEventManager::with_save_state( + TcpEventManager::on_port(self.broker_port, self.configuration)?, + staterestorer, + self.serialize_state, + ), + ) + } else { + log::info!("First run. Let's set it all up"); + // Mgr to send and receive msgs from/to all other fuzzer instances + let mgr = TcpEventManager::::on_port(self.broker_port, self.configuration)?; + + ( + None, + TcpRestartingEventManager::with_save_state( + mgr, + staterestorer, + self.serialize_state, + ), + ) + }; + // We reset the staterestorer, the next staterestorer and receiver (after crash) will reuse the page from the initial message. + mgr.staterestorer.reset(); + + /* TODO: Not sure if this is needed + // We commit an empty NO_RESTART message to this buf, against infinite loops, + // in case something crashes in the fuzzer. + staterestorer.send_buf(_TCP_TAG_NO_RESTART, []); + */ + + Ok((state, mgr)) + } +}