From e5aa85b61dfc07d5e3b3d374b79a2c35a4e42e95 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 5 Dec 2017 12:51:07 +0100 Subject: [PATCH] FairMQ: Add functionality to set used message size --- fairmq/FairMQMessage.h | 4 +- fairmq/nanomsg/FairMQMessageNN.cxx | 22 ++- fairmq/nanomsg/FairMQMessageNN.h | 26 ++-- fairmq/nanomsg/FairMQSocketNN.cxx | 8 +- fairmq/shmem/FairMQMessageSHM.cxx | 41 +++++- fairmq/shmem/FairMQMessageSHM.h | 6 +- fairmq/zeromq/FairMQMessageZMQ.cxx | 163 +++++++++++++++++----- fairmq/zeromq/FairMQMessageZMQ.h | 34 +++-- fairmq/zeromq/FairMQSocketZMQ.cxx | 40 +++--- fairmq/zeromq/FairMQTransportFactoryZMQ.h | 4 +- 10 files changed, 250 insertions(+), 98 deletions(-) diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index 885b3424..a7b7be9b 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -31,7 +31,9 @@ class FairMQMessage virtual void* GetMessage() = 0; virtual void* GetData() = 0; - virtual size_t GetSize() = 0; + virtual size_t GetSize() const = 0; + + virtual bool SetUsedSize(const size_t size) = 0; virtual void SetMessage(void* data, size_t size) = 0; diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index 983e4c5a..ef2effd9 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** @@ -141,11 +141,27 @@ void* FairMQMessageNN::GetData() return fMessage; } -size_t FairMQMessageNN::GetSize() +size_t FairMQMessageNN::GetSize() const { return fSize; } +bool FairMQMessageNN::SetUsedSize(const size_t size) +{ + if (size <= fSize) + { + // with size smaller than original nanomsg will simply "chop" the data, avoiding reallocation + fMessage = nn_reallocmsg(fMessage, size); + fSize = size; + return true; + } + else + { + LOG(ERROR) << "FairMQMessageNN::SetUsedSize: cannot set used size higher than original."; + return false; + } +} + void FairMQMessageNN::SetMessage(void* data, const size_t size) { fMessage = data; diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index b447fd3c..6e7c3684 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** @@ -33,21 +33,23 @@ class FairMQMessageNN : public FairMQMessage FairMQMessageNN(const FairMQMessageNN&) = delete; FairMQMessageNN operator=(const FairMQMessageNN&) = delete; - virtual void Rebuild(); - virtual void Rebuild(const size_t size); - virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); + void Rebuild() override; + void Rebuild(const size_t size) override; + void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; - virtual void* GetMessage(); - virtual void* GetData(); - virtual size_t GetSize(); + void* GetMessage() override; + void* GetData() override; + size_t GetSize() const override; - virtual void SetMessage(void* data, const size_t size); + bool SetUsedSize(const size_t size) override; - virtual FairMQ::Transport GetType() const; + void SetMessage(void* data, const size_t size) override; - virtual void Copy(const std::unique_ptr& msg); + FairMQ::Transport GetType() const override; - virtual ~FairMQMessageNN(); + void Copy(const std::unique_ptr& msg) override; + + ~FairMQMessageNN() override; friend class FairMQSocketNN; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index c62f0274..4403af7b 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -65,7 +65,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const str } if (type == "sub") { - nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0); + nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, nullptr, 0); } } @@ -185,7 +185,7 @@ int FairMQSocketNN::Receive(FairMQMessagePtr& msg, const int flags) while (true) { - void* ptr = NULL; + void* ptr = nullptr; nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags); if (nbytes >= 0) { @@ -310,7 +310,7 @@ int64_t FairMQSocketNN::Receive(vector>& msgVec, const while (true) { // pointer to point to received message buffer - char* ptr = NULL; + char* ptr = nullptr; // receive the message into a buffer allocated by nanomsg and let ptr point to it int nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags); if (nbytes >= 0) // if no errors or non-blocking timeouts @@ -396,7 +396,7 @@ void FairMQSocketNN::Resume() void* FairMQSocketNN::GetSocket() const { - return NULL; // dummy method to comply with the interface. functionality not possible in zeromq. + return nullptr; // dummy method to comply with the interface. functionality not possible in zeromq. } int FairMQSocketNN::GetSocket(int /*nothing*/) const diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx index f6221ae9..af3bc2b3 100644 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -91,7 +91,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& r , fRegionPtr(nullptr) , fHandle(-1) , fSize(size) - , fLocalPtr(data) + , fLocalPtr(static_cast(data)) { fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast(data) - reinterpret_cast(region->GetData())); @@ -117,7 +117,9 @@ bool FairMQMessageSHM::InitializeChunk(const size_t size) { try { - fLocalPtr = fManager.Segment().allocate(size); + bipc::managed_shared_memory::size_type actualSize = size; + char* hint = 0; // unused for bipc::allocate_new + fLocalPtr = fManager.Segment().allocation_command(bipc::allocate_new, size, actualSize, hint); } catch (bipc::bad_alloc& ba) { @@ -229,11 +231,44 @@ void* FairMQMessageSHM::GetData() } } -size_t FairMQMessageSHM::GetSize() +size_t FairMQMessageSHM::GetSize() const { return fSize; } +bool FairMQMessageSHM::SetUsedSize(const size_t size) +{ + if (size == fSize) + { + return true; + } + else if (size <= fSize) + { + try + { + + bipc::managed_shared_memory::size_type shrunkSize = size; + fLocalPtr = fManager.Segment().allocation_command(bipc::shrink_in_place, fSize + 128, shrunkSize, fLocalPtr); + fSize = size; + + // update meta header + MetaHeader* hdrPtr = static_cast(zmq_msg_data(&fMessage)); + hdrPtr->fSize = fSize; + return true; + } + catch (bipc::interprocess_exception& e) + { + LOG(INFO) << "FairMQMessageSHM::SetUsedSize could not set used size: " << e.what(); + return false; + } + } + else + { + LOG(ERROR) << "FairMQMessageSHM::SetUsedSize: cannot set used size higher than original."; + return false; + } +} + void FairMQMessageSHM::SetMessage(void*, const size_t) { // dummy method to comply with the interface. functionality not allowed in zeromq. diff --git a/fairmq/shmem/FairMQMessageSHM.h b/fairmq/shmem/FairMQMessageSHM.h index b742b001..4259ca10 100644 --- a/fairmq/shmem/FairMQMessageSHM.h +++ b/fairmq/shmem/FairMQMessageSHM.h @@ -41,7 +41,9 @@ class FairMQMessageSHM : public FairMQMessage void* GetMessage() override; void* GetData() override; - size_t GetSize() override; + size_t GetSize() const override; + + bool SetUsedSize(const size_t size) override; void SetMessage(void* data, const size_t size) override; @@ -64,7 +66,7 @@ class FairMQMessageSHM : public FairMQMessage fair::mq::shmem::Region* fRegionPtr; boost::interprocess::managed_shared_memory::handle_t fHandle; size_t fSize; - void* fLocalPtr; + char* fLocalPtr; }; #endif /* FAIRMQMESSAGESHM_H_ */ diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index eb9870c2..84a0c98f 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** @@ -12,11 +12,10 @@ * @author D. Klein, A. Rybalchenko, N. Winckler */ -#include -#include #include "FairMQMessageZMQ.h" #include "FairMQLogger.h" +#include #include "FairMQUnmanagedRegionZMQ.h" using namespace std; @@ -24,48 +23,60 @@ using namespace std; FairMQ::Transport FairMQMessageZMQ::fTransportType = FairMQ::Transport::ZMQ; FairMQMessageZMQ::FairMQMessageZMQ() - : fMessage() + : fUsedSizeModified(false) + , fUsedSize() + , fMsg(fair::mq::tools::make_unique()) + , fViewMsg(nullptr) { - if (zmq_msg_init(&fMessage) != 0) + if (zmq_msg_init(fMsg.get()) != 0) { LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno); } } FairMQMessageZMQ::FairMQMessageZMQ(const size_t size) - : fMessage() + : fUsedSizeModified(false) + , fUsedSize(size) + , fMsg(fair::mq::tools::make_unique()) + , fViewMsg(nullptr) { - if (zmq_msg_init_size(&fMessage, size) != 0) + if (zmq_msg_init_size(fMsg.get(), size) != 0) { LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno); } } FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) - : fMessage() + : fUsedSizeModified(false) + , fUsedSize() + , fMsg(fair::mq::tools::make_unique()) + , fViewMsg(nullptr) { - if (zmq_msg_init_data(&fMessage, data, size, ffn, hint) != 0) + if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) { LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); } } FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) - : fMessage() + : fUsedSizeModified(false) + , fUsedSize() + , fMsg(fair::mq::tools::make_unique()) + , fViewMsg(nullptr) { // FIXME: make this zero-copy: // simply taking over the provided buffer can casue premature delete, since region could be destroyed before the message is sent out. // Needs lifetime extension for the ZMQ region. - if (zmq_msg_init_size(&fMessage, size) != 0) + if (zmq_msg_init_size(fMsg.get(), size) != 0) { LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno); } - memcpy(zmq_msg_data(&fMessage), data, size); + memcpy(zmq_msg_data(fMsg.get()), data, size); // call region callback static_cast(region.get())->fCallback(data, size); - // if (zmq_msg_init_data(&fMessage, data, size, [](void*, void*){}, nullptr) != 0) + // if (zmq_msg_init_data(fMsg.get(), data, size, [](void*, void*){}, nullptr) != 0) // { // LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); // } @@ -74,7 +85,8 @@ FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, void FairMQMessageZMQ::Rebuild() { CloseMessage(); - if (zmq_msg_init(&fMessage) != 0) + fMsg = fair::mq::tools::make_unique(); + if (zmq_msg_init(fMsg.get()) != 0) { LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno); } @@ -83,7 +95,8 @@ void FairMQMessageZMQ::Rebuild() void FairMQMessageZMQ::Rebuild(const size_t size) { CloseMessage(); - if (zmq_msg_init_size(&fMessage, size) != 0) + fMsg = fair::mq::tools::make_unique(); + if (zmq_msg_init_size(fMsg.get(), size) != 0) { LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno); } @@ -92,7 +105,8 @@ void FairMQMessageZMQ::Rebuild(const size_t size) void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) { CloseMessage(); - if (zmq_msg_init_data(&fMessage, data, size, ffn, hint) != 0) + fMsg = fair::mq::tools::make_unique(); + if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) { LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); } @@ -100,17 +114,80 @@ void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn* ff void* FairMQMessageZMQ::GetMessage() { - return &fMessage; + if (!fViewMsg) + { + return fMsg.get(); + } + else + { + return fViewMsg.get(); + } } void* FairMQMessageZMQ::GetData() { - return zmq_msg_data(&fMessage); + if (!fViewMsg) + { + return zmq_msg_data(fMsg.get()); + } + else + { + return zmq_msg_data(fViewMsg.get()); + } } -size_t FairMQMessageZMQ::GetSize() +size_t FairMQMessageZMQ::GetSize() const { - return zmq_msg_size(&fMessage); + if (fUsedSizeModified) + { + return fUsedSize; + } + else + { + return zmq_msg_size(fMsg.get()); + } +} + +// To emulate shrinking, a new message is created with the new size (ViewMsg), that points to the original buffer with the new size. +// Once the "view message" is transfered, the original is destroyed. +// Used size is applied only once in ApplyUsedSize, which is called by the socket before sending. +// This function just updates the desired size until the actual "resizing" happens. +bool FairMQMessageZMQ::SetUsedSize(const size_t size) +{ + if (size <= zmq_msg_size(fMsg.get())) + { + fUsedSize = size; + fUsedSizeModified = true; + return true; + } + else + { + LOG(ERROR) << "FairMQMessageZMQ::SetUsedSize: cannot set used size higher than original."; + return false; + } +} + +void FairMQMessageZMQ::ApplyUsedSize() +{ + // Apply only once (before actual send). + // The check is needed because a send could fail and can be reattempted by the user, in this case we do not want to modify buffer again. + if (fUsedSizeModified && !fViewMsg) + { + fViewMsg = fair::mq::tools::make_unique(); + void* ptr = zmq_msg_data(fMsg.get()); + if (zmq_msg_init_data(fViewMsg.get(), + ptr, + fUsedSize, + [](void* /* data */, void* obj) + { + zmq_msg_close(static_cast(obj)); + delete static_cast(obj); + }, + fMsg.release()) != 0) + { + LOG(ERROR) << "failed initializing view message, reason: " << zmq_strerror(errno); + } + } } void FairMQMessageZMQ::SetMessage(void*, const size_t) @@ -123,34 +200,48 @@ FairMQ::Transport FairMQMessageZMQ::GetType() const return fTransportType; } -void FairMQMessageZMQ::Copy(const unique_ptr& msg) +void FairMQMessageZMQ::Copy(const FairMQMessagePtr& msg) { - // Shares the message buffer between msg and this fMessage. - if (zmq_msg_copy(&fMessage, static_cast(msg->GetMessage())) != 0) + // Shares the message buffer between msg and this fMsg. + if (zmq_msg_copy(fMsg.get(), static_cast(msg->GetMessage())) != 0) { LOG(ERROR) << "failed copying message, reason: " << zmq_strerror(errno); + return; } - // Alternatively, following code does a hard copy of the message, which allows to modify the original after making a copy, without affecting the new msg. - - // CloseMessage(); - // size_t size = msg->GetSize(); - // zmq_msg_init_size(&fMessage, size); - // memcpy(zmq_msg_data(&fMessage), msg->GetData(), size); + // if the target message has been resized, apply same to this message also + if (static_cast(msg.get())->fUsedSizeModified) + { + fUsedSizeModified = true; + fUsedSize = static_cast(msg.get())->fUsedSize; + } } void FairMQMessageZMQ::CloseMessage() { - if (zmq_msg_close(&fMessage) != 0) + if (!fViewMsg) { - LOG(ERROR) << "failed closing message, reason: " << zmq_strerror(errno); + if (zmq_msg_close(fMsg.get()) != 0) + { + LOG(ERROR) << "failed closing message, reason: " << zmq_strerror(errno); + } + // reset the message object to allow reuse in Rebuild + fMsg.reset(nullptr); } + else + { + if (zmq_msg_close(fViewMsg.get()) != 0) + { + LOG(ERROR) << "failed closing message, reason: " << zmq_strerror(errno); + } + // reset the message object to allow reuse in Rebuild + fViewMsg.reset(nullptr); + } + fUsedSizeModified = false; + fUsedSize = 0; } FairMQMessageZMQ::~FairMQMessageZMQ() { - if (zmq_msg_close(&fMessage) != 0) - { - LOG(ERROR) << "failed closing message with data, reason: " << zmq_strerror(errno); - } + CloseMessage(); } diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index 53c199f4..ef9011d0 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** @@ -17,6 +17,7 @@ #include #include +#include #include @@ -31,26 +32,33 @@ class FairMQMessageZMQ : public FairMQMessage FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size); - virtual void Rebuild(); - virtual void Rebuild(const size_t size); - virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); + void Rebuild() override; + void Rebuild(const size_t size) override; + void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; - virtual void* GetMessage(); - virtual void* GetData(); - virtual size_t GetSize(); + void* GetMessage() override; + void* GetData() override; + size_t GetSize() const override; - virtual void SetMessage(void* data, const size_t size); + bool SetUsedSize(const size_t size) override; + void ApplyUsedSize(); - virtual FairMQ::Transport GetType() const; + void SetMessage(void* data, const size_t size) override; - virtual void Copy(const std::unique_ptr& msg); + + FairMQ::Transport GetType() const override; + + void Copy(const std::unique_ptr& msg) override; void CloseMessage(); - virtual ~FairMQMessageZMQ(); + ~FairMQMessageZMQ() override; private: - zmq_msg_t fMessage; + bool fUsedSizeModified; + size_t fUsedSize; + std::unique_ptr fMsg; + std::unique_ptr fViewMsg; // view on a subset of fMsg (treating it as user buffer) static FairMQ::Transport fTransportType; }; diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 7d2f3bb1..213dc67b 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** @@ -12,21 +12,19 @@ * @author D. Klein, A. Rybalchenko */ -#include - -#include - #include "FairMQSocketZMQ.h" #include "FairMQMessageZMQ.h" #include "FairMQLogger.h" +#include + using namespace std; atomic FairMQSocketZMQ::fInterrupted(false); FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const string& id /*= ""*/, void* context) : FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) - , fSocket(NULL) + , fSocket(nullptr) , fId() , fBytesTx(0) , fBytesRx(0) @@ -38,7 +36,7 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const s assert(context); fSocket = zmq_socket(context, GetConstant(type)); - if (fSocket == NULL) + if (fSocket == nullptr) { LOG(ERROR) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); exit(EXIT_FAILURE); @@ -71,7 +69,7 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const s if (type == "sub") { - if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0) != 0) + if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0) { LOG(ERROR) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); } @@ -117,6 +115,8 @@ int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int flags) { int nbytes = -1; + static_cast(msg.get())->ApplyUsedSize(); + while (true) { nbytes = zmq_msg_send(static_cast(msg->GetMessage()), fSocket, flags); @@ -154,6 +154,7 @@ int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int flags) int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int flags) { int nbytes = -1; + while (true) { nbytes = zmq_msg_recv(static_cast(msg->GetMessage()), fSocket, flags); @@ -187,7 +188,7 @@ int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int flags) } } -int64_t FairMQSocketZMQ::Send(vector>& msgVec, const int flags) +int64_t FairMQSocketZMQ::Send(vector& msgVec, const int flags) { const unsigned int vecSize = msgVec.size(); @@ -206,6 +207,8 @@ int64_t FairMQSocketZMQ::Send(vector>& msgVec, const i for (unsigned int i = 0; i < vecSize; ++i) { + static_cast(msgVec[i].get())->ApplyUsedSize(); + nbytes = zmq_msg_send(static_cast(msgVec[i]->GetMessage()), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags); @@ -260,7 +263,7 @@ int64_t FairMQSocketZMQ::Send(vector>& msgVec, const i } } -int64_t FairMQSocketZMQ::Receive(vector>& msgVec, const int flags) +int64_t FairMQSocketZMQ::Receive(vector& msgVec, const int flags) { int64_t totalSize = 0; int64_t more = 0; @@ -268,13 +271,6 @@ int64_t FairMQSocketZMQ::Receive(vector>& msgVec, cons while (true) { - // Warn if the vector is filled before Receive() and empty it. - // if (msgVec.size() > 0) - // { - // LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!"; - // msgVec.clear(); - // } - totalSize = 0; more = 0; repeat = false; @@ -306,8 +302,8 @@ int64_t FairMQSocketZMQ::Receive(vector>& msgVec, cons return nbytes; } - size_t more_size = sizeof(more); - zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &more_size); + size_t moreSize = sizeof(more); + zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &moreSize); } while (more); @@ -327,7 +323,7 @@ void FairMQSocketZMQ::Close() { // LOG(DEBUG) << "Closing socket " << fId; - if (fSocket == NULL) + if (fSocket == nullptr) { return; } @@ -337,7 +333,7 @@ void FairMQSocketZMQ::Close() LOG(ERROR) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno); } - fSocket = NULL; + fSocket = nullptr; } void FairMQSocketZMQ::Interrupt() diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 2af49f00..46b19e93 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /**