From b814e40c87fd1df04f9c3601c66aa03a40a90c14 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 27 Aug 2018 17:36:29 +0200 Subject: [PATCH] Fix multipart transfer timeout and enable its tests --- fairmq/shmem/FairMQSocketSHM.cxx | 111 ++++++++++++-------- fairmq/zeromq/FairMQSocketZMQ.cxx | 2 +- test/helper/devices/TestTransferTimeout.cxx | 77 ++++++++++++-- 3 files changed, 134 insertions(+), 56 deletions(-) diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index e82818ce..2822352e 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -246,157 +246,180 @@ int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i } } -int64_t FairMQSocketSHM::SendImpl(vector& msgVec, const int flags, const int /*timeout*/) +int64_t FairMQSocketSHM::SendImpl(vector& msgVec, const int flags, const int timeout) { const unsigned int vecSize = msgVec.size(); int64_t totalSize = 0; + int elapsed = 0; if (vecSize == 1) { - return Send(msgVec.back(), flags); + return SendImpl(msgVec.back(), flags, timeout); } // put it into zmq message - zmq_msg_t lZmqMsg; - zmq_msg_init_size(&lZmqMsg, vecSize * sizeof(MetaHeader)); + zmq_msg_t zmqMsg; + zmq_msg_init_size(&zmqMsg, vecSize * sizeof(MetaHeader)); // prepare the message with shm metas - MetaHeader *lMetas = static_cast(zmq_msg_data(&lZmqMsg)); + MetaHeader* metas = static_cast(zmq_msg_data(&zmqMsg)); - for (auto &lMsg : msgVec) + for (auto &msg : msgVec) { - zmq_msg_t *lMetaMsg = static_cast(lMsg.get())->GetMessage(); - memcpy(lMetas++, zmq_msg_data(lMetaMsg), sizeof(MetaHeader)); + zmq_msg_t* metaMsg = static_cast(msg.get())->GetMessage(); + memcpy(metas++, zmq_msg_data(metaMsg), sizeof(MetaHeader)); } while (!fInterrupted) { int nbytes = -1; - nbytes = zmq_msg_send(&lZmqMsg, fSocket, flags); + nbytes = zmq_msg_send(&zmqMsg, fSocket, flags); if (nbytes == 0) { - zmq_msg_close (&lZmqMsg); + zmq_msg_close(&zmqMsg); return nbytes; } else if (nbytes > 0) { assert(nbytes == (vecSize * sizeof(MetaHeader))); // all or nothing - for (auto &lMsg : msgVec) + for (auto &msg : msgVec) { - FairMQMessageSHM *lShmMsg = static_cast(lMsg.get()); - lShmMsg->fQueued = true; - totalSize += lShmMsg->fSize; + FairMQMessageSHM* shmMsg = static_cast(msg.get()); + shmMsg->fQueued = true; + totalSize += shmMsg->fSize; } // store statistics on how many messages have been sent fMessagesTx++; fBytesTx += totalSize; - zmq_msg_close (&lZmqMsg); + zmq_msg_close(&zmqMsg); return totalSize; } else if (zmq_errno() == EAGAIN) { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout) + { + elapsed += fSndTimeout; + if (elapsed >= timeout) + { + zmq_msg_close(&zmqMsg); + return -2; + } + } continue; } else { - zmq_msg_close (&lZmqMsg); + zmq_msg_close(&zmqMsg); return -2; } } else if (zmq_errno() == ETERM) { - zmq_msg_close (&lZmqMsg); + zmq_msg_close(&zmqMsg); LOG(info) << "terminating socket " << fId; return -1; } else { - zmq_msg_close (&lZmqMsg); + zmq_msg_close(&zmqMsg); LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); return nbytes; } } + zmq_msg_close(&zmqMsg); return -1; } - -int64_t FairMQSocketSHM::ReceiveImpl(vector& msgVec, const int flags, const int /*timeout*/) +int64_t FairMQSocketSHM::ReceiveImpl(vector& msgVec, const int flags, const int timeout) { int64_t totalSize = 0; + int elapsed = 0; + + zmq_msg_t zmqMsg; + zmq_msg_init(&zmqMsg); while (!fInterrupted) { - zmq_msg_t lRcvMsg; - zmq_msg_init(&lRcvMsg); - int nbytes = zmq_msg_recv(&lRcvMsg, fSocket, flags); + int nbytes = zmq_msg_recv(&zmqMsg, fSocket, flags); if (nbytes == 0) { - zmq_msg_close (&lRcvMsg); + zmq_msg_close(&zmqMsg); return 0; } else if (nbytes > 0) { - MetaHeader* lHdrVec = static_cast(zmq_msg_data(&lRcvMsg)); - const auto lHdrVecSize = zmq_msg_size(&lRcvMsg); - assert(lHdrVecSize > 0); - assert(lHdrVecSize % sizeof(MetaHeader) == 0); + MetaHeader* hdrVec = static_cast(zmq_msg_data(&zmqMsg)); + const auto hdrVecSize = zmq_msg_size(&zmqMsg); + assert(hdrVecSize > 0); + assert(hdrVecSize % sizeof(MetaHeader) == 0); - const auto lNumMessages = lHdrVecSize / sizeof (MetaHeader); + const auto numMessages = hdrVecSize / sizeof(MetaHeader); - msgVec.reserve(lNumMessages); + msgVec.reserve(numMessages); - for (size_t m = 0; m < lNumMessages; m++) + for (size_t m = 0; m < numMessages; m++) { - MetaHeader lMetaHeader; - memcpy(&lMetaHeader, &lHdrVec[m], sizeof(MetaHeader)); + MetaHeader metaHeader; + memcpy(&metaHeader, &hdrVec[m], sizeof(MetaHeader)); msgVec.emplace_back(fair::mq::tools::make_unique(fManager)); - FairMQMessageSHM *lMsg = static_cast(msgVec.back().get()); - MetaHeader *lMsgHdr = static_cast(zmq_msg_data(lMsg->GetMessage())); + FairMQMessageSHM* msg = static_cast(msgVec.back().get()); + MetaHeader* msgHdr = static_cast(zmq_msg_data(msg->GetMessage())); - memcpy(lMsgHdr, &lMetaHeader, sizeof(MetaHeader)); + memcpy(msgHdr, &metaHeader, sizeof(MetaHeader)); - lMsg->fHandle = lMetaHeader.fHandle; - lMsg->fSize = lMetaHeader.fSize; - lMsg->fRegionId = lMetaHeader.fRegionId; - lMsg->fHint = lMetaHeader.fHint; + msg->fHandle = metaHeader.fHandle; + msg->fSize = metaHeader.fSize; + msg->fRegionId = metaHeader.fRegionId; + msg->fHint = metaHeader.fHint; - totalSize += lMsg->GetSize(); + totalSize += msg->GetSize(); } // store statistics on how many messages have been received (handle all parts as a single message) fMessagesRx++; fBytesRx += totalSize; - zmq_msg_close (&lRcvMsg); + zmq_msg_close(&zmqMsg); return totalSize; } else if (zmq_errno() == EAGAIN) { - zmq_msg_close(&lRcvMsg); if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout) + { + elapsed += fRcvTimeout; + if (elapsed >= timeout) + { + zmq_msg_close(&zmqMsg); + return -2; + } + } continue; } else { + zmq_msg_close(&zmqMsg); return -2; } } else { - zmq_msg_close (&lRcvMsg); + zmq_msg_close(&zmqMsg); + LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); return nbytes; } } + zmq_msg_close(&zmqMsg); return -1; } diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 1684d6f7..8b6ac96f 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -283,7 +283,7 @@ int64_t FairMQSocketZMQ::SendImpl(vector& msgVec, const int fl } // If there's only one part, send it as a regular message else if (vecSize == 1) { - return Send(msgVec.back(), flags); + return SendImpl(msgVec.back(), flags, timeout); } else // if the vector is empty, something might be wrong { diff --git a/test/helper/devices/TestTransferTimeout.cxx b/test/helper/devices/TestTransferTimeout.cxx index a644b6d5..d181b189 100644 --- a/test/helper/devices/TestTransferTimeout.cxx +++ b/test/helper/devices/TestTransferTimeout.cxx @@ -21,33 +21,88 @@ class TransferTimeout : public FairMQDevice protected: auto Run() -> void override { - auto sendCanceling = false; - auto receiveCanceling = false; + bool sendMsgCanceling = false; + bool receiveMsgCanceling = false; - auto msg1 = FairMQMessagePtr{NewMessage()}; - auto msg2 = FairMQMessagePtr{NewMessage()}; + FairMQMessagePtr msg1(NewMessage()); + FairMQMessagePtr msg2(NewMessage()); if (Send(msg1, "data-out", 0, 100) == -2) { - LOG(info) << "send canceled"; - sendCanceling = true; + LOG(info) << "send msg canceled"; + sendMsgCanceling = true; } else { - LOG(error) << "send did not cancel"; + LOG(error) << "send msg did not cancel"; } if (Receive(msg2, "data-in", 0, 100) == -2) { - LOG(info) << "receive canceled"; - receiveCanceling = true; + LOG(info) << "receive msg canceled"; + receiveMsgCanceling = true; } else { - LOG(error) << "receive did not cancel"; + LOG(error) << "receive msg did not cancel"; } - if (sendCanceling && receiveCanceling) + bool send1PartCanceling = false; + bool receive1PartCanceling = false; + + FairMQParts parts1; + parts1.AddPart(NewMessage(10)); + FairMQParts parts2; + + if (Send(parts1, "data-out", 0, 100) == -2) + { + LOG(info) << "send 1 part canceled"; + send1PartCanceling = true; + } + else + { + LOG(error) << "send 1 part did not cancel"; + } + + if (Receive(parts2, "data-in", 0, 100) == -2) + { + LOG(info) << "receive 1 part canceled"; + receive1PartCanceling = true; + } + else + { + LOG(error) << "receive 1 part did not cancel"; + } + + bool send2PartsCanceling = false; + bool receive2PartsCanceling = false; + + FairMQParts parts3; + parts3.AddPart(NewMessage(10)); + parts3.AddPart(NewMessage(10)); + FairMQParts parts4; + + if (Send(parts3, "data-out", 0, 100) == -2) + { + LOG(info) << "send 2 parts canceled"; + send2PartsCanceling = true; + } + else + { + LOG(error) << "send 2 parts did not cancel"; + } + + if (Receive(parts4, "data-in", 0, 100) == -2) + { + LOG(info) << "receive 2 parts canceled"; + receive2PartsCanceling = true; + } + else + { + LOG(error) << "receive 2 parts did not cancel"; + } + + if (sendMsgCanceling && receiveMsgCanceling && send1PartCanceling && receive1PartCanceling && send2PartsCanceling && receive2PartsCanceling) { LOG(info) << "Transfer timeout test successfull"; }