zmq: correct accounting for msg size > 2GB

This commit is contained in:
Alexey Rybalchenko 2020-11-20 10:13:58 +01:00
parent 749d28a3b5
commit afadbb53e4

View File

@ -142,12 +142,14 @@ class Socket final : public fair::mq::Socket
static_cast<Message*>(msg.get())->ApplyUsedSize(); static_cast<Message*>(msg.get())->ApplyUsedSize();
int64_t actualBytes = zmq_msg_size(static_cast<Message*>(msg.get())->GetMessage());
while (true) { while (true) {
int nbytes = zmq_msg_send(static_cast<Message*>(msg.get())->GetMessage(), fSocket, flags); int nbytes = zmq_msg_send(static_cast<Message*>(msg.get())->GetMessage(), fSocket, flags);
if (nbytes >= 0) { if (nbytes >= 0) {
fBytesTx += nbytes; fBytesTx += actualBytes;
++fMessagesTx; ++fMessagesTx;
return nbytes; return actualBytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (fCtx.Interrupted()) { if (fCtx.Interrupted()) {
return static_cast<int>(TransferCode::interrupted); return static_cast<int>(TransferCode::interrupted);
@ -173,9 +175,10 @@ class Socket final : public fair::mq::Socket
while (true) { while (true) {
int nbytes = zmq_msg_recv(static_cast<Message*>(msg.get())->GetMessage(), fSocket, flags); int nbytes = zmq_msg_recv(static_cast<Message*>(msg.get())->GetMessage(), fSocket, flags);
if (nbytes >= 0) { if (nbytes >= 0) {
fBytesRx += nbytes; int64_t actualBytes = zmq_msg_size(static_cast<Message*>(msg.get())->GetMessage());
fBytesRx += actualBytes;
++fMessagesRx; ++fMessagesRx;
return nbytes; return actualBytes;
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (fCtx.Interrupted()) { if (fCtx.Interrupted()) {
return static_cast<int>(TransferCode::interrupted); return static_cast<int>(TransferCode::interrupted);