From afadbb53e420856e9c548c5fcb0145e66f07f769 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 20 Nov 2020 10:13:58 +0100 Subject: [PATCH] zmq: correct accounting for msg size > 2GB --- fairmq/zeromq/Socket.h | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/fairmq/zeromq/Socket.h b/fairmq/zeromq/Socket.h index b1fd3f91..1447a70f 100644 --- a/fairmq/zeromq/Socket.h +++ b/fairmq/zeromq/Socket.h @@ -142,12 +142,14 @@ class Socket final : public fair::mq::Socket static_cast(msg.get())->ApplyUsedSize(); + int64_t actualBytes = zmq_msg_size(static_cast(msg.get())->GetMessage()); + while (true) { int nbytes = zmq_msg_send(static_cast(msg.get())->GetMessage(), fSocket, flags); if (nbytes >= 0) { - fBytesTx += nbytes; + fBytesTx += actualBytes; ++fMessagesTx; - return nbytes; + return actualBytes; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (fCtx.Interrupted()) { return static_cast(TransferCode::interrupted); @@ -173,9 +175,10 @@ class Socket final : public fair::mq::Socket while (true) { int nbytes = zmq_msg_recv(static_cast(msg.get())->GetMessage(), fSocket, flags); if (nbytes >= 0) { - fBytesRx += nbytes; + int64_t actualBytes = zmq_msg_size(static_cast(msg.get())->GetMessage()); + fBytesRx += actualBytes; ++fMessagesRx; - return nbytes; + return actualBytes; } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { if (fCtx.Interrupted()) { return static_cast(TransferCode::interrupted);