Do not report interruption by system call as error

This commit is contained in:
Alexey Rybalchenko 2020-03-02 12:05:09 +01:00 committed by Dennis Klein
parent f00519b99b
commit a545bee3b1
2 changed files with 79 additions and 115 deletions

View File

@ -156,7 +156,10 @@ int Socket::Send(MessagePtr& msg, const int timeout)
} else if (zmq_errno() == ETERM) { } else if (zmq_errno() == ETERM) {
LOG(info) << "terminating socket " << fId; LOG(info) << "terminating socket " << fId;
return -1; return -1;
} else { } else if (zmq_errno() == EINTR) {
LOG(debug) << "Send interrupted by system call";
return nbytes;
}else {
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
return nbytes; return nbytes;
} }
@ -204,8 +207,11 @@ int Socket::Receive(MessagePtr& msg, const int timeout)
} else if (zmq_errno() == ETERM) { } else if (zmq_errno() == ETERM) {
LOG(info) << "terminating socket " << fId; LOG(info) << "terminating socket " << fId;
return -1; return -1;
} else { } else if (zmq_errno() == EINTR) {
LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; LOG(debug) << "Receive interrupted by system call";
return nbytes;
}else {
LOG(error) << "Failed receiving on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
return nbytes; return nbytes;
} }
} }
@ -263,7 +269,10 @@ int64_t Socket::Send(vector<MessagePtr>& msgVec, const int timeout)
} else if (zmq_errno() == ETERM) { } else if (zmq_errno() == ETERM) {
LOG(info) << "terminating socket " << fId; LOG(info) << "terminating socket " << fId;
return -1; return -1;
} else { } else if (zmq_errno() == EINTR) {
LOG(debug) << "Send interrupted by system call";
return nbytes;
}else {
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
return nbytes; return nbytes;
} }
@ -319,8 +328,11 @@ int64_t Socket::Receive(vector<MessagePtr>& msgVec, const int timeout)
} else { } else {
return -2; return -2;
} }
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Receive interrupted by system call";
return nbytes;
} else { } else {
LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; LOG(error) << "Failed receiving on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
return nbytes; return nbytes;
} }
} }

View File

@ -107,50 +107,39 @@ bool FairMQSocketZMQ::Connect(const string& address)
int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int timeout) int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int timeout)
{ {
int flags = 0; int flags = 0;
if (timeout == 0) if (timeout == 0) {
{
flags = ZMQ_DONTWAIT; flags = ZMQ_DONTWAIT;
} }
int elapsed = 0; int elapsed = 0;
static_cast<FairMQMessageZMQ*>(msg.get())->ApplyUsedSize(); static_cast<FairMQMessageZMQ*>(msg.get())->ApplyUsedSize();
while (true) while (true) {
{
int nbytes = zmq_msg_send(static_cast<FairMQMessageZMQ*>(msg.get())->GetMessage(), fSocket, flags); int nbytes = zmq_msg_send(static_cast<FairMQMessageZMQ*>(msg.get())->GetMessage(), fSocket, flags);
if (nbytes >= 0) if (nbytes >= 0) {
{
fBytesTx += nbytes; fBytesTx += nbytes;
++fMessagesTx; ++fMessagesTx;
return nbytes; return nbytes;
} } else if (zmq_errno() == EAGAIN) {
else if (zmq_errno() == EAGAIN) if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
{ if (timeout > 0) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
{
if (timeout > 0)
{
elapsed += fSndTimeout; elapsed += fSndTimeout;
if (elapsed >= timeout) if (elapsed >= timeout) {
{
return -2; return -2;
} }
} }
continue; continue;
} } else {
else
{
return -2; return -2;
} }
} } else if (zmq_errno() == ETERM) {
else if (zmq_errno() == ETERM)
{
LOG(info) << "terminating socket " << fId; LOG(info) << "terminating socket " << fId;
return -1; return -1;
} } else if (zmq_errno() == EINTR) {
else LOG(debug) << "Send interrupted by system call";
{ return nbytes;
} else {
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes; return nbytes;
} }
@ -160,47 +149,36 @@ int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int timeout)
int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int timeout) int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int timeout)
{ {
int flags = 0; int flags = 0;
if (timeout == 0) if (timeout == 0) {
{
flags = ZMQ_DONTWAIT; flags = ZMQ_DONTWAIT;
} }
int elapsed = 0; int elapsed = 0;
while (true) while (true) {
{
int nbytes = zmq_msg_recv(static_cast<FairMQMessageZMQ*>(msg.get())->GetMessage(), fSocket, flags); int nbytes = zmq_msg_recv(static_cast<FairMQMessageZMQ*>(msg.get())->GetMessage(), fSocket, flags);
if (nbytes >= 0) if (nbytes >= 0) {
{
fBytesRx += nbytes; fBytesRx += nbytes;
++fMessagesRx; ++fMessagesRx;
return nbytes; return nbytes;
} } else if (zmq_errno() == EAGAIN) {
else if (zmq_errno() == EAGAIN) if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
{ if (timeout > 0) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
{
if (timeout > 0)
{
elapsed += fRcvTimeout; elapsed += fRcvTimeout;
if (elapsed >= timeout) if (elapsed >= timeout) {
{
return -2; return -2;
} }
} }
continue; continue;
} } else {
else
{
return -2; return -2;
} }
} } else if (zmq_errno() == ETERM) {
else if (zmq_errno() == ETERM)
{
LOG(info) << "terminating socket " << fId; LOG(info) << "terminating socket " << fId;
return -1; return -1;
} } else if (zmq_errno() == EINTR) {
else LOG(debug) << "Receive interrupted by system call";
{ return nbytes;
} else {
LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes; return nbytes;
} }
@ -210,69 +188,58 @@ int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int timeout)
int64_t FairMQSocketZMQ::Send(vector<FairMQMessagePtr>& msgVec, const int timeout) int64_t FairMQSocketZMQ::Send(vector<FairMQMessagePtr>& msgVec, const int timeout)
{ {
int flags = 0; int flags = 0;
if (timeout == 0) if (timeout == 0) {
{
flags = ZMQ_DONTWAIT; flags = ZMQ_DONTWAIT;
} }
const unsigned int vecSize = msgVec.size(); const unsigned int vecSize = msgVec.size();
// Sending vector typicaly handles more then one part // Sending vector typicaly handles more then one part
if (vecSize > 1) if (vecSize > 1) {
{
int elapsed = 0; int elapsed = 0;
while (true) while (true) {
{
int64_t totalSize = 0; int64_t totalSize = 0;
bool repeat = false; bool repeat = false;
for (unsigned int i = 0; i < vecSize; ++i) for (unsigned int i = 0; i < vecSize; ++i) {
{
static_cast<FairMQMessageZMQ*>(msgVec[i].get())->ApplyUsedSize(); static_cast<FairMQMessageZMQ*>(msgVec[i].get())->ApplyUsedSize();
int nbytes = zmq_msg_send(static_cast<FairMQMessageZMQ*>(msgVec[i].get())->GetMessage(), int nbytes = zmq_msg_send(static_cast<FairMQMessageZMQ*>(msgVec[i].get())->GetMessage(),
fSocket, fSocket,
(i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags); (i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags);
if (nbytes >= 0) if (nbytes >= 0) {
{
totalSize += nbytes; totalSize += nbytes;
} } else {
else
{
// according to ZMQ docs, this can only occur for the first part // according to ZMQ docs, this can only occur for the first part
if (zmq_errno() == EAGAIN) if (zmq_errno() == EAGAIN) {
{ if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) if (timeout > 0) {
{
if (timeout > 0)
{
elapsed += fSndTimeout; elapsed += fSndTimeout;
if (elapsed >= timeout) if (elapsed >= timeout) {
{
return -2; return -2;
} }
} }
repeat = true; repeat = true;
break; break;
} } else {
else
{
return -2; return -2;
} }
} }
if (zmq_errno() == ETERM) if (zmq_errno() == ETERM) {
{
LOG(info) << "terminating socket " << fId; LOG(info) << "terminating socket " << fId;
return -1; return -1;
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Receive interrupted by system call";
return nbytes;
} else {
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
} }
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
} }
} }
if (repeat) if (repeat) {
{
continue; continue;
} }
@ -282,12 +249,9 @@ int64_t FairMQSocketZMQ::Send(vector<FairMQMessagePtr>& msgVec, const int timeou
return totalSize; return totalSize;
} }
} // If there's only one part, send it as a regular message } // If there's only one part, send it as a regular message
else if (vecSize == 1) else if (vecSize == 1) {
{
return Send(msgVec.back(), timeout); return Send(msgVec.back(), timeout);
} } else { // if the vector is empty, something might be wrong
else // if the vector is empty, something might be wrong
{
LOG(warn) << "Will not send empty vector"; LOG(warn) << "Will not send empty vector";
return -1; return -1;
} }
@ -296,60 +260,48 @@ int64_t FairMQSocketZMQ::Send(vector<FairMQMessagePtr>& msgVec, const int timeou
int64_t FairMQSocketZMQ::Receive(vector<FairMQMessagePtr>& msgVec, const int timeout) int64_t FairMQSocketZMQ::Receive(vector<FairMQMessagePtr>& msgVec, const int timeout)
{ {
int flags = 0; int flags = 0;
if (timeout == 0) if (timeout == 0) {
{
flags = ZMQ_DONTWAIT; flags = ZMQ_DONTWAIT;
} }
int elapsed = 0; int elapsed = 0;
while (true) while (true) {
{
int64_t totalSize = 0; int64_t totalSize = 0;
int64_t more = 0; int64_t more = 0;
bool repeat = false; bool repeat = false;
do do {
{
unique_ptr<FairMQMessage> part(new FairMQMessageZMQ(GetTransport())); unique_ptr<FairMQMessage> part(new FairMQMessageZMQ(GetTransport()));
int nbytes = zmq_msg_recv(static_cast<FairMQMessageZMQ*>(part.get())->GetMessage(), fSocket, flags); int nbytes = zmq_msg_recv(static_cast<FairMQMessageZMQ*>(part.get())->GetMessage(), fSocket, flags);
if (nbytes >= 0) if (nbytes >= 0) {
{
msgVec.push_back(move(part)); msgVec.push_back(move(part));
totalSize += nbytes; totalSize += nbytes;
} } else if (zmq_errno() == EAGAIN) {
else if (zmq_errno() == EAGAIN) if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
{ if (timeout > 0) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
{
if (timeout > 0)
{
elapsed += fRcvTimeout; elapsed += fRcvTimeout;
if (elapsed >= timeout) if (elapsed >= timeout) {
{
return -2; return -2;
} }
} }
repeat = true; repeat = true;
break; break;
} } else {
else
{
return -2; return -2;
} }
} } else if (zmq_errno() == EINTR) {
else LOG(debug) << "Receive interrupted by system call";
{ return nbytes;
} else {
return nbytes; return nbytes;
} }
size_t moreSize = sizeof(more); size_t moreSize = sizeof(more);
zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &moreSize); zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &moreSize);
} } while (more);
while (more);
if (repeat) if (repeat) {
{
continue; continue;
} }