Fix setting of ZeroMQ I/O Threads (setting was not propagated before)

This commit is contained in:
Alexey Rybalchenko 2015-10-08 12:05:50 +02:00 committed by Mohammad Al-Turany
parent 023d88d0ef
commit 91d3f013c0
4 changed files with 8 additions and 9 deletions

View File

@ -346,11 +346,11 @@ bool FairMQChannel::ValidateChannel()
} }
} }
bool FairMQChannel::InitCommandInterface(FairMQTransportFactory* factory) bool FairMQChannel::InitCommandInterface(FairMQTransportFactory* factory, int numIoThreads)
{ {
fTransportFactory = factory; fTransportFactory = factory;
fCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", 1); fCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", numIoThreads);
if (fCmdSocket) if (fCmdSocket)
{ {
fCmdSocket->Connect("inproc://commands"); fCmdSocket->Connect("inproc://commands");

View File

@ -168,7 +168,7 @@ class FairMQChannel
int fNoBlockFlag; int fNoBlockFlag;
int fSndMoreFlag; int fSndMoreFlag;
bool InitCommandInterface(FairMQTransportFactory* factory); bool InitCommandInterface(FairMQTransportFactory* factory, int numIoThreads);
bool HandleUnblock() const; bool HandleUnblock() const;

View File

@ -87,7 +87,7 @@ void FairMQDevice::InitWrapper()
{ {
if (!fCmdSocket) if (!fCmdSocket)
{ {
fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", 1); fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", fNumIoThreads);
fCmdSocket->Bind("inproc://commands"); fCmdSocket->Bind("inproc://commands");
} }
@ -119,7 +119,7 @@ void FairMQDevice::InitWrapper()
{ {
if (InitChannel(*(*itr))) if (InitChannel(*(*itr)))
{ {
(*itr)->InitCommandInterface(fTransportFactory); (*itr)->InitCommandInterface(fTransportFactory, fNumIoThreads);
uninitializedChannels.erase(itr++); uninitializedChannels.erase(itr++);
} }
else else
@ -180,7 +180,7 @@ bool FairMQDevice::InitChannel(FairMQChannel& ch)
{ {
LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " (" << ch.fType << ")"; LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " (" << ch.fType << ")";
// initialize the socket // initialize the socket
ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, 1); ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads);
// set high water marks // set high water marks
ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize));
ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));

View File

@ -43,13 +43,11 @@ int main(int argc, char** argv)
{ {
int eventSize; int eventSize;
int eventRate; int eventRate;
int ioThreads;
options_description sampler_options("Sampler options"); options_description sampler_options("Sampler options");
sampler_options.add_options() sampler_options.add_options()
("event-size", value<int>(&eventSize)->default_value(1000), "Event size in bytes") ("event-size", value<int>(&eventSize)->default_value(1000), "Event size in bytes")
("event-rate", value<int>(&eventRate)->default_value(0), "Event rate limit in maximum number of events per second") ("event-rate", value<int>(&eventRate)->default_value(0), "Event rate limit in maximum number of events per second");
("io-threads", value<int>(&ioThreads)->default_value(1), "Number of I/O threads");
config.AddToCmdLineOptions(sampler_options); config.AddToCmdLineOptions(sampler_options);
@ -60,6 +58,7 @@ int main(int argc, char** argv)
string filename = config.GetValue<string>("config-json-file"); string filename = config.GetValue<string>("config-json-file");
string id = config.GetValue<string>("id"); string id = config.GetValue<string>("id");
int ioThreads = config.GetValue<int>("io-threads");
config.UserParser<JSON>(filename, id); config.UserParser<JSON>(filename, id);