diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 7c963705..bf7e7c7e 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -346,11 +346,11 @@ bool FairMQChannel::ValidateChannel() } } -bool FairMQChannel::InitCommandInterface(FairMQTransportFactory* factory) +bool FairMQChannel::InitCommandInterface(FairMQTransportFactory* factory, int numIoThreads) { fTransportFactory = factory; - fCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", 1); + fCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", numIoThreads); if (fCmdSocket) { fCmdSocket->Connect("inproc://commands"); diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 280d87d4..34fd33ad 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -168,7 +168,7 @@ class FairMQChannel int fNoBlockFlag; int fSndMoreFlag; - bool InitCommandInterface(FairMQTransportFactory* factory); + bool InitCommandInterface(FairMQTransportFactory* factory, int numIoThreads); bool HandleUnblock() const; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 5bb39f30..18be0b1b 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -87,7 +87,7 @@ void FairMQDevice::InitWrapper() { if (!fCmdSocket) { - fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", 1); + fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", fNumIoThreads); fCmdSocket->Bind("inproc://commands"); } @@ -119,7 +119,7 @@ void FairMQDevice::InitWrapper() { if (InitChannel(*(*itr))) { - (*itr)->InitCommandInterface(fTransportFactory); + (*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads); uninitializedChannels.erase(itr++); } else @@ -180,7 +180,7 @@ bool FairMQDevice::InitChannel(FairMQChannel& ch) { LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " (" << ch.fType << ")"; // initialize the socket - ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, 1); + ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads); // set high water marks ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 8fb2df3d..aed5f714 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -43,13 +43,11 @@ int main(int argc, char** argv) { int eventSize; int eventRate; - int ioThreads; options_description sampler_options("Sampler options"); sampler_options.add_options() ("event-size", value(&eventSize)->default_value(1000), "Event size in bytes") - ("event-rate", value(&eventRate)->default_value(0), "Event rate limit in maximum number of events per second") - ("io-threads", value(&ioThreads)->default_value(1), "Number of I/O threads"); + ("event-rate", value(&eventRate)->default_value(0), "Event rate limit in maximum number of events per second"); config.AddToCmdLineOptions(sampler_options); @@ -60,6 +58,7 @@ int main(int argc, char** argv) string filename = config.GetValue("config-json-file"); string id = config.GetValue("id"); + int ioThreads = config.GetValue("io-threads"); config.UserParser(filename, id);