diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index cf1ca1cb..ba6ea31c 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -250,7 +250,7 @@ class FairMQChannel /// Sends a message to the socket queue. /// @param msg Constant reference of unique_ptr to a FairMQMessage /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. + /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1) { CheckSendCompatibility(msg); @@ -260,7 +260,7 @@ class FairMQChannel /// Receives a message from the socket queue. /// @param msg Constant reference of unique_ptr to a FairMQMessage /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. + /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1) { CheckReceiveCompatibility(msg); @@ -270,7 +270,7 @@ class FairMQChannel /// Send a vector of messages /// @param msgVec message vector reference /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. + /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) int64_t Send(std::vector& msgVec, int sndTimeoutInMs = -1) { CheckSendCompatibility(msgVec); @@ -280,7 +280,7 @@ class FairMQChannel /// Receive a vector of messages /// @param msgVec message vector reference /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. + /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) int64_t Receive(std::vector& msgVec, int rcvTimeoutInMs = -1) { CheckReceiveCompatibility(msgVec); @@ -290,7 +290,7 @@ class FairMQChannel /// Send FairMQParts /// @param parts FairMQParts reference /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. + /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1) { return Send(parts.fParts, sndTimeoutInMs); @@ -299,7 +299,7 @@ class FairMQChannel /// Receive FairMQParts /// @param parts FairMQParts reference /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. + /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1) { return Receive(parts.fParts, rcvTimeoutInMs); diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 29ee45ac..a6bd5ba8 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -129,7 +129,7 @@ class FairMQDevice /// @param chan channel name /// @param i channel index /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. + /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) int Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1) { return GetChannel(channel, index).Send(msg, sndTimeoutInMs); @@ -140,7 +140,7 @@ class FairMQDevice /// @param chan channel name /// @param i channel index /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. + /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) int Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1) { return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs); @@ -151,7 +151,7 @@ class FairMQDevice /// @param chan channel name /// @param i channel index /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. + /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1) { return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs); @@ -162,7 +162,7 @@ class FairMQDevice /// @param chan channel name /// @param i channel index /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. + /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1) { return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs); diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index f2fbedea..a8eaa6d3 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -9,14 +9,31 @@ #ifndef FAIRMQSOCKET_H_ #define FAIRMQSOCKET_H_ +#include "FairMQMessage.h" + #include +#include #include #include #include -#include "FairMQMessage.h" class FairMQTransportFactory; +namespace fair +{ +namespace mq +{ + +enum class TransferResult : int +{ + error = -1, + timeout = -2, + interrupted = -3 +}; + +} // namespace mq +} // namespace fair + class FairMQSocket { public: diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 9958a91e..c5ea18a1 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -272,7 +272,7 @@ try { int size(0); for (auto& msg : msgVec) { size += msg->GetSize(); - } + } fSendPushSem.wait(); { @@ -284,7 +284,7 @@ try { return size; } catch (const std::exception& e) { LOG(error) << e.what(); - return -1; + return TransferResult::error; } auto Socket::SendQueueReader() -> void @@ -431,7 +431,7 @@ try { return size; } catch (const std::exception& e) { LOG(error) << e.what(); - return -1; + return TransferResult::error; } auto Socket::Receive(std::vector& msgVec, const int /*timeout*/) -> int64_t @@ -449,14 +449,14 @@ try { int64_t size(0); for (auto& msg : msgVec) { size += msg->GetSize(); - } + } fBytesRx += size; ++fMessagesRx; - return size; + return size; } catch (const std::exception& e) { LOG(error) << e.what(); - return -1; + return TransferResult::error; } auto Socket::RecvControlQueueReader() -> void diff --git a/fairmq/shmem/Socket.h b/fairmq/shmem/Socket.h index c40585f5..22488dd0 100644 --- a/fairmq/shmem/Socket.h +++ b/fairmq/shmem/Socket.h @@ -130,7 +130,7 @@ class Socket final : public fair::mq::Socket bool ShouldRetry(int flags, int timeout, int& elapsed) const { - if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { + if ((flags & ZMQ_DONTWAIT) == 0) { if (timeout > 0) { elapsed += fTimeout; if (elapsed >= timeout) { @@ -147,10 +147,10 @@ class Socket final : public fair::mq::Socket { if (zmq_errno() == ETERM) { LOG(debug) << "Terminating socket " << fId; - return -1; + return static_cast(TransferResult::error); } else { LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno); - return -1; + return static_cast(TransferResult::error); } } @@ -166,7 +166,7 @@ class Socket final : public fair::mq::Socket ZMsg zmqMsg(sizeof(MetaHeader)); std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader)); - while (true && !fManager.Interrupted()) { + while (true) { int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags); if (nbytes > 0) { shmMsg->fQueued = true; @@ -175,17 +175,19 @@ class Socket final : public fair::mq::Socket fBytesTx += size; return size; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { - if (ShouldRetry(flags, timeout, elapsed)) { + if (fManager.Interrupted()) { + return static_cast(TransferResult::interrupted); + } else if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { - return -2; + return static_cast(TransferResult::timeout); } } else { return HandleErrors(); } } - return -1; + return static_cast(TransferResult::error); } int Receive(MessagePtr& msg, const int timeout = -1) override @@ -218,10 +220,12 @@ class Socket final : public fair::mq::Socket ++fMessagesRx; return size; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { - if (ShouldRetry(flags, timeout, elapsed)) { + if (fManager.Interrupted()) { + return static_cast(TransferResult::interrupted); + } else if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { - return -2; + return static_cast(TransferResult::timeout); } } else { return HandleErrors(); @@ -249,7 +253,7 @@ class Socket final : public fair::mq::Socket std::memcpy(metas++, &(shmMsg->fMeta), sizeof(MetaHeader)); } - while (!fManager.Interrupted()) { + while (true) { int64_t totalSize = 0; int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags); if (nbytes > 0) { @@ -267,17 +271,19 @@ class Socket final : public fair::mq::Socket return totalSize; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { - if (ShouldRetry(flags, timeout, elapsed)) { + if (fManager.Interrupted()) { + return static_cast(TransferResult::interrupted); + } else if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { - return -2; + return static_cast(TransferResult::timeout); } } else { return HandleErrors(); } } - return -1; + return static_cast(TransferResult::error); } int64_t Receive(std::vector& msgVec, const int timeout = -1) override @@ -290,7 +296,7 @@ class Socket final : public fair::mq::Socket ZMsg zmqMsg; - while (!fManager.Interrupted()) { + while (true) { int64_t totalSize = 0; int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags); if (nbytes > 0) { @@ -321,17 +327,19 @@ class Socket final : public fair::mq::Socket return totalSize; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { - if (ShouldRetry(flags, timeout, elapsed)) { + if (fManager.Interrupted()) { + return static_cast(TransferResult::interrupted); + } else if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { - return -2; + return static_cast(TransferResult::timeout); } } else { return HandleErrors(); } } - return -1; + return static_cast(TransferResult::error); } void* GetSocket() const { return fSocket; } @@ -498,7 +506,7 @@ class Socket final : public fair::mq::Socket if (constant == "pollout") return ZMQ_POLLOUT; - return -1; + throw SocketError(tools::ToString("GetConstant called with an invalid argument: ", constant)); } ~Socket() override { Close(); } diff --git a/fairmq/zeromq/Socket.h b/fairmq/zeromq/Socket.h index bbf7e578..0e0cea8c 100644 --- a/fairmq/zeromq/Socket.h +++ b/fairmq/zeromq/Socket.h @@ -108,7 +108,7 @@ class Socket final : public fair::mq::Socket bool ShouldRetry(int flags, int timeout, int& elapsed) const { - if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { + if ((flags & ZMQ_DONTWAIT) == 0) { if (timeout > 0) { elapsed += fTimeout; if (elapsed >= timeout) { @@ -125,10 +125,10 @@ class Socket final : public fair::mq::Socket { if (zmq_errno() == ETERM) { LOG(debug) << "Terminating socket " << fId; - return -1; + return static_cast(TransferResult::error); } else { LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno); - return -1; + return static_cast(TransferResult::error); } } @@ -149,10 +149,12 @@ class Socket final : public fair::mq::Socket ++fMessagesTx; return nbytes; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { - if (ShouldRetry(flags, timeout, elapsed)) { + if (fCtx.Interrupted()) { + return static_cast(TransferResult::interrupted); + } else if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { - return -2; + return static_cast(TransferResult::timeout); } } else { return HandleErrors(); @@ -175,10 +177,12 @@ class Socket final : public fair::mq::Socket ++fMessagesRx; return nbytes; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { - if (ShouldRetry(flags, timeout, elapsed)) { + if (fCtx.Interrupted()) { + return static_cast(TransferResult::interrupted); + } else if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { - return -2; + return static_cast(TransferResult::timeout); } } else { return HandleErrors(); @@ -210,11 +214,13 @@ class Socket final : public fair::mq::Socket if (nbytes >= 0) { totalSize += nbytes; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { - if (ShouldRetry(flags, timeout, elapsed)) { + if (fCtx.Interrupted()) { + return static_cast(TransferResult::interrupted); + } else if (ShouldRetry(flags, timeout, elapsed)) { repeat = true; break; } else { - return -2; + return static_cast(TransferResult::timeout); } } else { return HandleErrors(); @@ -234,7 +240,7 @@ class Socket final : public fair::mq::Socket return Send(msgVec.back(), timeout); } else { // if the vector is empty, something might be wrong LOG(warn) << "Will not send empty vector"; - return -1; + return static_cast(TransferResult::error); } } @@ -259,11 +265,13 @@ class Socket final : public fair::mq::Socket msgVec.push_back(move(part)); totalSize += nbytes; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { - if (ShouldRetry(flags, timeout, elapsed)) { + if (fCtx.Interrupted()) { + return static_cast(TransferResult::interrupted); + } else if (ShouldRetry(flags, timeout, elapsed)) { repeat = true; break; } else { - return -2; + return static_cast(TransferResult::timeout); } } else { return HandleErrors(); @@ -446,7 +454,7 @@ class Socket final : public fair::mq::Socket if (constant == "pollout") return ZMQ_POLLOUT; - return -1; + throw SocketError(tools::ToString("GetConstant called with an invalid argument: ", constant)); } ~Socket() override { Close(); } diff --git a/test/helper/devices/TestTransferTimeout.h b/test/helper/devices/TestTransferTimeout.h index dc204331..7243eb41 100644 --- a/test/helper/devices/TestTransferTimeout.h +++ b/test/helper/devices/TestTransferTimeout.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2015static_cast(TransferResult::timeout017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH ) * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -24,20 +24,20 @@ class TransferTimeout : public FairMQDevice protected: auto Run() -> void override { - bool sendMsgCancelingAfter100ms = false; - bool receiveMsgCancelingAfter100ms = false; + bool sendMsgCancelingAfter200ms = false; + bool receiveMsgCancelingAfter200ms = false; bool sendMsgCancelingAfter0ms = false; bool receiveMsgCancelingAfter0ms = false; - bool send1PartCancelingAfter100ms = false; - bool receive1PartCancelingAfter100ms = false; + bool send1PartCancelingAfter200ms = false; + bool receive1PartCancelingAfter200ms = false; bool send1PartCancelingAfter0ms = false; bool receive1PartCancelingAfter0ms = false; - bool send2PartsCancelingAfter100ms = false; - bool receive2PartsCancelingAfter100ms = false; + bool send2PartsCancelingAfter200ms = false; + bool receive2PartsCancelingAfter200ms = false; bool send2PartsCancelingAfter0ms = false; bool receive2PartsCancelingAfter0ms = false; @@ -45,28 +45,28 @@ class TransferTimeout : public FairMQDevice FairMQMessagePtr msg1(NewMessage()); FairMQMessagePtr msg2(NewMessage()); - if (Send(msg1, "data-out", 0, 100) == -2) { - LOG(info) << "send msg canceled (100ms)"; - sendMsgCancelingAfter100ms = true; + if (Send(msg1, "data-out", 0, 200) == static_cast(TransferResult::timeout)) { + LOG(info) << "send msg canceled (200ms)"; + sendMsgCancelingAfter200ms = true; } else { - LOG(error) << "send msg did not cancel (100ms)"; + LOG(error) << "send msg did not cancel (200ms)"; } - if (Receive(msg2, "data-in", 0, 100) == -2) { - LOG(info) << "receive msg canceled (100ms)"; - receiveMsgCancelingAfter100ms = true; + if (Receive(msg2, "data-in", 0, 200) == static_cast(TransferResult::timeout)) { + LOG(info) << "receive msg canceled (200ms)"; + receiveMsgCancelingAfter200ms = true; } else { - LOG(error) << "receive msg did not cancel (100ms)"; + LOG(error) << "receive msg did not cancel (200ms)"; } - if (Send(msg1, "data-out", 0, 0) == -2) { + if (Send(msg1, "data-out", 0, 0) == static_cast(TransferResult::timeout)) { LOG(info) << "send msg canceled (0ms)"; sendMsgCancelingAfter0ms = true; } else { LOG(error) << "send msg did not cancel (0ms)"; } - if (Receive(msg2, "data-in", 0, 0) == -2) { + if (Receive(msg2, "data-in", 0, 0) == static_cast(TransferResult::timeout)) { LOG(info) << "receive msg canceled (0ms)"; receiveMsgCancelingAfter0ms = true; } else { @@ -77,28 +77,28 @@ class TransferTimeout : public FairMQDevice parts1.AddPart(NewMessage(10)); FairMQParts parts2; - if (Send(parts1, "data-out", 0, 100) == -2) { - LOG(info) << "send 1 part canceled (100ms)"; - send1PartCancelingAfter100ms = true; + if (Send(parts1, "data-out", 0, 200) == static_cast(TransferResult::timeout)) { + LOG(info) << "send 1 part canceled (200ms)"; + send1PartCancelingAfter200ms = true; } else { - LOG(error) << "send 1 part did not cancel (100ms)"; + LOG(error) << "send 1 part did not cancel (200ms)"; } - if (Receive(parts2, "data-in", 0, 100) == -2) { - LOG(info) << "receive 1 part canceled (100ms)"; - receive1PartCancelingAfter100ms = true; + if (Receive(parts2, "data-in", 0, 200) == static_cast(TransferResult::timeout)) { + LOG(info) << "receive 1 part canceled (200ms)"; + receive1PartCancelingAfter200ms = true; } else { - LOG(error) << "receive 1 part did not cancel (100ms)"; + LOG(error) << "receive 1 part did not cancel (200ms)"; } - if (Send(parts1, "data-out", 0, 0) == -2) { + if (Send(parts1, "data-out", 0, 0) == static_cast(TransferResult::timeout)) { LOG(info) << "send 1 part canceled (0ms)"; send1PartCancelingAfter0ms = true; } else { LOG(error) << "send 1 part did not cancel (0ms)"; } - if (Receive(parts2, "data-in", 0, 0) == -2) { + if (Receive(parts2, "data-in", 0, 0) == static_cast(TransferResult::timeout)) { LOG(info) << "receive 1 part canceled (0ms)"; receive1PartCancelingAfter0ms = true; } else { @@ -110,44 +110,44 @@ class TransferTimeout : public FairMQDevice parts3.AddPart(NewMessage(10)); FairMQParts parts4; - if (Send(parts3, "data-out", 0, 100) == -2) { - LOG(info) << "send 2 parts canceled (100ms)"; - send2PartsCancelingAfter100ms = true; + if (Send(parts3, "data-out", 0, 200) == static_cast(TransferResult::timeout)) { + LOG(info) << "send 2 parts canceled (200ms)"; + send2PartsCancelingAfter200ms = true; } else { - LOG(error) << "send 2 parts did not cancel (100ms)"; + LOG(error) << "send 2 parts did not cancel (200ms)"; } - if (Receive(parts4, "data-in", 0, 100) == -2) { - LOG(info) << "receive 2 parts canceled (100ms)"; - receive2PartsCancelingAfter100ms = true; + if (Receive(parts4, "data-in", 0, 200) == static_cast(TransferResult::timeout)) { + LOG(info) << "receive 2 parts canceled (200ms)"; + receive2PartsCancelingAfter200ms = true; } else { - LOG(error) << "receive 2 parts did not cancel (100ms)"; + LOG(error) << "receive 2 parts did not cancel (200ms)"; } - if (Send(parts3, "data-out", 0, 0) == -2) { + if (Send(parts3, "data-out", 0, 0) == static_cast(TransferResult::timeout)) { LOG(info) << "send 2 parts canceled (0ms)"; send2PartsCancelingAfter0ms = true; } else { LOG(error) << "send 2 parts did not cancel (0ms)"; } - if (Receive(parts4, "data-in", 0, 0) == -2) { + if (Receive(parts4, "data-in", 0, 0) == static_cast(TransferResult::timeout)) { LOG(info) << "receive 2 parts canceled (0ms)"; receive2PartsCancelingAfter0ms = true; } else { LOG(error) << "receive 2 parts did not cancel (0ms)"; } - if (sendMsgCancelingAfter100ms && - receiveMsgCancelingAfter100ms && + if (sendMsgCancelingAfter200ms && + receiveMsgCancelingAfter200ms && sendMsgCancelingAfter0ms && receiveMsgCancelingAfter0ms && - send1PartCancelingAfter100ms && - receive1PartCancelingAfter100ms && + send1PartCancelingAfter200ms && + receive1PartCancelingAfter200ms && send1PartCancelingAfter0ms && receive1PartCancelingAfter0ms && - send2PartsCancelingAfter100ms && - receive2PartsCancelingAfter100ms && + send2PartsCancelingAfter200ms && + receive2PartsCancelingAfter200ms && send2PartsCancelingAfter0ms && receive2PartsCancelingAfter0ms) { diff --git a/test/transport/_transfer_timeout.cxx b/test/transport/_transfer_timeout.cxx index a63de9eb..c1c11e4c 100644 --- a/test/transport/_transfer_timeout.cxx +++ b/test/transport/_transfer_timeout.cxx @@ -7,9 +7,16 @@ ********************************************************************************/ #include "runner.h" +#include +#include +#include +#include #include #include + +#include #include // std::stringstream +#include namespace { @@ -18,6 +25,12 @@ using namespace std; using namespace fair::mq::test; using namespace fair::mq::tools; +void delayedInterruptor(FairMQTransportFactory& transport) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + transport.Interrupt(); +} + auto RunTransferTimeout(string transport) -> void { size_t session{fair::mq::tools::UuidHash()}; @@ -31,6 +44,28 @@ auto RunTransferTimeout(string transport) -> void exit(res.exit_code); } +void InterruptTransfer(const string& transport, const string& _address) +{ + size_t session{fair::mq::tools::UuidHash()}; + std::string address(fair::mq::tools::ToString(_address, "_", transport)); + + fair::mq::ProgOptions config; + config.SetProperty("session", to_string(session)); + + auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); + + FairMQChannel pull{"Pull", "pull", factory}; + pull.Bind(address); + + FairMQMessagePtr msg(pull.NewMessage()); + + auto t = thread(delayedInterruptor, ref(*factory)); + + auto result = pull.Receive(msg); + t.join(); + ASSERT_EQ(result, static_cast(fair::mq::TransferResult::interrupted)); +} + TEST(TransferTimeout, zeromq) { EXPECT_EXIT(RunTransferTimeout("zeromq"), ::testing::ExitedWithCode(0), "Transfer timeout test successfull"); @@ -41,4 +76,14 @@ TEST(TransferTimeout, shmem) EXPECT_EXIT(RunTransferTimeout("shmem"), ::testing::ExitedWithCode(0), "Transfer timeout test successfull"); } +TEST(InterruptTransfer, zeromq) +{ + InterruptTransfer("zeromq", "ipc://test_interrupt_transfer"); +} + +TEST(InterruptTransfer, shmem) +{ + InterruptTransfer("shmem", "ipc://test_interrupt_transfer"); +} + } // namespace