diff --git a/fairmq/shmem/Socket.cxx b/fairmq/shmem/Socket.cxx index 835f1d25..a5c5bebc 100644 --- a/fairmq/shmem/Socket.cxx +++ b/fairmq/shmem/Socket.cxx @@ -156,7 +156,10 @@ int Socket::Send(MessagePtr& msg, const int timeout) } else if (zmq_errno() == ETERM) { LOG(info) << "terminating socket " << fId; return -1; - } else { + } else if (zmq_errno() == EINTR) { + LOG(debug) << "Send interrupted by system call"; + return nbytes; + }else { LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; return nbytes; } @@ -204,8 +207,11 @@ int Socket::Receive(MessagePtr& msg, const int timeout) } else if (zmq_errno() == ETERM) { LOG(info) << "terminating socket " << fId; return -1; - } else { - LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; + } else if (zmq_errno() == EINTR) { + LOG(debug) << "Receive interrupted by system call"; + return nbytes; + }else { + LOG(error) << "Failed receiving on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; return nbytes; } } @@ -263,7 +269,10 @@ int64_t Socket::Send(vector& msgVec, const int timeout) } else if (zmq_errno() == ETERM) { LOG(info) << "terminating socket " << fId; return -1; - } else { + } else if (zmq_errno() == EINTR) { + LOG(debug) << "Send interrupted by system call"; + return nbytes; + }else { LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; return nbytes; } @@ -319,8 +328,11 @@ int64_t Socket::Receive(vector& msgVec, const int timeout) } else { return -2; } + } else if (zmq_errno() == EINTR) { + LOG(debug) << "Receive interrupted by system call"; + return nbytes; } else { - LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; + LOG(error) << "Failed receiving on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; return nbytes; } } diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 4ca54f6a..66514f49 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -107,50 +107,39 @@ bool FairMQSocketZMQ::Connect(const string& address) int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int timeout) { int flags = 0; - if (timeout == 0) - { + if (timeout == 0) { flags = ZMQ_DONTWAIT; } int elapsed = 0; static_cast(msg.get())->ApplyUsedSize(); - while (true) - { + while (true) { int nbytes = zmq_msg_send(static_cast(msg.get())->GetMessage(), fSocket, flags); - if (nbytes >= 0) - { + if (nbytes >= 0) { fBytesTx += nbytes; ++fMessagesTx; return nbytes; - } - else if (zmq_errno() == EAGAIN) - { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) - { - if (timeout > 0) - { + } else if (zmq_errno() == EAGAIN) { + if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout > 0) { elapsed += fSndTimeout; - if (elapsed >= timeout) - { + if (elapsed >= timeout) { return -2; } } continue; - } - else - { + } else { return -2; } - } - else if (zmq_errno() == ETERM) - { + } else if (zmq_errno() == ETERM) { LOG(info) << "terminating socket " << fId; return -1; - } - else - { + } else if (zmq_errno() == EINTR) { + LOG(debug) << "Send interrupted by system call"; + return nbytes; + } else { LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); return nbytes; } @@ -160,47 +149,36 @@ int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int timeout) int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int timeout) { int flags = 0; - if (timeout == 0) - { + if (timeout == 0) { flags = ZMQ_DONTWAIT; } int elapsed = 0; - while (true) - { + while (true) { int nbytes = zmq_msg_recv(static_cast(msg.get())->GetMessage(), fSocket, flags); - if (nbytes >= 0) - { + if (nbytes >= 0) { fBytesRx += nbytes; ++fMessagesRx; return nbytes; - } - else if (zmq_errno() == EAGAIN) - { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) - { - if (timeout > 0) - { + } else if (zmq_errno() == EAGAIN) { + if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout > 0) { elapsed += fRcvTimeout; - if (elapsed >= timeout) - { + if (elapsed >= timeout) { return -2; } } continue; - } - else - { + } else { return -2; } - } - else if (zmq_errno() == ETERM) - { + } else if (zmq_errno() == ETERM) { LOG(info) << "terminating socket " << fId; return -1; - } - else - { + } else if (zmq_errno() == EINTR) { + LOG(debug) << "Receive interrupted by system call"; + return nbytes; + } else { LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); return nbytes; } @@ -210,69 +188,58 @@ int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int timeout) int64_t FairMQSocketZMQ::Send(vector& msgVec, const int timeout) { int flags = 0; - if (timeout == 0) - { + if (timeout == 0) { flags = ZMQ_DONTWAIT; } const unsigned int vecSize = msgVec.size(); // Sending vector typicaly handles more then one part - if (vecSize > 1) - { + if (vecSize > 1) { int elapsed = 0; - while (true) - { + while (true) { int64_t totalSize = 0; bool repeat = false; - for (unsigned int i = 0; i < vecSize; ++i) - { + for (unsigned int i = 0; i < vecSize; ++i) { static_cast(msgVec[i].get())->ApplyUsedSize(); int nbytes = zmq_msg_send(static_cast(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags); - if (nbytes >= 0) - { + if (nbytes >= 0) { totalSize += nbytes; - } - else - { + } else { // according to ZMQ docs, this can only occur for the first part - if (zmq_errno() == EAGAIN) - { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) - { - if (timeout > 0) - { + if (zmq_errno() == EAGAIN) { + if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout > 0) { elapsed += fSndTimeout; - if (elapsed >= timeout) - { + if (elapsed >= timeout) { return -2; } } repeat = true; break; - } - else - { + } else { return -2; } } - if (zmq_errno() == ETERM) - { + if (zmq_errno() == ETERM) { LOG(info) << "terminating socket " << fId; return -1; + } else if (zmq_errno() == EINTR) { + LOG(debug) << "Receive interrupted by system call"; + return nbytes; + } else { + LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); + return nbytes; } - LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); - return nbytes; } } - if (repeat) - { + if (repeat) { continue; } @@ -282,12 +249,9 @@ int64_t FairMQSocketZMQ::Send(vector& msgVec, const int timeou return totalSize; } } // If there's only one part, send it as a regular message - else if (vecSize == 1) - { + else if (vecSize == 1) { return Send(msgVec.back(), timeout); - } - else // if the vector is empty, something might be wrong - { + } else { // if the vector is empty, something might be wrong LOG(warn) << "Will not send empty vector"; return -1; } @@ -296,60 +260,48 @@ int64_t FairMQSocketZMQ::Send(vector& msgVec, const int timeou int64_t FairMQSocketZMQ::Receive(vector& msgVec, const int timeout) { int flags = 0; - if (timeout == 0) - { + if (timeout == 0) { flags = ZMQ_DONTWAIT; } int elapsed = 0; - while (true) - { + while (true) { int64_t totalSize = 0; int64_t more = 0; bool repeat = false; - do - { + do { unique_ptr part(new FairMQMessageZMQ(GetTransport())); int nbytes = zmq_msg_recv(static_cast(part.get())->GetMessage(), fSocket, flags); - if (nbytes >= 0) - { + if (nbytes >= 0) { msgVec.push_back(move(part)); totalSize += nbytes; - } - else if (zmq_errno() == EAGAIN) - { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) - { - if (timeout > 0) - { + } else if (zmq_errno() == EAGAIN) { + if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout > 0) { elapsed += fRcvTimeout; - if (elapsed >= timeout) - { + if (elapsed >= timeout) { return -2; } } repeat = true; break; - } - else - { + } else { return -2; } - } - else - { + } else if (zmq_errno() == EINTR) { + LOG(debug) << "Receive interrupted by system call"; + return nbytes; + } else { return nbytes; } size_t moreSize = sizeof(more); zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &moreSize); - } - while (more); + } while (more); - if (repeat) - { + if (repeat) { continue; }