Rename TransferResult to TransferCode

This commit is contained in:
Alexey Rybalchenko
2020-10-21 09:14:58 +02:00
parent ea746b17d0
commit 29f45fa77d
8 changed files with 54 additions and 53 deletions

View File

@@ -256,7 +256,7 @@ class FairMQChannel
/// Sends a message to the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msg);
@@ -266,7 +266,7 @@ class FairMQChannel
/// Receives a message from the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msg);
@@ -276,7 +276,7 @@ class FairMQChannel
/// Send a vector of messages
/// @param msgVec message vector reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msgVec);
@@ -286,7 +286,7 @@ class FairMQChannel
/// Receive a vector of messages
/// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msgVec);
@@ -296,7 +296,7 @@ class FairMQChannel
/// Send FairMQParts
/// @param parts FairMQParts reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1)
{
return Send(parts.fParts, sndTimeoutInMs);
@@ -305,7 +305,7 @@ class FairMQChannel
/// Receive FairMQParts
/// @param parts FairMQParts reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1)
{
return Receive(parts.fParts, rcvTimeoutInMs);

View File

@@ -96,7 +96,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
@@ -107,7 +107,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
@@ -118,7 +118,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs);
@@ -129,7 +129,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs);

View File

@@ -24,8 +24,9 @@ namespace fair
namespace mq
{
enum class TransferResult : int
enum class TransferCode : int
{
success = 0,
error = -1,
timeout = -2,
interrupted = -3

View File

@@ -284,7 +284,7 @@ try {
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return static_cast<int64_t>(TransferResult::error);
return static_cast<int64_t>(TransferCode::error);
}
auto Socket::SendQueueReader() -> void
@@ -431,7 +431,7 @@ try {
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return static_cast<int>(TransferResult::error);
return static_cast<int>(TransferCode::error);
}
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int /*timeout*/) -> int64_t
@@ -456,7 +456,7 @@ try {
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return static_cast<int64_t>(TransferResult::error);
return static_cast<int64_t>(TransferCode::error);
}
auto Socket::RecvControlQueueReader() -> void

View File

@@ -148,10 +148,10 @@ class Socket final : public fair::mq::Socket
{
if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId;
return static_cast<int>(TransferResult::error);
return static_cast<int>(TransferCode::error);
} else {
LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno);
return static_cast<int>(TransferResult::error);
return static_cast<int>(TransferCode::error);
}
}
@@ -177,18 +177,18 @@ class Socket final : public fair::mq::Socket
return size;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
return static_cast<int>(TransferCode::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return static_cast<int>(TransferResult::timeout);
return static_cast<int>(TransferCode::timeout);
}
} else {
return HandleErrors();
}
}
return static_cast<int>(TransferResult::error);
return static_cast<int>(TransferCode::error);
}
int64_t Receive(MessagePtr& msg, const int timeout = -1) override
@@ -222,11 +222,11 @@ class Socket final : public fair::mq::Socket
return size;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
return static_cast<int>(TransferCode::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return static_cast<int>(TransferResult::timeout);
return static_cast<int>(TransferCode::timeout);
}
} else {
return HandleErrors();
@@ -273,18 +273,18 @@ class Socket final : public fair::mq::Socket
return totalSize;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
return static_cast<int>(TransferCode::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return static_cast<int>(TransferResult::timeout);
return static_cast<int>(TransferCode::timeout);
}
} else {
return HandleErrors();
}
}
return static_cast<int>(TransferResult::error);
return static_cast<int>(TransferCode::error);
}
int64_t Receive(std::vector<MessagePtr>& msgVec, const int timeout = -1) override
@@ -329,18 +329,18 @@ class Socket final : public fair::mq::Socket
return totalSize;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (fManager.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
return static_cast<int>(TransferCode::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return static_cast<int>(TransferResult::timeout);
return static_cast<int>(TransferCode::timeout);
}
} else {
return HandleErrors();
}
}
return static_cast<int>(TransferResult::error);
return static_cast<int>(TransferCode::error);
}
void* GetSocket() const { return fSocket; }

View File

@@ -125,10 +125,10 @@ class Socket final : public fair::mq::Socket
{
if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId;
return static_cast<int>(TransferResult::error);
return static_cast<int>(TransferCode::error);
} else {
LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno);
return static_cast<int>(TransferResult::error);
return static_cast<int>(TransferCode::error);
}
}
@@ -150,11 +150,11 @@ class Socket final : public fair::mq::Socket
return nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
return static_cast<int>(TransferCode::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return static_cast<int>(TransferResult::timeout);
return static_cast<int>(TransferCode::timeout);
}
} else {
return HandleErrors();
@@ -178,11 +178,11 @@ class Socket final : public fair::mq::Socket
return nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
return static_cast<int>(TransferCode::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
continue;
} else {
return static_cast<int>(TransferResult::timeout);
return static_cast<int>(TransferCode::timeout);
}
} else {
return HandleErrors();
@@ -215,12 +215,12 @@ class Socket final : public fair::mq::Socket
totalSize += nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
return static_cast<int>(TransferCode::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
repeat = true;
break;
} else {
return static_cast<int>(TransferResult::timeout);
return static_cast<int>(TransferCode::timeout);
}
} else {
return HandleErrors();
@@ -240,7 +240,7 @@ class Socket final : public fair::mq::Socket
return Send(msgVec.back(), timeout);
} else { // if the vector is empty, something might be wrong
LOG(warn) << "Will not send empty vector";
return static_cast<int>(TransferResult::error);
return static_cast<int>(TransferCode::error);
}
}
@@ -254,7 +254,7 @@ class Socket final : public fair::mq::Socket
while (true) {
int64_t totalSize = 0;
int64_t more = 0;
int more = 0;
bool repeat = false;
do {
@@ -266,12 +266,12 @@ class Socket final : public fair::mq::Socket
totalSize += nbytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (fCtx.Interrupted()) {
return static_cast<int>(TransferResult::interrupted);
return static_cast<int>(TransferCode::interrupted);
} else if (ShouldRetry(flags, timeout, elapsed)) {
repeat = true;
break;
} else {
return static_cast<int>(TransferResult::timeout);
return static_cast<int>(TransferCode::timeout);
}
} else {
return HandleErrors();