// FairMQ/fairmq/zeromq/FairMQMessageZMQ.cxx
// 2018-05-17 16:32:49 +02:00
//
// 264 lines
// 7.8 KiB
// C++

/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQMessageZMQ.cxx
*
* @since 2012-12-05
* @author D. Klein, A. Rybalchenko, N. Winckler
*/
#include "FairMQMessageZMQ.h"
#include "FairMQLogger.h"
#include <fairmq/Tools.h>
#include "FairMQUnmanagedRegionZMQ.h"
#include <cstring>
using namespace std;
// Transport tag shared by all FairMQMessageZMQ instances; this is the value GetType() returns.
fair::mq::Transport FairMQMessageZMQ::fTransportType{fair::mq::Transport::ZMQ};
// Creates an empty message: allocates the zmq_msg_t holder and initializes it with zmq_msg_init().
// No view message exists yet and no used size is recorded. An initialization failure is only logged.
FairMQMessageZMQ::FairMQMessageZMQ()
    : fUsedSizeModified(false)
    , fUsedSize()
    , fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
    , fViewMsg(nullptr)
{
    const int rc = zmq_msg_init(fMsg.get());
    if (rc != 0)
    {
        LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
    }
}
// Creates a message with a buffer of `size` bytes allocated by zmq_msg_init_size().
// fUsedSize starts out equal to `size`, but it is not marked as modified.
// An initialization failure is only logged.
FairMQMessageZMQ::FairMQMessageZMQ(const size_t size)
    : fUsedSizeModified(false)
    , fUsedSize(size)
    , fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
    , fViewMsg(nullptr)
{
    const int rc = zmq_msg_init_size(fMsg.get(), size);
    if (rc != 0)
    {
        LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
    }
}
// Creates a message around a caller-provided buffer via zmq_msg_init_data().
// `data`/`size` describe the buffer; `ffn` and `hint` are forwarded unchanged to zmq_msg_init_data()
// as the deallocation function and its argument. An initialization failure is only logged.
FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
    : fUsedSizeModified(false)
    , fUsedSize()
    , fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
    , fViewMsg(nullptr)
{
    const int rc = zmq_msg_init_data(fMsg.get(), data, size, ffn, hint);
    if (rc != 0)
    {
        LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno);
    }
}
// Creates a message from a buffer that lives inside an unmanaged region.
//   region - region the buffer belongs to; its fCallback is invoked once the data has been handled
//   data   - start of the buffer inside the region
//   size   - number of bytes to take from `data`
//   hint   - opaque value passed through to the region callback
// The payload is currently copied into a ZeroMQ-allocated buffer (see FIXME below), after which
// the region callback is called with (data, size, hint).
FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint)
    : fUsedSizeModified(false)
    , fUsedSize()
    , fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
    , fViewMsg(nullptr)
{
    // FIXME: make this zero-copy:
    // simply taking over the provided buffer can cause premature delete, since region could be destroyed before the message is sent out.
    // Needs lifetime extension for the ZMQ region.
    if (zmq_msg_init_size(fMsg.get(), size) != 0)
    {
        // The message has no valid buffer in this case, so nothing may be copied into it.
        LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
    }
    else if (size > 0)
    {
        // Skip the copy for empty payloads: memcpy must not be handed a possibly-null source.
        memcpy(zmq_msg_data(fMsg.get()), data, size);
    }

    // call region callback
    static_cast<FairMQUnmanagedRegionZMQ*>(region.get())->fCallback(data, size, hint);

    // if (zmq_msg_init_data(fMsg.get(), data, size, [](void*, void*){}, nullptr) != 0)
    // {
    //     LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno);
    // }
}
// Discards the current content (via CloseMessage()) and re-initializes this object as an empty message.
// An initialization failure is only logged.
void FairMQMessageZMQ::Rebuild()
{
    CloseMessage();

    fMsg = fair::mq::tools::make_unique<zmq_msg_t>();
    const int rc = zmq_msg_init(fMsg.get());
    if (rc == 0)
    {
        return;
    }

    LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
}
// Discards the current content (via CloseMessage()) and re-initializes this object with a
// freshly allocated buffer of `size` bytes. An initialization failure is only logged.
void FairMQMessageZMQ::Rebuild(const size_t size)
{
    CloseMessage();

    fMsg = fair::mq::tools::make_unique<zmq_msg_t>();
    const int rc = zmq_msg_init_size(fMsg.get(), size);
    if (rc == 0)
    {
        return;
    }

    LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno);
}
// Discards the current content (via CloseMessage()) and re-initializes this object around the
// caller-provided buffer. `ffn` and `hint` are forwarded unchanged to zmq_msg_init_data().
// An initialization failure is only logged.
void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
{
    CloseMessage();

    fMsg = fair::mq::tools::make_unique<zmq_msg_t>();
    const int rc = zmq_msg_init_data(fMsg.get(), data, size, ffn, hint);
    if (rc == 0)
    {
        return;
    }

    LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno);
}
// Returns the underlying ZeroMQ message: the view message if one has been created,
// otherwise the original message.
zmq_msg_t* FairMQMessageZMQ::GetMessage() const
{
    return fViewMsg ? fViewMsg.get() : fMsg.get();
}
void* FairMQMessageZMQ::GetData() const
{
if (!fViewMsg)
{
return zmq_msg_data(fMsg.get());
}
else
{
return zmq_msg_data(fViewMsg.get());
}
}
// Returns the message size in bytes: the size requested through SetUsedSize() if one is
// recorded, otherwise the size ZeroMQ reports for the original message.
size_t FairMQMessageZMQ::GetSize() const
{
    if (!fUsedSizeModified)
    {
        return zmq_msg_size(fMsg.get());
    }

    return fUsedSize;
}
// To emulate shrinking, a new message is created with the new size (ViewMsg), that points to the original buffer with the new size.
// Once the "view message" is transfered, the original is destroyed.
// Used size is applied only once in ApplyUsedSize, which is called by the socket before sending.
// This function just updates the desired size until the actual "resizing" happens.
bool FairMQMessageZMQ::SetUsedSize(const size_t size)
{
if (size <= zmq_msg_size(fMsg.get()))
{
fUsedSize = size;
fUsedSizeModified = true;
return true;
}
else
{
LOG(error) << "cannot set used size higher than original.";
return false;
}
}
// Turns the size recorded by SetUsedSize() into an actual "view" message that points to the
// original buffer but reports only fUsedSize bytes. The original zmq_msg_t is passed as the hint
// of the view; the view's free function closes and deletes it.
//
// If creating the view fails, the error is logged and the object is left untouched: fMsg keeps
// ownership of the original message and fViewMsg stays empty, so a later call can try again.
void FairMQMessageZMQ::ApplyUsedSize()
{
    // Apply only once (before actual send).
    // The check is needed because a send could fail and can be reattempted by the user, in this case we do not want to modify buffer again.
    if (!fUsedSizeModified || fViewMsg)
    {
        return;
    }

    auto view = fair::mq::tools::make_unique<zmq_msg_t>();
    void* ptr = zmq_msg_data(fMsg.get());

    // Pass fMsg.get() rather than fMsg.release(): ownership may only be given up once the view
    // has really been initialized, otherwise the original message would leak on failure.
    if (zmq_msg_init_data(view.get(),
                          ptr,
                          fUsedSize,
                          [](void* /* data */, void* obj)
                          {
                              zmq_msg_close(static_cast<zmq_msg_t*>(obj));
                              delete static_cast<zmq_msg_t*>(obj);
                          },
                          fMsg.get()) != 0)
    {
        LOG(error) << "failed initializing view message, reason: " << zmq_strerror(errno);
        return;
    }

    // From here on the view's free function owns the original message object.
    static_cast<void>(fMsg.release());
    fViewMsg.swap(view);
}
// Returns the transport tag stored in the class-wide fTransportType member.
fair::mq::Transport FairMQMessageZMQ::GetType() const
{
    return FairMQMessageZMQ::fTransportType;
}
// Makes this message refer to the same content as `msg` using zmq_msg_copy().
// `msg` is cast to FairMQMessageZMQ without a runtime check.
// On failure the error is logged and the used-size state of this message is left unchanged.
void FairMQMessageZMQ::Copy(const FairMQMessage& msg)
{
    const FairMQMessageZMQ& zMsg = static_cast<const FairMQMessageZMQ&>(msg);

    // Shares the message buffer between msg and this fMsg.
    if (zmq_msg_copy(fMsg.get(), zMsg.GetMessage()) != 0)
    {
        LOG(error) << "failed copying message, reason: " << zmq_strerror(errno);
        return;
    }

    // Mirror the used-size state of the source in both directions: if the source has been resized,
    // take over its size; if it has not, drop any used size previously recorded on this message,
    // since it referred to the old buffer and could exceed the size of the new one.
    fUsedSizeModified = zMsg.fUsedSizeModified;
    fUsedSize = zMsg.fUsedSize;
}
// Makes this message refer to the same content as `*msg` using zmq_msg_copy().
// The pointee is cast to FairMQMessageZMQ without a runtime check; `msg` is not checked for null.
// On failure the error is logged and the used-size state of this message is left unchanged.
void FairMQMessageZMQ::Copy(const FairMQMessagePtr& msg)
{
    FairMQMessageZMQ* msgPtr = static_cast<FairMQMessageZMQ*>(msg.get());

    // Shares the message buffer between msg and this fMsg.
    if (zmq_msg_copy(fMsg.get(), msgPtr->GetMessage()) != 0)
    {
        LOG(error) << "failed copying message, reason: " << zmq_strerror(errno);
        return;
    }

    // Mirror the used-size state of the source in both directions: if the source has been resized,
    // take over its size; if it has not, drop any used size previously recorded on this message,
    // since it referred to the old buffer and could exceed the size of the new one.
    fUsedSizeModified = msgPtr->fUsedSizeModified;
    fUsedSize = msgPtr->fUsedSize;
}
// Closes the active ZeroMQ message (the view message if one exists, otherwise the original),
// frees its holder object and clears the used-size bookkeeping.
// A failing zmq_msg_close() is only logged.
void FairMQMessageZMQ::CloseMessage()
{
    // Exactly one of the two holders is closed here, with the view taking precedence.
    auto& active = fViewMsg ? fViewMsg : fMsg;

    const int rc = zmq_msg_close(active.get());
    if (rc != 0)
    {
        LOG(error) << "failed closing message, reason: " << zmq_strerror(errno);
    }

    // reset the message object to allow reuse in Rebuild
    active.reset(nullptr);

    fUsedSizeModified = false;
    fUsedSize = 0;
}
// Releases the message on destruction by delegating to CloseMessage().
FairMQMessageZMQ::~FairMQMessageZMQ()
{
    this->CloseMessage();
}