From 29f45fa77df445cd0eda32d353d39edf382212a7 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 21 Oct 2020 09:14:58 +0200 Subject: [PATCH] Rename TransferResult to TransferCode --- fairmq/FairMQChannel.h | 12 +++++------ fairmq/FairMQDevice.h | 8 +++---- fairmq/FairMQSocket.h | 3 ++- fairmq/ofi/Socket.cxx | 6 +++--- fairmq/shmem/Socket.h | 26 +++++++++++------------ fairmq/zeromq/Socket.h | 24 ++++++++++----------- test/helper/devices/TestTransferTimeout.h | 26 +++++++++++------------ test/transport/_transfer_timeout.cxx | 2 +- 8 files changed, 54 insertions(+), 53 deletions(-) diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 7a2a6f30..f4405b73 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -256,7 +256,7 @@ class FairMQChannel /// Sends a message to the socket queue. /// @param msg Constant reference of unique_ptr to a FairMQMessage /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) + /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) int64_t Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1) { CheckSendCompatibility(msg); @@ -266,7 +266,7 @@ class FairMQChannel /// Receives a message from the socket queue. /// @param msg Constant reference of unique_ptr to a FairMQMessage /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) + /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) int64_t Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1) { CheckReceiveCompatibility(msg); @@ -276,7 +276,7 @@ class FairMQChannel /// Send a vector of messages /// @param msgVec message vector reference /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) + /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) int64_t Send(std::vector& msgVec, int sndTimeoutInMs = -1) { CheckSendCompatibility(msgVec); @@ -286,7 +286,7 @@ class FairMQChannel /// Receive a vector of messages /// @param msgVec message vector reference /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) + /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) int64_t Receive(std::vector& msgVec, int rcvTimeoutInMs = -1) { CheckReceiveCompatibility(msgVec); @@ -296,7 +296,7 @@ class FairMQChannel /// Send FairMQParts /// @param parts FairMQParts reference /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) + /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1) { return Send(parts.fParts, sndTimeoutInMs); @@ -305,7 +305,7 @@ class FairMQChannel /// Receive FairMQParts /// @param parts FairMQParts reference /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) + /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1) { return Receive(parts.fParts, rcvTimeoutInMs); diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 0b1e1fcb..92ebb6fa 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -96,7 +96,7 @@ class FairMQDevice /// @param chan channel name /// @param i channel index /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) + /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) int64_t Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1) { return GetChannel(channel, index).Send(msg, sndTimeoutInMs); @@ -107,7 +107,7 @@ class FairMQDevice /// @param chan channel name /// @param i channel index /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) + /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) int64_t Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1) { return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs); @@ -118,7 +118,7 @@ class FairMQDevice /// @param chan channel name /// @param i channel index /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) + /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1) { return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs); @@ -129,7 +129,7 @@ class FairMQDevice /// @param chan channel name /// @param i channel index /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change) + /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1) { return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs); diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index a134a659..f95dbe6a 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -24,8 +24,9 @@ namespace fair namespace mq { -enum class TransferResult : int +enum class TransferCode : int { + success = 0, error = -1, timeout = -2, interrupted = -3 diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 7c661913..558fcea9 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -284,7 +284,7 @@ try { return size; } catch (const std::exception& e) { LOG(error) << e.what(); - return static_cast(TransferResult::error); + return static_cast(TransferCode::error); } auto Socket::SendQueueReader() -> void @@ -431,7 +431,7 @@ try { return size; } catch (const std::exception& e) { LOG(error) << e.what(); - return static_cast(TransferResult::error); + return static_cast(TransferCode::error); } auto Socket::Receive(std::vector& msgVec, const int /*timeout*/) -> int64_t @@ -456,7 +456,7 @@ try { return size; } catch (const std::exception& e) { LOG(error) << e.what(); - return static_cast(TransferResult::error); + return static_cast(TransferCode::error); } auto Socket::RecvControlQueueReader() -> void diff --git a/fairmq/shmem/Socket.h b/fairmq/shmem/Socket.h index 0bf93b31..c2ef962e 100644 --- a/fairmq/shmem/Socket.h +++ b/fairmq/shmem/Socket.h @@ -148,10 +148,10 @@ class Socket final : public fair::mq::Socket { if (zmq_errno() == ETERM) { LOG(debug) << "Terminating socket " << fId; - return static_cast(TransferResult::error); + return static_cast(TransferCode::error); } else { LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno); - return static_cast(TransferResult::error); + return static_cast(TransferCode::error); } } @@ -177,18 +177,18 @@ class Socket final : public fair::mq::Socket return size; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (fManager.Interrupted()) { - return static_cast(TransferResult::interrupted); + return static_cast(TransferCode::interrupted); } else if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { - return static_cast(TransferResult::timeout); + return static_cast(TransferCode::timeout); } } else { return HandleErrors(); } } - return static_cast(TransferResult::error); + return static_cast(TransferCode::error); } int64_t Receive(MessagePtr& msg, const int timeout = -1) override @@ -222,11 +222,11 @@ class Socket final : public fair::mq::Socket return size; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (fManager.Interrupted()) { - return static_cast(TransferResult::interrupted); + return static_cast(TransferCode::interrupted); } else if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { - return static_cast(TransferResult::timeout); + return static_cast(TransferCode::timeout); } } else { return HandleErrors(); @@ -273,18 +273,18 @@ class Socket final : public fair::mq::Socket return totalSize; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (fManager.Interrupted()) { - return static_cast(TransferResult::interrupted); + return static_cast(TransferCode::interrupted); } else if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { - return static_cast(TransferResult::timeout); + return static_cast(TransferCode::timeout); } } else { return HandleErrors(); } } - return static_cast(TransferResult::error); + return static_cast(TransferCode::error); } int64_t Receive(std::vector& msgVec, const int timeout = -1) override @@ -329,18 +329,18 @@ class Socket final : public fair::mq::Socket return totalSize; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (fManager.Interrupted()) { - return static_cast(TransferResult::interrupted); + return static_cast(TransferCode::interrupted); } else if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { - return static_cast(TransferResult::timeout); + return static_cast(TransferCode::timeout); } } else { return HandleErrors(); } } - return static_cast(TransferResult::error); + return static_cast(TransferCode::error); } void* GetSocket() const { return fSocket; } diff --git a/fairmq/zeromq/Socket.h b/fairmq/zeromq/Socket.h index 4f55d2ca..b1fd3f91 100644 --- a/fairmq/zeromq/Socket.h +++ b/fairmq/zeromq/Socket.h @@ -125,10 +125,10 @@ class Socket final : public fair::mq::Socket { if (zmq_errno() == ETERM) { LOG(debug) << "Terminating socket " << fId; - return static_cast(TransferResult::error); + return static_cast(TransferCode::error); } else { LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno); - return static_cast(TransferResult::error); + return static_cast(TransferCode::error); } } @@ -150,11 +150,11 @@ class Socket final : public fair::mq::Socket return nbytes; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (fCtx.Interrupted()) { - return static_cast(TransferResult::interrupted); + return static_cast(TransferCode::interrupted); } else if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { - return static_cast(TransferResult::timeout); + return static_cast(TransferCode::timeout); } } else { return HandleErrors(); @@ -178,11 +178,11 @@ class Socket final : public fair::mq::Socket return nbytes; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (fCtx.Interrupted()) { - return static_cast(TransferResult::interrupted); + return static_cast(TransferCode::interrupted); } else if (ShouldRetry(flags, timeout, elapsed)) { continue; } else { - return static_cast(TransferResult::timeout); + return static_cast(TransferCode::timeout); } } else { return HandleErrors(); @@ -215,12 +215,12 @@ class Socket final : public fair::mq::Socket totalSize += nbytes; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (fCtx.Interrupted()) { - return static_cast(TransferResult::interrupted); + return static_cast(TransferCode::interrupted); } else if (ShouldRetry(flags, timeout, elapsed)) { repeat = true; break; } else { - return static_cast(TransferResult::timeout); + return static_cast(TransferCode::timeout); } } else { return HandleErrors(); @@ -240,7 +240,7 @@ class Socket final : public fair::mq::Socket return Send(msgVec.back(), timeout); } else { // if the vector is empty, something might be wrong LOG(warn) << "Will not send empty vector"; - return static_cast(TransferResult::error); + return static_cast(TransferCode::error); } } @@ -254,7 +254,7 @@ class Socket final : public fair::mq::Socket while (true) { int64_t totalSize = 0; - int64_t more = 0; + int more = 0; bool repeat = false; do { @@ -266,12 +266,12 @@ class Socket final : public fair::mq::Socket totalSize += nbytes; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (fCtx.Interrupted()) { - return static_cast(TransferResult::interrupted); + return static_cast(TransferCode::interrupted); } else if (ShouldRetry(flags, timeout, elapsed)) { repeat = true; break; } else { - return static_cast(TransferResult::timeout); + return static_cast(TransferCode::timeout); } } else { return HandleErrors(); diff --git a/test/helper/devices/TestTransferTimeout.h b/test/helper/devices/TestTransferTimeout.h index 7243eb41..b681ffe1 100644 --- a/test/helper/devices/TestTransferTimeout.h +++ b/test/helper/devices/TestTransferTimeout.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2015static_cast(TransferResult::timeout017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH ) * + * Copyright (C) 2015-2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -45,28 +45,28 @@ class TransferTimeout : public FairMQDevice FairMQMessagePtr msg1(NewMessage()); FairMQMessagePtr msg2(NewMessage()); - if (Send(msg1, "data-out", 0, 200) == static_cast(TransferResult::timeout)) { + if (Send(msg1, "data-out", 0, 200) == static_cast(TransferCode::timeout)) { LOG(info) << "send msg canceled (200ms)"; sendMsgCancelingAfter200ms = true; } else { LOG(error) << "send msg did not cancel (200ms)"; } - if (Receive(msg2, "data-in", 0, 200) == static_cast(TransferResult::timeout)) { + if (Receive(msg2, "data-in", 0, 200) == static_cast(TransferCode::timeout)) { LOG(info) << "receive msg canceled (200ms)"; receiveMsgCancelingAfter200ms = true; } else { LOG(error) << "receive msg did not cancel (200ms)"; } - if (Send(msg1, "data-out", 0, 0) == static_cast(TransferResult::timeout)) { + if (Send(msg1, "data-out", 0, 0) == static_cast(TransferCode::timeout)) { LOG(info) << "send msg canceled (0ms)"; sendMsgCancelingAfter0ms = true; } else { LOG(error) << "send msg did not cancel (0ms)"; } - if (Receive(msg2, "data-in", 0, 0) == static_cast(TransferResult::timeout)) { + if (Receive(msg2, "data-in", 0, 0) == static_cast(TransferCode::timeout)) { LOG(info) << "receive msg canceled (0ms)"; receiveMsgCancelingAfter0ms = true; } else { @@ -77,28 +77,28 @@ class TransferTimeout : public FairMQDevice parts1.AddPart(NewMessage(10)); FairMQParts parts2; - if (Send(parts1, "data-out", 0, 200) == static_cast(TransferResult::timeout)) { + if (Send(parts1, "data-out", 0, 200) == static_cast(TransferCode::timeout)) { LOG(info) << "send 1 part canceled (200ms)"; send1PartCancelingAfter200ms = true; } else { LOG(error) << "send 1 part did not cancel (200ms)"; } - if (Receive(parts2, "data-in", 0, 200) == static_cast(TransferResult::timeout)) { + if (Receive(parts2, "data-in", 0, 200) == static_cast(TransferCode::timeout)) { LOG(info) << "receive 1 part canceled (200ms)"; receive1PartCancelingAfter200ms = true; } else { LOG(error) << "receive 1 part did not cancel (200ms)"; } - if (Send(parts1, "data-out", 0, 0) == static_cast(TransferResult::timeout)) { + if (Send(parts1, "data-out", 0, 0) == static_cast(TransferCode::timeout)) { LOG(info) << "send 1 part canceled (0ms)"; send1PartCancelingAfter0ms = true; } else { LOG(error) << "send 1 part did not cancel (0ms)"; } - if (Receive(parts2, "data-in", 0, 0) == static_cast(TransferResult::timeout)) { + if (Receive(parts2, "data-in", 0, 0) == static_cast(TransferCode::timeout)) { LOG(info) << "receive 1 part canceled (0ms)"; receive1PartCancelingAfter0ms = true; } else { @@ -110,28 +110,28 @@ class TransferTimeout : public FairMQDevice parts3.AddPart(NewMessage(10)); FairMQParts parts4; - if (Send(parts3, "data-out", 0, 200) == static_cast(TransferResult::timeout)) { + if (Send(parts3, "data-out", 0, 200) == static_cast(TransferCode::timeout)) { LOG(info) << "send 2 parts canceled (200ms)"; send2PartsCancelingAfter200ms = true; } else { LOG(error) << "send 2 parts did not cancel (200ms)"; } - if (Receive(parts4, "data-in", 0, 200) == static_cast(TransferResult::timeout)) { + if (Receive(parts4, "data-in", 0, 200) == static_cast(TransferCode::timeout)) { LOG(info) << "receive 2 parts canceled (200ms)"; receive2PartsCancelingAfter200ms = true; } else { LOG(error) << "receive 2 parts did not cancel (200ms)"; } - if (Send(parts3, "data-out", 0, 0) == static_cast(TransferResult::timeout)) { + if (Send(parts3, "data-out", 0, 0) == static_cast(TransferCode::timeout)) { LOG(info) << "send 2 parts canceled (0ms)"; send2PartsCancelingAfter0ms = true; } else { LOG(error) << "send 2 parts did not cancel (0ms)"; } - if (Receive(parts4, "data-in", 0, 0) == static_cast(TransferResult::timeout)) { + if (Receive(parts4, "data-in", 0, 0) == static_cast(TransferCode::timeout)) { LOG(info) << "receive 2 parts canceled (0ms)"; receive2PartsCancelingAfter0ms = true; } else { diff --git a/test/transport/_transfer_timeout.cxx b/test/transport/_transfer_timeout.cxx index c1c11e4c..4f1d4c96 100644 --- a/test/transport/_transfer_timeout.cxx +++ b/test/transport/_transfer_timeout.cxx @@ -63,7 +63,7 @@ void InterruptTransfer(const string& transport, const string& _address) auto result = pull.Receive(msg); t.join(); - ASSERT_EQ(result, static_cast(fair::mq::TransferResult::interrupted)); + ASSERT_EQ(result, static_cast(fair::mq::TransferCode::interrupted)); } TEST(TransferTimeout, zeromq)