diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index ed37c3ac..3166b621 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -25,9 +25,6 @@ void FairMQDevice::Init() LOG(INFO) << ">>>>>>> Init <<<<<<<"; LOG(INFO) << "numIoThreads: " << fNumIoThreads; - // fPayloadContext = new FairMQContextZMQ(fNumIoThreads); - - // TODO: nafiga? fInputAddress = new vector(fNumInputs); fInputMethod = new vector(); fInputSocketType = new vector(); @@ -60,7 +57,7 @@ void FairMQDevice::InitInput() LOG(INFO) << ">>>>>>> InitInput <<<<<<<"; for (int i = 0; i < fNumInputs; ++i) { - FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType->at(i), i); + FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType->at(i), i, fNumIoThreads); socket->SetOption("snd-hwm", &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i))); socket->SetOption("rcv-hwm", &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i))); @@ -83,7 +80,7 @@ void FairMQDevice::InitOutput() LOG(INFO) << ">>>>>>> InitOutput <<<<<<<"; for (int i = 0; i < fNumOutputs; ++i) { - FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType->at(i), i); + FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType->at(i), i, fNumIoThreads); socket->SetOption("snd-hwm", &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i))); socket->SetOption("rcv-hwm", &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i))); diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 3acb8ee4..324d0681 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -23,7 +23,7 @@ class FairMQTransportFactory virtual FairMQMessage* CreateMessage() = 0; virtual FairMQMessage* CreateMessage(size_t size) = 0; virtual FairMQMessage* CreateMessage(void* data, size_t size) = 0; - virtual FairMQSocket* CreateSocket(const string& type, int num) = 0; + virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads) = 0; virtual FairMQPoller* CreatePoller(const vector& inputs) = 0; virtual ~FairMQTransportFactory() {}; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 038742db..c117d998 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -11,7 +11,7 @@ #include "FairMQMessageNN.h" #include "FairMQLogger.h" -FairMQSocketNN::FairMQSocketNN(const string& type, int num) : +FairMQSocketNN::FairMQSocketNN(const string& type, int num, int numIoThreads) : fBytesTx(0), fBytesRx(0), fMessagesTx(0), @@ -21,6 +21,10 @@ FairMQSocketNN::FairMQSocketNN(const string& type, int num) : id << type << "." << num; fId = id.str(); + if ( numIoThreads > 1 ) { + LOG(INFO) << "number of I/O threads is not used in nanomsg"; + } + fSocket = nn_socket (AF_SP, GetConstant(type)); if (type == "sub") { nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0); diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index e094a6b4..59ea27b4 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -18,7 +18,7 @@ class FairMQSocketNN : public FairMQSocket { public: - FairMQSocketNN(const string& type, int num); + FairMQSocketNN(const string& type, int num, int numIoThreads); // numIoThreads is not used in nanomsg. virtual string GetId(); diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 25a80161..df30a7b2 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -27,9 +27,9 @@ FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size) return new FairMQMessageNN(data, size); } -FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, int num) +FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, int num, int numIoThreads) { - return new FairMQSocketNN(type, num); + return new FairMQSocketNN(type, num, numIoThreads); } FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector& inputs) diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index 0a6a2e74..ace1a1e7 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -23,7 +23,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory virtual FairMQMessage* CreateMessage(); virtual FairMQMessage* CreateMessage(size_t size); virtual FairMQMessage* CreateMessage(void* data, size_t size); - virtual FairMQSocket* CreateSocket(const string& type, int num); + virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads); virtual FairMQPoller* CreatePoller(const vector& inputs); diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 338f845a..8e16115c 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -10,9 +10,9 @@ #include "FairMQSocketZMQ.h" #include "FairMQLogger.h" -boost::shared_ptr FairMQSocketZMQ::fContext = boost::shared_ptr(new FairMQContextZMQ(1)); // TODO: numIoThreads! +boost::shared_ptr FairMQSocketZMQ::fContext = boost::shared_ptr(new FairMQContextZMQ(1)); -FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num) : +FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads) : fBytesTx(0), fBytesRx(0), fMessagesTx(0), @@ -22,9 +22,14 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num) : id << type << "." << num; fId = id.str(); + int rc = zmq_ctx_set (fContext->GetContext(), ZMQ_IO_THREADS, numIoThreads); + if (rc != 0){ + LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); + } + fSocket = zmq_socket(fContext->GetContext(), GetConstant(type)); - int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length()); + rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length()); if (rc != 0) { LOG(ERROR) << "failed setting socket option, reason: " << zmq_strerror(errno); } diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index 56cdd043..34c85ed7 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -19,7 +19,7 @@ class FairMQSocketZMQ : public FairMQSocket { public: - FairMQSocketZMQ(const string& type, int num); + FairMQSocketZMQ(const string& type, int num, int numIoThreads); virtual string GetId(); diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index c108197e..442d600a 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -31,9 +31,9 @@ FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size) return new FairMQMessageZMQ(data, size); } -FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, int num) +FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, int num, int numIoThreads) { - return new FairMQSocketZMQ(type, num); + return new FairMQSocketZMQ(type, num, numIoThreads); } FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector& inputs) diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 3863425d..d06e2fe6 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -24,7 +24,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory virtual FairMQMessage* CreateMessage(); virtual FairMQMessage* CreateMessage(size_t size); virtual FairMQMessage* CreateMessage(void* data, size_t size); - virtual FairMQSocket* CreateSocket(const string& type, int num); + virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads); virtual FairMQPoller* CreatePoller(const vector& inputs);