diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 209f0c6e..cb2d6291 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -33,11 +33,22 @@ FairMQSocketNN::FairMQSocketNN(const string& type, int num, int numIoThreads) LOG(INFO) << "number of I/O threads is not used in nanomsg"; } - fSocket = nn_socket(AF_SP, GetConstant(type)); - if (type == "sub") + if (type == "router" || type == "dealer") { - nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0); + // Additional info about using the sockets ROUTER and DEALER with nanomsg can be found in: + // http://250bpm.com/blog:14 + // http://www.freelists.org/post/nanomsg/a-stupid-load-balancing-question,1 + fSocket = nn_socket(AF_SP_RAW, GetConstant(type)); } + else + { + fSocket = nn_socket(AF_SP, GetConstant(type)); + if (type == "sub") + { + nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0); + } + } + LOG(INFO) << "created socket #" << fId; } @@ -172,7 +183,7 @@ int FairMQSocketNN::GetConstant(const string& constant) if (constant == "pub") return NN_PUB; if (constant == "xsub") - return NN_SUB; // TODO: is there XPUB, XSUB for nanomsg? + return NN_SUB; if (constant == "xpub") return NN_PUB; if (constant == "push") @@ -183,10 +194,20 @@ int FairMQSocketNN::GetConstant(const string& constant) return NN_REQ; if (constant == "rep") return NN_REP; + + if (constant == "dealer") + return NN_REQ; + if (constant == "router") + return NN_REP; + + if (constant == "pair") + return NN_PAIR; + if (constant == "snd-hwm") return NN_SNDBUF; if (constant == "rcv-hwm") return NN_RCVBUF; + if (constant == "snd-more") { LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!"; return -1; @@ -195,6 +216,7 @@ int FairMQSocketNN::GetConstant(const string& constant) LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!"; return -1; } + if (constant == "linger") return NN_LINGER; diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 4bdc03af..4496dbb8 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -19,6 +19,7 @@ #include #include #include +#include #include "FairMQSocket.h" diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index a776eaf6..d146a2ef 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -228,6 +228,13 @@ int FairMQSocketZMQ::GetConstant(const string& constant) return ZMQ_REQ; if (constant == "rep") return ZMQ_REP; + if (constant == "dealer") + return ZMQ_DEALER; + if (constant == "router") + return ZMQ_ROUTER; + if (constant == "pair") + return ZMQ_PAIR; + if (constant == "snd-hwm") return ZMQ_SNDHWM; if (constant == "rcv-hwm") @@ -236,6 +243,7 @@ int FairMQSocketZMQ::GetConstant(const string& constant) return ZMQ_SNDMORE; if (constant == "rcv-more") return ZMQ_RCVMORE; + if (constant == "linger") return ZMQ_LINGER;