FairMQ: add FairMQMessage::Copy(const FairMQMessage& msg), deprecate the old one.

This commit is contained in:
Alexey Rybalchenko 2017-12-08 12:02:08 +01:00 committed by Mohammad Al-Turany
parent e340a52bf2
commit ba78964e29
10 changed files with 90 additions and 21 deletions

View File

@ -29,14 +29,15 @@ class FairMQMessage
virtual void Rebuild(const size_t size) = 0; virtual void Rebuild(const size_t size) = 0;
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0; virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0;
virtual void* GetData() = 0; virtual void* GetData() const = 0;
virtual size_t GetSize() const = 0; virtual size_t GetSize() const = 0;
virtual bool SetUsedSize(const size_t size) = 0; virtual bool SetUsedSize(const size_t size) = 0;
virtual FairMQ::Transport GetType() const = 0; virtual FairMQ::Transport GetType() const = 0;
virtual void Copy(const std::unique_ptr<FairMQMessage>& msg) = 0; virtual void Copy(const std::unique_ptr<FairMQMessage>& msg) __attribute__((deprecated("Use 'Copy(const FairMQMessage& msg)'"))) = 0;
virtual void Copy(const FairMQMessage& msg) = 0;
virtual ~FairMQMessage() {}; virtual ~FairMQMessage() {};
}; };

View File

@ -67,7 +67,7 @@ void FairMQBenchmarkSampler::Run()
if (fSameMessage) if (fSameMessage)
{ {
FairMQMessagePtr msg(dataOutChannel.Transport()->CreateMessage()); FairMQMessagePtr msg(dataOutChannel.Transport()->CreateMessage());
msg->Copy(baseMsg); msg->Copy(*baseMsg);
if (dataOutChannel.Send(msg) >= 0) if (dataOutChannel.Send(msg) >= 0)
{ {

View File

@ -49,7 +49,7 @@ bool FairMQMultiplier::HandleSingleData(std::unique_ptr<FairMQMessage>& payload,
for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) // all subChannels in a channel for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) // all subChannels in a channel
{ {
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(payload); msgCopy->Copy(*payload);
Send(msgCopy, fOutChannelNames.at(i), j); Send(msgCopy, fOutChannelNames.at(i), j);
} }
@ -60,7 +60,7 @@ bool FairMQMultiplier::HandleSingleData(std::unique_ptr<FairMQMessage>& payload,
for (unsigned int i = 0; i < lastChannelSize - 1; ++i) // iterate over all except last subChannels of the last channel for (unsigned int i = 0; i < lastChannelSize - 1; ++i) // iterate over all except last subChannels of the last channel
{ {
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(payload); msgCopy->Copy(*payload);
Send(msgCopy, fOutChannelNames.back(), i); Send(msgCopy, fOutChannelNames.back(), i);
} }
@ -81,7 +81,7 @@ bool FairMQMultiplier::HandleMultipartData(FairMQParts& payload, int /*index*/)
for (int k = 0; k < payload.Size(); ++k) for (int k = 0; k < payload.Size(); ++k)
{ {
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(payload.At(k)); msgCopy->Copy(payload.AtRef(k));
parts.AddPart(std::move(msgCopy)); parts.AddPart(std::move(msgCopy));
} }
@ -98,7 +98,7 @@ bool FairMQMultiplier::HandleMultipartData(FairMQParts& payload, int /*index*/)
for (int k = 0; k < payload.Size(); ++k) for (int k = 0; k < payload.Size(); ++k)
{ {
FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
msgCopy->Copy(payload.At(k)); msgCopy->Copy(payload.AtRef(k));
parts.AddPart(std::move(msgCopy)); parts.AddPart(std::move(msgCopy));
} }

View File

@ -131,12 +131,12 @@ void FairMQMessageNN::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn
} }
} }
void* FairMQMessageNN::GetMessage() void* FairMQMessageNN::GetMessage() const
{ {
return fMessage; return fMessage;
} }
void* FairMQMessageNN::GetData() void* FairMQMessageNN::GetData() const
{ {
return fMessage; return fMessage;
} }
@ -173,6 +173,30 @@ FairMQ::Transport FairMQMessageNN::GetType() const
return fTransportType; return fTransportType;
} }
void FairMQMessageNN::Copy(const FairMQMessage& msg)
{
if (fMessage)
{
if (nn_freemsg(fMessage) < 0)
{
LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno);
}
}
size_t size = msg.GetSize();
fMessage = nn_allocmsg(size, 0);
if (!fMessage)
{
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
}
else
{
memcpy(fMessage, static_cast<const FairMQMessageNN&>(msg).GetMessage(), size);
fSize = size;
}
}
void FairMQMessageNN::Copy(const FairMQMessagePtr& msg) void FairMQMessageNN::Copy(const FairMQMessagePtr& msg)
{ {
if (fMessage) if (fMessage)

View File

@ -41,13 +41,14 @@ class FairMQMessageNN : public FairMQMessage
void Rebuild(const size_t size) override; void Rebuild(const size_t size) override;
void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
void* GetData() override; void* GetData() const override;
size_t GetSize() const override; size_t GetSize() const override;
bool SetUsedSize(const size_t size) override; bool SetUsedSize(const size_t size) override;
FairMQ::Transport GetType() const override; FairMQ::Transport GetType() const override;
void Copy(const FairMQMessage& msg) override;
void Copy(const FairMQMessagePtr& msg) override; void Copy(const FairMQMessagePtr& msg) override;
~FairMQMessageNN() override; ~FairMQMessageNN() override;
@ -59,7 +60,7 @@ class FairMQMessageNN : public FairMQMessage
FairMQUnmanagedRegion* fRegionPtr; FairMQUnmanagedRegion* fRegionPtr;
static FairMQ::Transport fTransportType; static FairMQ::Transport fTransportType;
void* GetMessage(); void* GetMessage() const;
void CloseMessage(); void CloseMessage();
void SetMessage(void* data, const size_t size); void SetMessage(void* data, const size_t size);
}; };

View File

@ -202,7 +202,7 @@ zmq_msg_t* FairMQMessageSHM::GetMessage()
return &fMessage; return &fMessage;
} }
void* FairMQMessageSHM::GetData() void* FairMQMessageSHM::GetData() const
{ {
if (fLocalPtr) if (fLocalPtr)
{ {
@ -274,7 +274,30 @@ FairMQ::Transport FairMQMessageSHM::GetType() const
return fTransportType; return fTransportType;
} }
void FairMQMessageSHM::Copy(const unique_ptr<FairMQMessage>& msg) void FairMQMessageSHM::Copy(const FairMQMessage& msg)
{
if (fHandle < 0)
{
bipc::managed_shared_memory::handle_t otherHandle = static_cast<const FairMQMessageSHM&>(msg).fHandle;
if (otherHandle)
{
if (InitializeChunk(msg.GetSize()))
{
memcpy(GetData(), msg.GetData(), msg.GetSize());
}
}
else
{
LOG(ERROR) << "FairMQMessageSHM::Copy() fail: source message not initialized!";
}
}
else
{
LOG(ERROR) << "FairMQMessageSHM::Copy() fail: target message already initialized!";
}
}
void FairMQMessageSHM::Copy(const FairMQMessagePtr& msg)
{ {
if (fHandle < 0) if (fHandle < 0)
{ {

View File

@ -39,13 +39,14 @@ class FairMQMessageSHM : public FairMQMessage
void Rebuild(const size_t size) override; void Rebuild(const size_t size) override;
void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
void* GetData() override; void* GetData() const override;
size_t GetSize() const override; size_t GetSize() const override;
bool SetUsedSize(const size_t size) override; bool SetUsedSize(const size_t size) override;
FairMQ::Transport GetType() const override; FairMQ::Transport GetType() const override;
void Copy(const FairMQMessage& msg) override;
void Copy(const FairMQMessagePtr& msg) override; void Copy(const FairMQMessagePtr& msg) override;
~FairMQMessageSHM() override; ~FairMQMessageSHM() override;
@ -58,10 +59,10 @@ class FairMQMessageSHM : public FairMQMessage
static std::atomic<bool> fInterrupted; static std::atomic<bool> fInterrupted;
static FairMQ::Transport fTransportType; static FairMQ::Transport fTransportType;
uint64_t fRegionId; uint64_t fRegionId;
fair::mq::shmem::Region* fRegionPtr; mutable fair::mq::shmem::Region* fRegionPtr;
boost::interprocess::managed_shared_memory::handle_t fHandle; boost::interprocess::managed_shared_memory::handle_t fHandle;
size_t fSize; size_t fSize;
char* fLocalPtr; mutable char* fLocalPtr;
bool InitializeChunk(const size_t size); bool InitializeChunk(const size_t size);
zmq_msg_t* GetMessage(); zmq_msg_t* GetMessage();

View File

@ -45,7 +45,7 @@ auto RunPushPullWithMsgResize(string transport, string address) -> void {
outMsg->SetUsedSize(250); outMsg->SetUsedSize(250);
ASSERT_EQ(outMsg->GetSize(), 250); ASSERT_EQ(outMsg->GetSize(), 250);
FairMQMessagePtr msgCopy(push.NewMessage()); FairMQMessagePtr msgCopy(push.NewMessage());
msgCopy->Copy(outMsg); msgCopy->Copy(*outMsg);
ASSERT_EQ(msgCopy->GetSize(), 250); ASSERT_EQ(msgCopy->GetSize(), 250);
ASSERT_EQ(push.Send(outMsg), 250); ASSERT_EQ(push.Send(outMsg), 250);

View File

@ -112,7 +112,7 @@ void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn* ff
} }
} }
zmq_msg_t* FairMQMessageZMQ::GetMessage() zmq_msg_t* FairMQMessageZMQ::GetMessage() const
{ {
if (!fViewMsg) if (!fViewMsg)
{ {
@ -124,7 +124,7 @@ zmq_msg_t* FairMQMessageZMQ::GetMessage()
} }
} }
void* FairMQMessageZMQ::GetData() void* FairMQMessageZMQ::GetData() const
{ {
if (!fViewMsg) if (!fViewMsg)
{ {
@ -195,6 +195,24 @@ FairMQ::Transport FairMQMessageZMQ::GetType() const
return fTransportType; return fTransportType;
} }
void FairMQMessageZMQ::Copy(const FairMQMessage& msg)
{
const FairMQMessageZMQ& zMsg = static_cast<const FairMQMessageZMQ&>(msg);
// Shares the message buffer between msg and this fMsg.
if (zmq_msg_copy(fMsg.get(), zMsg.GetMessage()) != 0)
{
LOG(ERROR) << "failed copying message, reason: " << zmq_strerror(errno);
return;
}
// if the target message has been resized, apply same to this message also
if (zMsg.fUsedSizeModified)
{
fUsedSizeModified = true;
fUsedSize = zMsg.fUsedSize;
}
}
void FairMQMessageZMQ::Copy(const FairMQMessagePtr& msg) void FairMQMessageZMQ::Copy(const FairMQMessagePtr& msg)
{ {
FairMQMessageZMQ* msgPtr = static_cast<FairMQMessageZMQ*>(msg.get()); FairMQMessageZMQ* msgPtr = static_cast<FairMQMessageZMQ*>(msg.get());

View File

@ -40,7 +40,7 @@ class FairMQMessageZMQ : public FairMQMessage
void Rebuild(const size_t size) override; void Rebuild(const size_t size) override;
void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
void* GetData() override; void* GetData() const override;
size_t GetSize() const override; size_t GetSize() const override;
bool SetUsedSize(const size_t size) override; bool SetUsedSize(const size_t size) override;
@ -49,6 +49,7 @@ class FairMQMessageZMQ : public FairMQMessage
FairMQ::Transport GetType() const override; FairMQ::Transport GetType() const override;
void Copy(const FairMQMessagePtr& msg) override; void Copy(const FairMQMessagePtr& msg) override;
void Copy(const FairMQMessage& msg) override;
~FairMQMessageZMQ() override; ~FairMQMessageZMQ() override;
@ -59,7 +60,7 @@ class FairMQMessageZMQ : public FairMQMessage
std::unique_ptr<zmq_msg_t> fViewMsg; // view on a subset of fMsg (treating it as user buffer) std::unique_ptr<zmq_msg_t> fViewMsg; // view on a subset of fMsg (treating it as user buffer)
static FairMQ::Transport fTransportType; static FairMQ::Transport fTransportType;
zmq_msg_t* GetMessage(); zmq_msg_t* GetMessage() const;
void CloseMessage(); void CloseMessage();
}; };