Fix #147
This commit is contained in:
parent
de933fee63
commit
42997dbde9
@ -19,7 +19,7 @@ use std::net::{SocketAddr, ToSocketAddrs};
|
||||
|
||||
use crate::{
|
||||
bolts::{
|
||||
llmp::{self, Flags, LlmpClientDescription, LlmpSender, Tag},
|
||||
llmp::{self, Flags, LlmpClientDescription, LlmpConnection, LlmpSender, Tag},
|
||||
shmem::ShMemProvider,
|
||||
},
|
||||
events::{BrokerEventResult, Event, EventFirer, EventManager, EventProcessor, EventRestarter},
|
||||
@ -273,6 +273,16 @@ where
|
||||
OT: ObserversTuple,
|
||||
SP: ShMemProvider + 'static,
|
||||
{
|
||||
/// Create a manager from a raw llmp client
|
||||
pub fn new(llmp: llmp::LlmpClient<SP>) -> Result<Self, Error> {
|
||||
Ok(Self {
|
||||
llmp,
|
||||
#[cfg(feature = "llmp_compression")]
|
||||
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
|
||||
phantom: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
/// Create llmp on a port
|
||||
/// If the port is not yet bound, it will act as broker
|
||||
/// Else, it will act as client.
|
||||
@ -715,13 +725,25 @@ where
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
let broker_things = |mut broker: LlmpEventBroker<I, SP, ST>, remote_broker_addr| {
|
||||
if let Some(remote_broker_addr) = remote_broker_addr {
|
||||
println!("B2b: Connecting to {:?}", &remote_broker_addr);
|
||||
broker.connect_b2b(remote_broker_addr)?;
|
||||
};
|
||||
|
||||
broker.broker_loop()
|
||||
};
|
||||
|
||||
// We get here if we are on Unix, or we are a broker on Windows.
|
||||
let core_id = match self.kind {
|
||||
ManagerKind::Broker | ManagerKind::Any => {
|
||||
let mut broker = LlmpEventBroker::<I, SP, ST>::new_on_port(
|
||||
self.shmem_provider.clone(),
|
||||
let (mgr, core_id) = match self.kind {
|
||||
ManagerKind::Any => {
|
||||
let connection =
|
||||
LlmpConnection::on_port(self.shmem_provider.clone(), self.broker_port)?;
|
||||
match connection {
|
||||
LlmpConnection::IsBroker { broker } => {
|
||||
let event_broker = LlmpEventBroker::<I, SP, ST>::new(
|
||||
broker,
|
||||
self.stats.take().unwrap(),
|
||||
self.broker_port,
|
||||
)?;
|
||||
|
||||
// Yep, broker. Just loop here.
|
||||
@ -729,23 +751,38 @@ where
|
||||
"Doing broker things. Run this tool again to start fuzzing in a client."
|
||||
);
|
||||
|
||||
if let Some(remote_broker_addr) = self.remote_broker_addr {
|
||||
println!("B2b: Connecting to {:?}", &remote_broker_addr);
|
||||
broker.connect_b2b(remote_broker_addr)?;
|
||||
};
|
||||
broker_things(event_broker, self.remote_broker_addr)?;
|
||||
|
||||
broker.broker_loop()?;
|
||||
return Err(Error::ShuttingDown);
|
||||
}
|
||||
ManagerKind::Client { cpu_core } => cpu_core,
|
||||
};
|
||||
LlmpConnection::IsClient { client } => {
|
||||
let mgr = LlmpEventManager::<I, OT, S, SP>::new(client)?;
|
||||
(mgr, None)
|
||||
}
|
||||
}
|
||||
}
|
||||
ManagerKind::Broker => {
|
||||
let event_broker = LlmpEventBroker::<I, SP, ST>::new_on_port(
|
||||
self.shmem_provider.clone(),
|
||||
self.stats.take().unwrap(),
|
||||
self.broker_port,
|
||||
)?;
|
||||
|
||||
broker_things(event_broker, self.remote_broker_addr)?;
|
||||
|
||||
return Err(Error::ShuttingDown);
|
||||
}
|
||||
ManagerKind::Client { cpu_core } => {
|
||||
// We are a client
|
||||
let mgr = LlmpEventManager::<I, OT, S, SP>::new_on_port(
|
||||
self.shmem_provider.clone(),
|
||||
self.broker_port,
|
||||
)?;
|
||||
|
||||
(mgr, cpu_core)
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(core_id) = core_id {
|
||||
println!("Setting core affinity to {:?}", core_id);
|
||||
core_affinity::set_for_current(core_id);
|
||||
|
Loading…
x
Reference in New Issue
Block a user