diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index 1341c0f9..a559720b 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -29,14 +29,15 @@ class FairMQMessage virtual void Rebuild(const size_t size) = 0; virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0; - virtual void* GetData() = 0; + virtual void* GetData() const = 0; virtual size_t GetSize() const = 0; virtual bool SetUsedSize(const size_t size) = 0; virtual FairMQ::Transport GetType() const = 0; - virtual void Copy(const std::unique_ptr& msg) = 0; + virtual void Copy(const std::unique_ptr& msg) __attribute__((deprecated("Use 'Copy(const FairMQMessage& msg)'"))) = 0; + virtual void Copy(const FairMQMessage& msg) = 0; virtual ~FairMQMessage() {}; }; diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index 729d4df2..14e80055 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -67,7 +67,7 @@ void FairMQBenchmarkSampler::Run() if (fSameMessage) { FairMQMessagePtr msg(dataOutChannel.Transport()->CreateMessage()); - msg->Copy(baseMsg); + msg->Copy(*baseMsg); if (dataOutChannel.Send(msg) >= 0) { diff --git a/fairmq/devices/FairMQMultiplier.cxx b/fairmq/devices/FairMQMultiplier.cxx index 8311e7ab..c40fa289 100644 --- a/fairmq/devices/FairMQMultiplier.cxx +++ b/fairmq/devices/FairMQMultiplier.cxx @@ -49,7 +49,7 @@ bool FairMQMultiplier::HandleSingleData(std::unique_ptr& payload, for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) // all subChannels in a channel { FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); - msgCopy->Copy(payload); + msgCopy->Copy(*payload); Send(msgCopy, fOutChannelNames.at(i), j); } @@ -60,7 +60,7 @@ bool FairMQMultiplier::HandleSingleData(std::unique_ptr& payload, for (unsigned int i = 0; i < lastChannelSize - 1; ++i) // iterate over all except last subChannels of the last channel { FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); - msgCopy->Copy(payload); + msgCopy->Copy(*payload); Send(msgCopy, fOutChannelNames.back(), i); } @@ -81,7 +81,7 @@ bool FairMQMultiplier::HandleMultipartData(FairMQParts& payload, int /*index*/) for (int k = 0; k < payload.Size(); ++k) { FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); - msgCopy->Copy(payload.At(k)); + msgCopy->Copy(payload.AtRef(k)); parts.AddPart(std::move(msgCopy)); } @@ -98,7 +98,7 @@ bool FairMQMultiplier::HandleMultipartData(FairMQParts& payload, int /*index*/) for (int k = 0; k < payload.Size(); ++k) { FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); - msgCopy->Copy(payload.At(k)); + msgCopy->Copy(payload.AtRef(k)); parts.AddPart(std::move(msgCopy)); } diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index 77246ae4..c95e1ff3 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -131,12 +131,12 @@ void FairMQMessageNN::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn } } -void* FairMQMessageNN::GetMessage() +void* FairMQMessageNN::GetMessage() const { return fMessage; } -void* FairMQMessageNN::GetData() +void* FairMQMessageNN::GetData() const { return fMessage; } @@ -173,6 +173,30 @@ FairMQ::Transport FairMQMessageNN::GetType() const return fTransportType; } +void FairMQMessageNN::Copy(const FairMQMessage& msg) +{ + if (fMessage) + { + if (nn_freemsg(fMessage) < 0) + { + LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno); + } + } + + size_t size = msg.GetSize(); + + fMessage = nn_allocmsg(size, 0); + if (!fMessage) + { + LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno); + } + else + { + memcpy(fMessage, static_cast(msg).GetMessage(), size); + fSize = size; + } +} + void FairMQMessageNN::Copy(const FairMQMessagePtr& msg) { if (fMessage) diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index 97737ae1..ca4e7042 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -41,13 +41,14 @@ class FairMQMessageNN : public FairMQMessage void Rebuild(const size_t size) override; void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; - void* GetData() override; + void* GetData() const override; size_t GetSize() const override; bool SetUsedSize(const size_t size) override; FairMQ::Transport GetType() const override; + void Copy(const FairMQMessage& msg) override; void Copy(const FairMQMessagePtr& msg) override; ~FairMQMessageNN() override; @@ -59,7 +60,7 @@ class FairMQMessageNN : public FairMQMessage FairMQUnmanagedRegion* fRegionPtr; static FairMQ::Transport fTransportType; - void* GetMessage(); + void* GetMessage() const; void CloseMessage(); void SetMessage(void* data, const size_t size); }; diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx index c735588c..50d73ee5 100644 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -202,7 +202,7 @@ zmq_msg_t* FairMQMessageSHM::GetMessage() return &fMessage; } -void* FairMQMessageSHM::GetData() +void* FairMQMessageSHM::GetData() const { if (fLocalPtr) { @@ -274,7 +274,30 @@ FairMQ::Transport FairMQMessageSHM::GetType() const return fTransportType; } -void FairMQMessageSHM::Copy(const unique_ptr& msg) +void FairMQMessageSHM::Copy(const FairMQMessage& msg) +{ + if (fHandle < 0) + { + bipc::managed_shared_memory::handle_t otherHandle = static_cast(msg).fHandle; + if (otherHandle) + { + if (InitializeChunk(msg.GetSize())) + { + memcpy(GetData(), msg.GetData(), msg.GetSize()); + } + } + else + { + LOG(ERROR) << "FairMQMessageSHM::Copy() fail: source message not initialized!"; + } + } + else + { + LOG(ERROR) << "FairMQMessageSHM::Copy() fail: target message already initialized!"; + } +} + +void FairMQMessageSHM::Copy(const FairMQMessagePtr& msg) { if (fHandle < 0) { diff --git a/fairmq/shmem/FairMQMessageSHM.h b/fairmq/shmem/FairMQMessageSHM.h index 3da6301e..c3e845af 100644 --- a/fairmq/shmem/FairMQMessageSHM.h +++ b/fairmq/shmem/FairMQMessageSHM.h @@ -39,13 +39,14 @@ class FairMQMessageSHM : public FairMQMessage void Rebuild(const size_t size) override; void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; - void* GetData() override; + void* GetData() const override; size_t GetSize() const override; bool SetUsedSize(const size_t size) override; FairMQ::Transport GetType() const override; + void Copy(const FairMQMessage& msg) override; void Copy(const FairMQMessagePtr& msg) override; ~FairMQMessageSHM() override; @@ -58,10 +59,10 @@ class FairMQMessageSHM : public FairMQMessage static std::atomic fInterrupted; static FairMQ::Transport fTransportType; uint64_t fRegionId; - fair::mq::shmem::Region* fRegionPtr; + mutable fair::mq::shmem::Region* fRegionPtr; boost::interprocess::managed_shared_memory::handle_t fHandle; size_t fSize; - char* fLocalPtr; + mutable char* fLocalPtr; bool InitializeChunk(const size_t size); zmq_msg_t* GetMessage(); diff --git a/fairmq/test/message_resize/_message_resize.cxx b/fairmq/test/message_resize/_message_resize.cxx index 91a05268..2a4bc0e8 100644 --- a/fairmq/test/message_resize/_message_resize.cxx +++ b/fairmq/test/message_resize/_message_resize.cxx @@ -45,7 +45,7 @@ auto RunPushPullWithMsgResize(string transport, string address) -> void { outMsg->SetUsedSize(250); ASSERT_EQ(outMsg->GetSize(), 250); FairMQMessagePtr msgCopy(push.NewMessage()); - msgCopy->Copy(outMsg); + msgCopy->Copy(*outMsg); ASSERT_EQ(msgCopy->GetSize(), 250); ASSERT_EQ(push.Send(outMsg), 250); diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index 90bfa4f9..d008e7d0 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -112,7 +112,7 @@ void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn* ff } } -zmq_msg_t* FairMQMessageZMQ::GetMessage() +zmq_msg_t* FairMQMessageZMQ::GetMessage() const { if (!fViewMsg) { @@ -124,7 +124,7 @@ zmq_msg_t* FairMQMessageZMQ::GetMessage() } } -void* FairMQMessageZMQ::GetData() +void* FairMQMessageZMQ::GetData() const { if (!fViewMsg) { @@ -195,6 +195,24 @@ FairMQ::Transport FairMQMessageZMQ::GetType() const return fTransportType; } +void FairMQMessageZMQ::Copy(const FairMQMessage& msg) +{ + const FairMQMessageZMQ& zMsg = static_cast(msg); + // Shares the message buffer between msg and this fMsg. + if (zmq_msg_copy(fMsg.get(), zMsg.GetMessage()) != 0) + { + LOG(ERROR) << "failed copying message, reason: " << zmq_strerror(errno); + return; + } + + // if the target message has been resized, apply same to this message also + if (zMsg.fUsedSizeModified) + { + fUsedSizeModified = true; + fUsedSize = zMsg.fUsedSize; + } +} + void FairMQMessageZMQ::Copy(const FairMQMessagePtr& msg) { FairMQMessageZMQ* msgPtr = static_cast(msg.get()); diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index 918a698b..0f6baf3e 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -40,7 +40,7 @@ class FairMQMessageZMQ : public FairMQMessage void Rebuild(const size_t size) override; void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; - void* GetData() override; + void* GetData() const override; size_t GetSize() const override; bool SetUsedSize(const size_t size) override; @@ -49,6 +49,7 @@ class FairMQMessageZMQ : public FairMQMessage FairMQ::Transport GetType() const override; void Copy(const FairMQMessagePtr& msg) override; + void Copy(const FairMQMessage& msg) override; ~FairMQMessageZMQ() override; @@ -59,7 +60,7 @@ class FairMQMessageZMQ : public FairMQMessage std::unique_ptr fViewMsg; // view on a subset of fMsg (treating it as user buffer) static FairMQ::Transport fTransportType; - zmq_msg_t* GetMessage(); + zmq_msg_t* GetMessage() const; void CloseMessage(); };