diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 2b30c62b..eedfd5d1 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -409,7 +409,7 @@ bool FairMQChannel::InitCommandInterface(FairMQTransportFactory* factory, int nu { fTransportFactory = factory; - fCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", numIoThreads); + fCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", numIoThreads, "internal"); if (fCmdSocket) { fCmdSocket->Connect("inproc://commands"); diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 37a0beda..923fe8fe 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -98,7 +98,7 @@ void FairMQDevice::InitWrapper() if (!fCmdSocket) { - fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", fNumIoThreads); + fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", fNumIoThreads, fId); fCmdSocket->Bind("inproc://commands"); } @@ -191,7 +191,7 @@ bool FairMQDevice::InitChannel(FairMQChannel& ch) { LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " (" << ch.fType << ")"; // initialize the socket - ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads); + ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads, fId); // set high water marks ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 6951d889..5be87c6c 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -34,7 +34,7 @@ class FairMQTransportFactory virtual FairMQMessage* CreateMessage(const size_t size) = 0; virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL) = 0; - virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads) = 0; + virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") = 0; virtual FairMQPoller* CreatePoller(const std::vector& channels) = 0; virtual FairMQPoller* CreatePoller(const std::unordered_map>& channelsMap, const std::initializer_list channelList) = 0; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index eab99585..28204d1f 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -20,7 +20,7 @@ using namespace std; -FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, const int numIoThreads) +FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, const int numIoThreads, const std::string& id /*= ""*/) : FairMQSocket(0, 0, NN_DONTWAIT) , fSocket(-1) , fId() @@ -29,7 +29,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, cons , fMessagesTx(0) , fMessagesRx(0) { - fId = name + "." + type; + fId = id + "." + name + "." + type; if (numIoThreads > 1) { diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 09ae2763..71f07086 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -26,7 +26,7 @@ class FairMQSocketNN : public FairMQSocket { public: - FairMQSocketNN(const std::string& type, const std::string& name, const int numIoThreads); // numIoThreads is not used in nanomsg. + FairMQSocketNN(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = ""); // numIoThreads is not used in nanomsg. FairMQSocketNN(const FairMQSocketNN&) = delete; FairMQSocketNN operator=(const FairMQSocketNN&) = delete; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 6d09d414..4f4e5fc8 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -36,9 +36,9 @@ FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, const size_t return new FairMQMessageNN(data, size, ffn, hint); } -FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, const std::string& name, const int numIoThreads) +FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, const std::string& name, const int numIoThreads, const std::string& id /*= ""*/) { - return new FairMQSocketNN(type, name, numIoThreads); + return new FairMQSocketNN(type, name, numIoThreads, id); } FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector& channels) diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index 76638267..89efb8d0 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -31,7 +31,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory virtual FairMQMessage* CreateMessage(const size_t size); virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL); - virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads); + virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = ""); virtual FairMQPoller* CreatePoller(const std::vector& channels); virtual FairMQPoller* CreatePoller(const std::unordered_map>& channelsMap, const std::initializer_list channelList); diff --git a/fairmq/test/req-rep/FairMQTestRep.cxx b/fairmq/test/req-rep/FairMQTestRep.cxx index 188eb470..16f4a198 100644 --- a/fairmq/test/req-rep/FairMQTestRep.cxx +++ b/fairmq/test/req-rep/FairMQTestRep.cxx @@ -23,12 +23,22 @@ FairMQTestRep::FairMQTestRep() void FairMQTestRep::Run() { - std::unique_ptr request(NewMessage()); - if (Receive(request, "data") >= 0) + std::unique_ptr request1(NewMessage()); + if (Receive(request1, "data") >= 0) { + LOG(INFO) << "Received request 1"; std::unique_ptr reply(NewMessage()); Send(reply, "data"); } + std::unique_ptr request2(NewMessage()); + if (Receive(request2, "data") >= 0) + { + LOG(INFO) << "Received request 2"; + std::unique_ptr reply(NewMessage()); + Send(reply, "data"); + } + + LOG(INFO) << "REQ-REP test successfull"; } FairMQTestRep::~FairMQTestRep() diff --git a/fairmq/test/req-rep/FairMQTestReq.cxx b/fairmq/test/req-rep/FairMQTestReq.cxx index a68abe02..1ed82cbc 100644 --- a/fairmq/test/req-rep/FairMQTestReq.cxx +++ b/fairmq/test/req-rep/FairMQTestReq.cxx @@ -29,7 +29,7 @@ void FairMQTestReq::Run() std::unique_ptr reply(NewMessage()); if (Receive(reply, "data") >= 0) { - LOG(INFO) << "REQ-REP test successfull"; + LOG(INFO) << "received reply"; } } diff --git a/fairmq/test/req-rep/runTestRep.cxx b/fairmq/test/req-rep/runTestRep.cxx index 3684859e..ce0abb54 100644 --- a/fairmq/test/req-rep/runTestRep.cxx +++ b/fairmq/test/req-rep/runTestRep.cxx @@ -25,7 +25,7 @@ int main(int argc, char** argv) testRep.SetProperty(FairMQTestRep::Id, "testRep"); - FairMQChannel repChannel("rep", "connect", "tcp://127.0.0.1:5558"); + FairMQChannel repChannel("rep", "bind", "tcp://127.0.0.1:5558"); testRep.fChannels["data"].push_back(repChannel); testRep.ChangeState("INIT_DEVICE"); diff --git a/fairmq/test/req-rep/runTestReq.cxx b/fairmq/test/req-rep/runTestReq.cxx index 8277b630..0715c172 100644 --- a/fairmq/test/req-rep/runTestReq.cxx +++ b/fairmq/test/req-rep/runTestReq.cxx @@ -23,9 +23,9 @@ int main(int argc, char** argv) testReq.CatchSignals(); testReq.SetTransport("zeromq"); - testReq.SetProperty(FairMQTestReq::Id, "testReq"); + testReq.SetProperty(FairMQTestReq::Id, "testReq" + std::to_string(getpid())); - FairMQChannel reqChannel("req", "bind", "tcp://127.0.0.1:5558"); + FairMQChannel reqChannel("req", "connect", "tcp://127.0.0.1:5558"); testReq.fChannels["data"].push_back(reqChannel); testReq.ChangeState("INIT_DEVICE"); diff --git a/fairmq/test/test-fairmq-req-rep.sh.in b/fairmq/test/test-fairmq-req-rep.sh.in index b523a5f6..22e3df1e 100755 --- a/fairmq/test/test-fairmq-req-rep.sh.in +++ b/fairmq/test/test-fairmq-req-rep.sh.in @@ -1,9 +1,12 @@ #!/bin/bash -trap 'kill -TERM $REQ_PID; kill -TERM $REP_PID; wait $REQ_PID; wait $REP_PID;' TERM +trap 'kill -TERM $REQ1_PID; kill -TERM $REQ2_PID; kill -TERM $REP_PID; wait $REQ1_PID; wait $REQ2_PID; wait $REP_PID;' TERM @CMAKE_BINARY_DIR@/bin/test-fairmq-req & -REQ_PID=$! +REQ1_PID=$! +@CMAKE_BINARY_DIR@/bin/test-fairmq-req & +REQ2_PID=$! @CMAKE_BINARY_DIR@/bin/test-fairmq-rep & REP_PID=$! -wait $REQ_PID +wait $REQ1_PID +wait $REQ2_PID wait $REP_PID diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 02fa9157..7f134621 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -24,7 +24,7 @@ using namespace std; // Context to hold the ZeroMQ sockets boost::shared_ptr FairMQSocketZMQ::fContext = boost::shared_ptr(new FairMQContextZMQ(1)); -FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const int numIoThreads) +FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const int numIoThreads, const std::string& id /*= ""*/) : FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) , fSocket(NULL) , fId() @@ -33,7 +33,7 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const i , fMessagesTx(0) , fMessagesRx(0) { - fId = name + "." + type; + fId = id + "." + name + "." + type; if (zmq_ctx_set(fContext->GetContext(), ZMQ_IO_THREADS, numIoThreads) != 0) { @@ -68,8 +68,6 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const i LOG(ERROR) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); } } - - // LOG(INFO) << "created socket " << fId; } string FairMQSocketZMQ::GetId() diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index f7ccbb58..2bbdc399 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -23,7 +23,7 @@ class FairMQSocketZMQ : public FairMQSocket { public: - FairMQSocketZMQ(const std::string& type, const std::string& name, const int numIoThreads); + FairMQSocketZMQ(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = ""); FairMQSocketZMQ(const FairMQSocketZMQ&) = delete; FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 40326a51..c5ea3317 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -40,9 +40,9 @@ FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, const size_t return new FairMQMessageZMQ(data, size, ffn, hint); } -FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, const std::string& name, const int numIoThreads) +FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, const std::string& name, const int numIoThreads, const std::string& id /*= ""*/) { - return new FairMQSocketZMQ(type, name, numIoThreads); + return new FairMQSocketZMQ(type, name, numIoThreads, id); } FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector& channels) diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 5f932f84..8fb6572f 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -32,7 +32,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory virtual FairMQMessage* CreateMessage(const size_t size); virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL); - virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads); + virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = ""); virtual FairMQPoller* CreatePoller(const std::vector& channels); virtual FairMQPoller* CreatePoller(const std::unordered_map>& channelsMap, const std::initializer_list channelList);