diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index 5591d453..a2965cbd 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -23,13 +23,25 @@ using namespace std; class FairMQSocket { public: + const int SNDMORE; + const int RCVMORE; + const int NOBLOCK; + + FairMQSocket(int sndMore, int rcvMore, int noBlock) + : SNDMORE(sndMore) + , RCVMORE(rcvMore) + , NOBLOCK(noBlock) + {} + virtual string GetId() = 0; virtual void Bind(const string& address) = 0; virtual void Connect(const string& address) = 0; virtual int Send(FairMQMessage* msg, const string& flag="") = 0; + virtual int Send(FairMQMessage* msg, const int flags) = 0; virtual int Receive(FairMQMessage* msg, const string& flag="") = 0; + virtual int Receive(FairMQMessage* msg, const int flags) = 0; virtual void* GetSocket() = 0; virtual int GetSocket(int nothing) = 0; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 585f57c3..83ce6035 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -19,7 +19,8 @@ #include "FairMQLogger.h" FairMQSocketNN::FairMQSocketNN(const string& type, int num, int numIoThreads) - : fSocket() + : FairMQSocket(0, 0, NN_DONTWAIT) + , fSocket() , fId() , fBytesTx(0) , fBytesRx(0) @@ -100,6 +101,24 @@ int FairMQSocketNN::Send(FairMQMessage* msg, const string& flag) return rc; } +int FairMQSocketNN::Send(FairMQMessage* msg, const int flags) +{ + void* ptr = msg->GetMessage(); + int rc = nn_send(fSocket, &ptr, NN_MSG, flags); + if (rc < 0) + { + LOG(ERROR) << "failed sending on socket #" << fId << ", reason: " << nn_strerror(errno); + } + else + { + fBytesTx += rc; + ++fMessagesTx; + static_cast(msg)->fReceiving = false; + } + + return rc; +} + int FairMQSocketNN::Receive(FairMQMessage* msg, const string& flag) { void* ptr = NULL; @@ -119,6 +138,25 @@ int FairMQSocketNN::Receive(FairMQMessage* msg, const string& flag) return rc; } +int FairMQSocketNN::Receive(FairMQMessage* msg, const int flags) +{ + void* ptr = NULL; + int rc = nn_recv(fSocket, &ptr, NN_MSG, flags); + if (rc < 0) + { + LOG(ERROR) << "failed receiving on socket #" << fId << ", reason: " << nn_strerror(errno); + } + else + { + fBytesRx += rc; + ++fMessagesRx; + msg->SetMessage(ptr, rc); + static_cast(msg)->fReceiving = true; + } + + return rc; +} + void FairMQSocketNN::Close() { nn_close(fSocket); @@ -196,12 +234,10 @@ int FairMQSocketNN::GetConstant(const string& constant) return NN_REQ; if (constant == "rep") return NN_REP; - if (constant == "dealer") return NN_REQ; if (constant == "router") return NN_REP; - if (constant == "pair") return NN_PAIR; @@ -209,7 +245,6 @@ int FairMQSocketNN::GetConstant(const string& constant) return NN_SNDBUF; if (constant == "rcv-hwm") return NN_RCVBUF; - if (constant == "snd-more") { LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!"; return -1; diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 4496dbb8..abb564c2 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -34,7 +34,9 @@ class FairMQSocketNN : public FairMQSocket virtual void Connect(const string& address); virtual int Send(FairMQMessage* msg, const string& flag=""); + virtual int Send(FairMQMessage* msg, const int flags); virtual int Receive(FairMQMessage* msg, const string& flag=""); + virtual int Receive(FairMQMessage* msg, const int flags); virtual void* GetSocket(); virtual int GetSocket(int nothing); diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 87eebadc..ad950bd4 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -20,7 +20,8 @@ boost::shared_ptr FairMQSocketZMQ::fContext = boost::shared_ptr(new FairMQContextZMQ(1)); FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads) - : fSocket(NULL) + : FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) + , fSocket(NULL) , fId() , fBytesTx(0) , fBytesRx(0) @@ -115,6 +116,28 @@ int FairMQSocketZMQ::Send(FairMQMessage* msg, const string& flag) return nbytes; } +int FairMQSocketZMQ::Send(FairMQMessage* msg, const int flags) +{ + int nbytes = zmq_msg_send(static_cast(msg->GetMessage()), fSocket, flags); + if (nbytes >= 0) + { + fBytesTx += nbytes; + ++fMessagesTx; + return nbytes; + } + if (zmq_errno() == EAGAIN) + { + return 0; + } + if (zmq_errno() == ETERM) + { + LOG(INFO) << "terminating socket #" << fId; + return -1; + } + LOG(ERROR) << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno); + return nbytes; +} + int FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag) { int nbytes = zmq_msg_recv(static_cast(msg->GetMessage()), fSocket, GetConstant(flag)); @@ -137,6 +160,28 @@ int FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag) return nbytes; } +int FairMQSocketZMQ::Receive(FairMQMessage* msg, const int flags) +{ + int nbytes = zmq_msg_recv(static_cast(msg->GetMessage()), fSocket, flags); + if (nbytes >= 0) + { + fBytesRx += nbytes; + ++fMessagesRx; + return nbytes; + } + if (zmq_errno() == EAGAIN) + { + return 0; + } + if (zmq_errno() == ETERM) + { + LOG(INFO) << "terminating socket #" << fId; + return -1; + } + LOG(ERROR) << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno); + return nbytes; +} + void FairMQSocketZMQ::Close() { if (fSocket == NULL) @@ -250,6 +295,8 @@ int FairMQSocketZMQ::GetConstant(const string& constant) return ZMQ_LINGER; if (constant == "no-block") return ZMQ_DONTWAIT; + if (constant == "snd-more no-block") + return ZMQ_DONTWAIT|ZMQ_SNDMORE; return -1; } diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index 23a3441b..af2947e5 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -33,7 +33,9 @@ class FairMQSocketZMQ : public FairMQSocket virtual void Connect(const string& address); virtual int Send(FairMQMessage* msg, const string& flag=""); + virtual int Send(FairMQMessage* msg, const int flags); virtual int Receive(FairMQMessage* msg, const string& flag=""); + virtual int Receive(FairMQMessage* msg, const int flags); virtual void* GetSocket(); virtual int GetSocket(int nothing);