Fix multipart transfer timeout and enable its tests

Alexey Rybalchenko 2018-08-27 17:36:29 +02:00 committed by Dennis Klein
parent 5d37ab2f01
commit b814e40c87
3 changed files with 134 additions and 56 deletions


@@ -246,157 +246,180 @@ int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i
     }
 }
 
-int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int /*timeout*/)
+int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
 {
     const unsigned int vecSize = msgVec.size();
     int64_t totalSize = 0;
+    int elapsed = 0;
 
     if (vecSize == 1) {
-        return Send(msgVec.back(), flags);
+        return SendImpl(msgVec.back(), flags, timeout);
     }
 
     // put it into zmq message
-    zmq_msg_t lZmqMsg;
-    zmq_msg_init_size(&lZmqMsg, vecSize * sizeof(MetaHeader));
+    zmq_msg_t zmqMsg;
+    zmq_msg_init_size(&zmqMsg, vecSize * sizeof(MetaHeader));
 
     // prepare the message with shm metas
-    MetaHeader *lMetas = static_cast<MetaHeader*>(zmq_msg_data(&lZmqMsg));
+    MetaHeader* metas = static_cast<MetaHeader*>(zmq_msg_data(&zmqMsg));
 
-    for (auto &lMsg : msgVec)
+    for (auto &msg : msgVec)
     {
-        zmq_msg_t *lMetaMsg = static_cast<FairMQMessageSHM*>(lMsg.get())->GetMessage();
-        memcpy(lMetas++, zmq_msg_data(lMetaMsg), sizeof(MetaHeader));
+        zmq_msg_t* metaMsg = static_cast<FairMQMessageSHM*>(msg.get())->GetMessage();
+        memcpy(metas++, zmq_msg_data(metaMsg), sizeof(MetaHeader));
     }
 
     while (!fInterrupted)
     {
         int nbytes = -1;
-        nbytes = zmq_msg_send(&lZmqMsg, fSocket, flags);
+        nbytes = zmq_msg_send(&zmqMsg, fSocket, flags);
 
         if (nbytes == 0)
         {
-            zmq_msg_close (&lZmqMsg);
+            zmq_msg_close(&zmqMsg);
             return nbytes;
         }
         else if (nbytes > 0)
         {
             assert(nbytes == (vecSize * sizeof(MetaHeader))); // all or nothing
 
-            for (auto &lMsg : msgVec)
+            for (auto &msg : msgVec)
             {
-                FairMQMessageSHM *lShmMsg = static_cast<FairMQMessageSHM*>(lMsg.get());
-                lShmMsg->fQueued = true;
-                totalSize += lShmMsg->fSize;
+                FairMQMessageSHM* shmMsg = static_cast<FairMQMessageSHM*>(msg.get());
+                shmMsg->fQueued = true;
+                totalSize += shmMsg->fSize;
             }
 
             // store statistics on how many messages have been sent
             fMessagesTx++;
             fBytesTx += totalSize;
 
-            zmq_msg_close (&lZmqMsg);
+            zmq_msg_close(&zmqMsg);
             return totalSize;
         }
         else if (zmq_errno() == EAGAIN)
         {
             if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
             {
+                if (timeout)
+                {
+                    elapsed += fSndTimeout;
+                    if (elapsed >= timeout)
+                    {
+                        zmq_msg_close(&zmqMsg);
+                        return -2;
+                    }
+                }
                 continue;
             }
             else
             {
-                zmq_msg_close (&lZmqMsg);
+                zmq_msg_close(&zmqMsg);
                 return -2;
             }
         }
         else if (zmq_errno() == ETERM)
         {
-            zmq_msg_close (&lZmqMsg);
+            zmq_msg_close(&zmqMsg);
             LOG(info) << "terminating socket " << fId;
             return -1;
         }
         else
         {
-            zmq_msg_close (&lZmqMsg);
+            zmq_msg_close(&zmqMsg);
             LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
             return nbytes;
         }
     }
 
+    zmq_msg_close(&zmqMsg);
     return -1;
 }
 
-int64_t FairMQSocketSHM::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int /*timeout*/)
+int64_t FairMQSocketSHM::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
 {
     int64_t totalSize = 0;
+    int elapsed = 0;
+
+    zmq_msg_t zmqMsg;
+    zmq_msg_init(&zmqMsg);
 
     while (!fInterrupted)
     {
-        zmq_msg_t lRcvMsg;
-        zmq_msg_init(&lRcvMsg);
-        int nbytes = zmq_msg_recv(&lRcvMsg, fSocket, flags);
+        int nbytes = zmq_msg_recv(&zmqMsg, fSocket, flags);
         if (nbytes == 0)
         {
-            zmq_msg_close (&lRcvMsg);
+            zmq_msg_close(&zmqMsg);
             return 0;
         }
         else if (nbytes > 0)
         {
-            MetaHeader* lHdrVec = static_cast<MetaHeader*>(zmq_msg_data(&lRcvMsg));
-            const auto lHdrVecSize = zmq_msg_size(&lRcvMsg);
+            MetaHeader* hdrVec = static_cast<MetaHeader*>(zmq_msg_data(&zmqMsg));
+            const auto hdrVecSize = zmq_msg_size(&zmqMsg);
 
-            assert(lHdrVecSize > 0);
-            assert(lHdrVecSize % sizeof(MetaHeader) == 0);
+            assert(hdrVecSize > 0);
+            assert(hdrVecSize % sizeof(MetaHeader) == 0);
 
-            const auto lNumMessages = lHdrVecSize / sizeof (MetaHeader);
-            msgVec.reserve(lNumMessages);
+            const auto numMessages = hdrVecSize / sizeof(MetaHeader);
+            msgVec.reserve(numMessages);
 
-            for (size_t m = 0; m < lNumMessages; m++)
+            for (size_t m = 0; m < numMessages; m++)
             {
-                MetaHeader lMetaHeader;
-                memcpy(&lMetaHeader, &lHdrVec[m], sizeof(MetaHeader));
+                MetaHeader metaHeader;
+                memcpy(&metaHeader, &hdrVec[m], sizeof(MetaHeader));
 
                 msgVec.emplace_back(fair::mq::tools::make_unique<FairMQMessageSHM>(fManager));
 
-                FairMQMessageSHM *lMsg = static_cast<FairMQMessageSHM*>(msgVec.back().get());
-                MetaHeader *lMsgHdr = static_cast<MetaHeader*>(zmq_msg_data(lMsg->GetMessage()));
-                memcpy(lMsgHdr, &lMetaHeader, sizeof(MetaHeader));
+                FairMQMessageSHM* msg = static_cast<FairMQMessageSHM*>(msgVec.back().get());
+                MetaHeader* msgHdr = static_cast<MetaHeader*>(zmq_msg_data(msg->GetMessage()));
+                memcpy(msgHdr, &metaHeader, sizeof(MetaHeader));
 
-                lMsg->fHandle = lMetaHeader.fHandle;
-                lMsg->fSize = lMetaHeader.fSize;
-                lMsg->fRegionId = lMetaHeader.fRegionId;
-                lMsg->fHint = lMetaHeader.fHint;
+                msg->fHandle = metaHeader.fHandle;
+                msg->fSize = metaHeader.fSize;
+                msg->fRegionId = metaHeader.fRegionId;
+                msg->fHint = metaHeader.fHint;
 
-                totalSize += lMsg->GetSize();
+                totalSize += msg->GetSize();
             }
 
             // store statistics on how many messages have been received (handle all parts as a single message)
             fMessagesRx++;
             fBytesRx += totalSize;
 
-            zmq_msg_close (&lRcvMsg);
+            zmq_msg_close(&zmqMsg);
             return totalSize;
         }
         else if (zmq_errno() == EAGAIN)
         {
-            zmq_msg_close(&lRcvMsg);
             if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
             {
+                if (timeout)
+                {
+                    elapsed += fRcvTimeout;
+                    if (elapsed >= timeout)
+                    {
+                        zmq_msg_close(&zmqMsg);
+                        return -2;
+                    }
+                }
                 continue;
             }
             else
             {
+                zmq_msg_close(&zmqMsg);
                 return -2;
             }
         }
         else
         {
-            zmq_msg_close (&lRcvMsg);
+            zmq_msg_close(&zmqMsg);
             LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno);
             return nbytes;
         }
     }
 
+    zmq_msg_close(&zmqMsg);
     return -1;
 }
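The timeout handling added to both SendImpl and ReceiveImpl follows one pattern: every blocked attempt consumes one socket-level timeout slice (fSndTimeout / fRcvTimeout), the slices are accumulated in elapsed, and once the caller-supplied timeout is reached the transfer gives up and returns -2. A minimal standalone sketch of that pattern follows; all names in it (TransferWithTimeout, tryOnce, attemptTimeoutMs) are hypothetical illustrations, not FairMQ or ZeroMQ API.

// Standalone sketch of the accumulated-timeout retry pattern used above.
#include <functional>
#include <iostream>

// tryOnce models one blocking attempt: it returns >= 0 on success or -1 on
// "would block" (the equivalent of zmq_msg_send()/zmq_msg_recv() failing with
// EAGAIN after the socket's own send/receive timeout expired).
int TransferWithTimeout(const std::function<int()>& tryOnce,
                        int attemptTimeoutMs, // per-attempt timeout (like fSndTimeout/fRcvTimeout)
                        int timeoutMs)        // caller-supplied total timeout; 0 means retry forever
{
    int elapsed = 0;
    while (true)
    {
        int rc = tryOnce();
        if (rc >= 0)
        {
            return rc; // transfer succeeded
        }

        if (timeoutMs)
        {
            elapsed += attemptTimeoutMs; // each failed attempt blocked for roughly one slice
            if (elapsed >= timeoutMs)
            {
                return -2; // same convention as above: -2 signals a timed-out / canceled transfer
            }
        }
        // timeoutMs == 0: keep retrying until an attempt succeeds
    }
}

int main()
{
    // Every attempt fails; per-attempt timeout 100 ms, total timeout 300 ms.
    int rc = TransferWithTimeout([] { return -1; }, 100, 300);
    std::cout << rc << '\n'; // prints -2 after three attempts
}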


@@ -283,7 +283,7 @@ int64_t FairMQSocketZMQ::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
     } // If there's only one part, send it as a regular message
     else if (vecSize == 1)
     {
-        return Send(msgVec.back(), flags);
+        return SendImpl(msgVec.back(), flags, timeout);
     }
     else // if the vector is empty, something might be wrong
     {
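With both backends now forwarding the caller-supplied timeout (including the single-part shortcut above), a device can bound a multipart transfer the same way the enabled test below does. A hedged sketch of the call pattern is shown here; it assumes it runs inside a FairMQDevice (e.g. in Run()) with a configured "data-out" channel, and simply mirrors the Send(parts, channel, index, timeout) usage from the test.

// Sketch only: multipart send that gives up after ~100 ms instead of blocking forever.
FairMQParts parts;
parts.AddPart(NewMessage(10)); // two dummy parts of 10 bytes each
parts.AddPart(NewMessage(10));

int64_t rc = Send(parts, "data-out", 0, 100); // sub-channel 0, timeout 100 ms
if (rc == -2)
{
    LOG(info) << "multipart send timed out"; // -2 is the timeout/cancel return value
}
else if (rc >= 0)
{
    LOG(info) << "sent " << rc << " bytes in " << parts.Size() << " parts";
}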


@@ -21,33 +21,88 @@ class TransferTimeout : public FairMQDevice
   protected:
     auto Run() -> void override
     {
-        auto sendCanceling = false;
-        auto receiveCanceling = false;
+        bool sendMsgCanceling = false;
+        bool receiveMsgCanceling = false;
 
-        auto msg1 = FairMQMessagePtr{NewMessage()};
-        auto msg2 = FairMQMessagePtr{NewMessage()};
+        FairMQMessagePtr msg1(NewMessage());
+        FairMQMessagePtr msg2(NewMessage());
 
         if (Send(msg1, "data-out", 0, 100) == -2)
         {
-            LOG(info) << "send canceled";
-            sendCanceling = true;
+            LOG(info) << "send msg canceled";
+            sendMsgCanceling = true;
         }
         else
         {
-            LOG(error) << "send did not cancel";
+            LOG(error) << "send msg did not cancel";
         }
 
         if (Receive(msg2, "data-in", 0, 100) == -2)
         {
-            LOG(info) << "receive canceled";
-            receiveCanceling = true;
+            LOG(info) << "receive msg canceled";
+            receiveMsgCanceling = true;
         }
         else
        {
-            LOG(error) << "receive did not cancel";
+            LOG(error) << "receive msg did not cancel";
         }
 
-        if (sendCanceling && receiveCanceling)
+        bool send1PartCanceling = false;
+        bool receive1PartCanceling = false;
+
+        FairMQParts parts1;
+        parts1.AddPart(NewMessage(10));
+        FairMQParts parts2;
+
+        if (Send(parts1, "data-out", 0, 100) == -2)
+        {
+            LOG(info) << "send 1 part canceled";
+            send1PartCanceling = true;
+        }
+        else
+        {
+            LOG(error) << "send 1 part did not cancel";
+        }
+
+        if (Receive(parts2, "data-in", 0, 100) == -2)
+        {
+            LOG(info) << "receive 1 part canceled";
+            receive1PartCanceling = true;
+        }
+        else
+        {
+            LOG(error) << "receive 1 part did not cancel";
+        }
+
+        bool send2PartsCanceling = false;
+        bool receive2PartsCanceling = false;
+
+        FairMQParts parts3;
+        parts3.AddPart(NewMessage(10));
+        parts3.AddPart(NewMessage(10));
+        FairMQParts parts4;
+
+        if (Send(parts3, "data-out", 0, 100) == -2)
+        {
+            LOG(info) << "send 2 parts canceled";
+            send2PartsCanceling = true;
+        }
+        else
+        {
+            LOG(error) << "send 2 parts did not cancel";
+        }
+
+        if (Receive(parts4, "data-in", 0, 100) == -2)
+        {
+            LOG(info) << "receive 2 parts canceled";
+            receive2PartsCanceling = true;
+        }
+        else
+        {
+            LOG(error) << "receive 2 parts did not cancel";
+        }
+
+        if (sendMsgCanceling && receiveMsgCanceling && send1PartCanceling && receive1PartCanceling && send2PartsCanceling && receive2PartsCanceling)
         {
             LOG(info) << "Transfer timeout test successfull";
         }