FairMQ: Add functionality to set used message size

This commit is contained in:
Alexey Rybalchenko 2017-12-05 12:51:07 +01:00 committed by Mohammad Al-Turany
parent ac4695b215
commit e5aa85b61d
10 changed files with 250 additions and 98 deletions

View File

@ -31,7 +31,9 @@ class FairMQMessage
virtual void* GetMessage() = 0; virtual void* GetMessage() = 0;
virtual void* GetData() = 0; virtual void* GetData() = 0;
virtual size_t GetSize() = 0; virtual size_t GetSize() const = 0;
virtual bool SetUsedSize(const size_t size) = 0;
virtual void SetMessage(void* data, size_t size) = 0; virtual void SetMessage(void* data, size_t size) = 0;

View File

@ -1,8 +1,8 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
/** /**
@ -141,11 +141,27 @@ void* FairMQMessageNN::GetData()
return fMessage; return fMessage;
} }
size_t FairMQMessageNN::GetSize() size_t FairMQMessageNN::GetSize() const
{ {
return fSize; return fSize;
} }
bool FairMQMessageNN::SetUsedSize(const size_t size)
{
if (size <= fSize)
{
// with size smaller than original nanomsg will simply "chop" the data, avoiding reallocation
fMessage = nn_reallocmsg(fMessage, size);
fSize = size;
return true;
}
else
{
LOG(ERROR) << "FairMQMessageNN::SetUsedSize: cannot set used size higher than original.";
return false;
}
}
void FairMQMessageNN::SetMessage(void* data, const size_t size) void FairMQMessageNN::SetMessage(void* data, const size_t size)
{ {
fMessage = data; fMessage = data;

View File

@ -1,8 +1,8 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
/** /**
@ -33,21 +33,23 @@ class FairMQMessageNN : public FairMQMessage
FairMQMessageNN(const FairMQMessageNN&) = delete; FairMQMessageNN(const FairMQMessageNN&) = delete;
FairMQMessageNN operator=(const FairMQMessageNN&) = delete; FairMQMessageNN operator=(const FairMQMessageNN&) = delete;
virtual void Rebuild(); void Rebuild() override;
virtual void Rebuild(const size_t size); void Rebuild(const size_t size) override;
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
virtual void* GetMessage(); void* GetMessage() override;
virtual void* GetData(); void* GetData() override;
virtual size_t GetSize(); size_t GetSize() const override;
virtual void SetMessage(void* data, const size_t size); bool SetUsedSize(const size_t size) override;
virtual FairMQ::Transport GetType() const; void SetMessage(void* data, const size_t size) override;
virtual void Copy(const std::unique_ptr<FairMQMessage>& msg); FairMQ::Transport GetType() const override;
virtual ~FairMQMessageNN(); void Copy(const std::unique_ptr<FairMQMessage>& msg) override;
~FairMQMessageNN() override;
friend class FairMQSocketNN; friend class FairMQSocketNN;

View File

@ -65,7 +65,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const str
} }
if (type == "sub") if (type == "sub")
{ {
nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0); nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, nullptr, 0);
} }
} }
@ -185,7 +185,7 @@ int FairMQSocketNN::Receive(FairMQMessagePtr& msg, const int flags)
while (true) while (true)
{ {
void* ptr = NULL; void* ptr = nullptr;
nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags); nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags);
if (nbytes >= 0) if (nbytes >= 0)
{ {
@ -310,7 +310,7 @@ int64_t FairMQSocketNN::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, const
while (true) while (true)
{ {
// pointer to point to received message buffer // pointer to point to received message buffer
char* ptr = NULL; char* ptr = nullptr;
// receive the message into a buffer allocated by nanomsg and let ptr point to it // receive the message into a buffer allocated by nanomsg and let ptr point to it
int nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags); int nbytes = nn_recv(fSocket, &ptr, NN_MSG, flags);
if (nbytes >= 0) // if no errors or non-blocking timeouts if (nbytes >= 0) // if no errors or non-blocking timeouts
@ -396,7 +396,7 @@ void FairMQSocketNN::Resume()
void* FairMQSocketNN::GetSocket() const void* FairMQSocketNN::GetSocket() const
{ {
return NULL; // dummy method to comply with the interface. functionality not possible in zeromq. return nullptr; // dummy method to comply with the interface. functionality not possible in zeromq.
} }
int FairMQSocketNN::GetSocket(int /*nothing*/) const int FairMQSocketNN::GetSocket(int /*nothing*/) const

View File

@ -91,7 +91,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& r
, fRegionPtr(nullptr) , fRegionPtr(nullptr)
, fHandle(-1) , fHandle(-1)
, fSize(size) , fSize(size)
, fLocalPtr(data) , fLocalPtr(static_cast<char*>(data))
{ {
fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData())); fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
@ -117,7 +117,9 @@ bool FairMQMessageSHM::InitializeChunk(const size_t size)
{ {
try try
{ {
fLocalPtr = fManager.Segment().allocate(size); bipc::managed_shared_memory::size_type actualSize = size;
char* hint = 0; // unused for bipc::allocate_new
fLocalPtr = fManager.Segment().allocation_command<char>(bipc::allocate_new, size, actualSize, hint);
} }
catch (bipc::bad_alloc& ba) catch (bipc::bad_alloc& ba)
{ {
@ -229,11 +231,44 @@ void* FairMQMessageSHM::GetData()
} }
} }
size_t FairMQMessageSHM::GetSize() size_t FairMQMessageSHM::GetSize() const
{ {
return fSize; return fSize;
} }
bool FairMQMessageSHM::SetUsedSize(const size_t size)
{
if (size == fSize)
{
return true;
}
else if (size <= fSize)
{
try
{
bipc::managed_shared_memory::size_type shrunkSize = size;
fLocalPtr = fManager.Segment().allocation_command<char>(bipc::shrink_in_place, fSize + 128, shrunkSize, fLocalPtr);
fSize = size;
// update meta header
MetaHeader* hdrPtr = static_cast<MetaHeader*>(zmq_msg_data(&fMessage));
hdrPtr->fSize = fSize;
return true;
}
catch (bipc::interprocess_exception& e)
{
LOG(INFO) << "FairMQMessageSHM::SetUsedSize could not set used size: " << e.what();
return false;
}
}
else
{
LOG(ERROR) << "FairMQMessageSHM::SetUsedSize: cannot set used size higher than original.";
return false;
}
}
void FairMQMessageSHM::SetMessage(void*, const size_t) void FairMQMessageSHM::SetMessage(void*, const size_t)
{ {
// dummy method to comply with the interface. functionality not allowed in zeromq. // dummy method to comply with the interface. functionality not allowed in zeromq.

View File

@ -41,7 +41,9 @@ class FairMQMessageSHM : public FairMQMessage
void* GetMessage() override; void* GetMessage() override;
void* GetData() override; void* GetData() override;
size_t GetSize() override; size_t GetSize() const override;
bool SetUsedSize(const size_t size) override;
void SetMessage(void* data, const size_t size) override; void SetMessage(void* data, const size_t size) override;
@ -64,7 +66,7 @@ class FairMQMessageSHM : public FairMQMessage
fair::mq::shmem::Region* fRegionPtr; fair::mq::shmem::Region* fRegionPtr;
boost::interprocess::managed_shared_memory::handle_t fHandle; boost::interprocess::managed_shared_memory::handle_t fHandle;
size_t fSize; size_t fSize;
void* fLocalPtr; char* fLocalPtr;
}; };
#endif /* FAIRMQMESSAGESHM_H_ */ #endif /* FAIRMQMESSAGESHM_H_ */

View File

@ -1,8 +1,8 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
/** /**
@ -12,11 +12,10 @@
* @author D. Klein, A. Rybalchenko, N. Winckler * @author D. Klein, A. Rybalchenko, N. Winckler
*/ */
#include <cstring>
#include <cstdlib>
#include "FairMQMessageZMQ.h" #include "FairMQMessageZMQ.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include <fairmq/Tools.h>
#include "FairMQUnmanagedRegionZMQ.h" #include "FairMQUnmanagedRegionZMQ.h"
using namespace std; using namespace std;
@ -24,48 +23,60 @@ using namespace std;
FairMQ::Transport FairMQMessageZMQ::fTransportType = FairMQ::Transport::ZMQ; FairMQ::Transport FairMQMessageZMQ::fTransportType = FairMQ::Transport::ZMQ;
FairMQMessageZMQ::FairMQMessageZMQ() FairMQMessageZMQ::FairMQMessageZMQ()
: fMessage() : fUsedSizeModified(false)
, fUsedSize()
, fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
, fViewMsg(nullptr)
{ {
if (zmq_msg_init(&fMessage) != 0) if (zmq_msg_init(fMsg.get()) != 0)
{ {
LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno); LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno);
} }
} }
FairMQMessageZMQ::FairMQMessageZMQ(const size_t size) FairMQMessageZMQ::FairMQMessageZMQ(const size_t size)
: fMessage() : fUsedSizeModified(false)
, fUsedSize(size)
, fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
, fViewMsg(nullptr)
{ {
if (zmq_msg_init_size(&fMessage, size) != 0) if (zmq_msg_init_size(fMsg.get(), size) != 0)
{ {
LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno); LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno);
} }
} }
FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
: fMessage() : fUsedSizeModified(false)
, fUsedSize()
, fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
, fViewMsg(nullptr)
{ {
if (zmq_msg_init_data(&fMessage, data, size, ffn, hint) != 0) if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0)
{ {
LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno);
} }
} }
FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size)
: fMessage() : fUsedSizeModified(false)
, fUsedSize()
, fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
, fViewMsg(nullptr)
{ {
// FIXME: make this zero-copy: // FIXME: make this zero-copy:
// simply taking over the provided buffer can casue premature delete, since region could be destroyed before the message is sent out. // simply taking over the provided buffer can casue premature delete, since region could be destroyed before the message is sent out.
// Needs lifetime extension for the ZMQ region. // Needs lifetime extension for the ZMQ region.
if (zmq_msg_init_size(&fMessage, size) != 0) if (zmq_msg_init_size(fMsg.get(), size) != 0)
{ {
LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno); LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno);
} }
memcpy(zmq_msg_data(&fMessage), data, size); memcpy(zmq_msg_data(fMsg.get()), data, size);
// call region callback // call region callback
static_cast<FairMQUnmanagedRegionZMQ*>(region.get())->fCallback(data, size); static_cast<FairMQUnmanagedRegionZMQ*>(region.get())->fCallback(data, size);
// if (zmq_msg_init_data(&fMessage, data, size, [](void*, void*){}, nullptr) != 0) // if (zmq_msg_init_data(fMsg.get(), data, size, [](void*, void*){}, nullptr) != 0)
// { // {
// LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); // LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno);
// } // }
@ -74,7 +85,8 @@ FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data,
void FairMQMessageZMQ::Rebuild() void FairMQMessageZMQ::Rebuild()
{ {
CloseMessage(); CloseMessage();
if (zmq_msg_init(&fMessage) != 0) fMsg = fair::mq::tools::make_unique<zmq_msg_t>();
if (zmq_msg_init(fMsg.get()) != 0)
{ {
LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno); LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno);
} }
@ -83,7 +95,8 @@ void FairMQMessageZMQ::Rebuild()
void FairMQMessageZMQ::Rebuild(const size_t size) void FairMQMessageZMQ::Rebuild(const size_t size)
{ {
CloseMessage(); CloseMessage();
if (zmq_msg_init_size(&fMessage, size) != 0) fMsg = fair::mq::tools::make_unique<zmq_msg_t>();
if (zmq_msg_init_size(fMsg.get(), size) != 0)
{ {
LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno); LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno);
} }
@ -92,7 +105,8 @@ void FairMQMessageZMQ::Rebuild(const size_t size)
void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint)
{ {
CloseMessage(); CloseMessage();
if (zmq_msg_init_data(&fMessage, data, size, ffn, hint) != 0) fMsg = fair::mq::tools::make_unique<zmq_msg_t>();
if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0)
{ {
LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno);
} }
@ -100,17 +114,80 @@ void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn* ff
void* FairMQMessageZMQ::GetMessage() void* FairMQMessageZMQ::GetMessage()
{ {
return &fMessage; if (!fViewMsg)
{
return fMsg.get();
}
else
{
return fViewMsg.get();
}
} }
void* FairMQMessageZMQ::GetData() void* FairMQMessageZMQ::GetData()
{ {
return zmq_msg_data(&fMessage); if (!fViewMsg)
{
return zmq_msg_data(fMsg.get());
}
else
{
return zmq_msg_data(fViewMsg.get());
}
} }
size_t FairMQMessageZMQ::GetSize() size_t FairMQMessageZMQ::GetSize() const
{ {
return zmq_msg_size(&fMessage); if (fUsedSizeModified)
{
return fUsedSize;
}
else
{
return zmq_msg_size(fMsg.get());
}
}
// To emulate shrinking, a new message is created with the new size (ViewMsg), that points to the original buffer with the new size.
// Once the "view message" is transfered, the original is destroyed.
// Used size is applied only once in ApplyUsedSize, which is called by the socket before sending.
// This function just updates the desired size until the actual "resizing" happens.
bool FairMQMessageZMQ::SetUsedSize(const size_t size)
{
if (size <= zmq_msg_size(fMsg.get()))
{
fUsedSize = size;
fUsedSizeModified = true;
return true;
}
else
{
LOG(ERROR) << "FairMQMessageZMQ::SetUsedSize: cannot set used size higher than original.";
return false;
}
}
void FairMQMessageZMQ::ApplyUsedSize()
{
// Apply only once (before actual send).
// The check is needed because a send could fail and can be reattempted by the user, in this case we do not want to modify buffer again.
if (fUsedSizeModified && !fViewMsg)
{
fViewMsg = fair::mq::tools::make_unique<zmq_msg_t>();
void* ptr = zmq_msg_data(fMsg.get());
if (zmq_msg_init_data(fViewMsg.get(),
ptr,
fUsedSize,
[](void* /* data */, void* obj)
{
zmq_msg_close(static_cast<zmq_msg_t*>(obj));
delete static_cast<zmq_msg_t*>(obj);
},
fMsg.release()) != 0)
{
LOG(ERROR) << "failed initializing view message, reason: " << zmq_strerror(errno);
}
}
} }
void FairMQMessageZMQ::SetMessage(void*, const size_t) void FairMQMessageZMQ::SetMessage(void*, const size_t)
@ -123,34 +200,48 @@ FairMQ::Transport FairMQMessageZMQ::GetType() const
return fTransportType; return fTransportType;
} }
void FairMQMessageZMQ::Copy(const unique_ptr<FairMQMessage>& msg) void FairMQMessageZMQ::Copy(const FairMQMessagePtr& msg)
{ {
// Shares the message buffer between msg and this fMessage. // Shares the message buffer between msg and this fMsg.
if (zmq_msg_copy(&fMessage, static_cast<zmq_msg_t*>(msg->GetMessage())) != 0) if (zmq_msg_copy(fMsg.get(), static_cast<zmq_msg_t*>(msg->GetMessage())) != 0)
{ {
LOG(ERROR) << "failed copying message, reason: " << zmq_strerror(errno); LOG(ERROR) << "failed copying message, reason: " << zmq_strerror(errno);
return;
} }
// Alternatively, following code does a hard copy of the message, which allows to modify the original after making a copy, without affecting the new msg. // if the target message has been resized, apply same to this message also
if (static_cast<FairMQMessageZMQ*>(msg.get())->fUsedSizeModified)
// CloseMessage(); {
// size_t size = msg->GetSize(); fUsedSizeModified = true;
// zmq_msg_init_size(&fMessage, size); fUsedSize = static_cast<FairMQMessageZMQ*>(msg.get())->fUsedSize;
// memcpy(zmq_msg_data(&fMessage), msg->GetData(), size); }
} }
void FairMQMessageZMQ::CloseMessage() void FairMQMessageZMQ::CloseMessage()
{ {
if (zmq_msg_close(&fMessage) != 0) if (!fViewMsg)
{ {
LOG(ERROR) << "failed closing message, reason: " << zmq_strerror(errno); if (zmq_msg_close(fMsg.get()) != 0)
{
LOG(ERROR) << "failed closing message, reason: " << zmq_strerror(errno);
}
// reset the message object to allow reuse in Rebuild
fMsg.reset(nullptr);
} }
else
{
if (zmq_msg_close(fViewMsg.get()) != 0)
{
LOG(ERROR) << "failed closing message, reason: " << zmq_strerror(errno);
}
// reset the message object to allow reuse in Rebuild
fViewMsg.reset(nullptr);
}
fUsedSizeModified = false;
fUsedSize = 0;
} }
FairMQMessageZMQ::~FairMQMessageZMQ() FairMQMessageZMQ::~FairMQMessageZMQ()
{ {
if (zmq_msg_close(&fMessage) != 0) CloseMessage();
{
LOG(ERROR) << "failed closing message with data, reason: " << zmq_strerror(errno);
}
} }

View File

@ -1,8 +1,8 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
/** /**
@ -17,6 +17,7 @@
#include <cstddef> #include <cstddef>
#include <string> #include <string>
#include <memory>
#include <zmq.h> #include <zmq.h>
@ -31,26 +32,33 @@ class FairMQMessageZMQ : public FairMQMessage
FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr);
FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size); FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size);
virtual void Rebuild(); void Rebuild() override;
virtual void Rebuild(const size_t size); void Rebuild(const size_t size) override;
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
virtual void* GetMessage(); void* GetMessage() override;
virtual void* GetData(); void* GetData() override;
virtual size_t GetSize(); size_t GetSize() const override;
virtual void SetMessage(void* data, const size_t size); bool SetUsedSize(const size_t size) override;
void ApplyUsedSize();
virtual FairMQ::Transport GetType() const; void SetMessage(void* data, const size_t size) override;
virtual void Copy(const std::unique_ptr<FairMQMessage>& msg);
FairMQ::Transport GetType() const override;
void Copy(const std::unique_ptr<FairMQMessage>& msg) override;
void CloseMessage(); void CloseMessage();
virtual ~FairMQMessageZMQ(); ~FairMQMessageZMQ() override;
private: private:
zmq_msg_t fMessage; bool fUsedSizeModified;
size_t fUsedSize;
std::unique_ptr<zmq_msg_t> fMsg;
std::unique_ptr<zmq_msg_t> fViewMsg; // view on a subset of fMsg (treating it as user buffer)
static FairMQ::Transport fTransportType; static FairMQ::Transport fTransportType;
}; };

View File

@ -1,8 +1,8 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
/** /**
@ -12,21 +12,19 @@
* @author D. Klein, A. Rybalchenko * @author D. Klein, A. Rybalchenko
*/ */
#include <sstream>
#include <zmq.h>
#include "FairMQSocketZMQ.h" #include "FairMQSocketZMQ.h"
#include "FairMQMessageZMQ.h" #include "FairMQMessageZMQ.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include <zmq.h>
using namespace std; using namespace std;
atomic<bool> FairMQSocketZMQ::fInterrupted(false); atomic<bool> FairMQSocketZMQ::fInterrupted(false);
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const string& id /*= ""*/, void* context) FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const string& id /*= ""*/, void* context)
: FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) : FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT)
, fSocket(NULL) , fSocket(nullptr)
, fId() , fId()
, fBytesTx(0) , fBytesTx(0)
, fBytesRx(0) , fBytesRx(0)
@ -38,7 +36,7 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const s
assert(context); assert(context);
fSocket = zmq_socket(context, GetConstant(type)); fSocket = zmq_socket(context, GetConstant(type));
if (fSocket == NULL) if (fSocket == nullptr)
{ {
LOG(ERROR) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); LOG(ERROR) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
@ -71,7 +69,7 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const s
if (type == "sub") if (type == "sub")
{ {
if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0) != 0) if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0)
{ {
LOG(ERROR) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); LOG(ERROR) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
} }
@ -117,6 +115,8 @@ int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int flags)
{ {
int nbytes = -1; int nbytes = -1;
static_cast<FairMQMessageZMQ*>(msg.get())->ApplyUsedSize();
while (true) while (true)
{ {
nbytes = zmq_msg_send(static_cast<zmq_msg_t*>(msg->GetMessage()), fSocket, flags); nbytes = zmq_msg_send(static_cast<zmq_msg_t*>(msg->GetMessage()), fSocket, flags);
@ -154,6 +154,7 @@ int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int flags)
int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int flags) int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int flags)
{ {
int nbytes = -1; int nbytes = -1;
while (true) while (true)
{ {
nbytes = zmq_msg_recv(static_cast<zmq_msg_t*>(msg->GetMessage()), fSocket, flags); nbytes = zmq_msg_recv(static_cast<zmq_msg_t*>(msg->GetMessage()), fSocket, flags);
@ -187,7 +188,7 @@ int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int flags)
} }
} }
int64_t FairMQSocketZMQ::Send(vector<unique_ptr<FairMQMessage>>& msgVec, const int flags) int64_t FairMQSocketZMQ::Send(vector<FairMQMessagePtr>& msgVec, const int flags)
{ {
const unsigned int vecSize = msgVec.size(); const unsigned int vecSize = msgVec.size();
@ -206,6 +207,8 @@ int64_t FairMQSocketZMQ::Send(vector<unique_ptr<FairMQMessage>>& msgVec, const i
for (unsigned int i = 0; i < vecSize; ++i) for (unsigned int i = 0; i < vecSize; ++i)
{ {
static_cast<FairMQMessageZMQ*>(msgVec[i].get())->ApplyUsedSize();
nbytes = zmq_msg_send(static_cast<zmq_msg_t*>(msgVec[i]->GetMessage()), nbytes = zmq_msg_send(static_cast<zmq_msg_t*>(msgVec[i]->GetMessage()),
fSocket, fSocket,
(i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags); (i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags);
@ -260,7 +263,7 @@ int64_t FairMQSocketZMQ::Send(vector<unique_ptr<FairMQMessage>>& msgVec, const i
} }
} }
int64_t FairMQSocketZMQ::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, const int flags) int64_t FairMQSocketZMQ::Receive(vector<FairMQMessagePtr>& msgVec, const int flags)
{ {
int64_t totalSize = 0; int64_t totalSize = 0;
int64_t more = 0; int64_t more = 0;
@ -268,13 +271,6 @@ int64_t FairMQSocketZMQ::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, cons
while (true) while (true)
{ {
// Warn if the vector is filled before Receive() and empty it.
// if (msgVec.size() > 0)
// {
// LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!";
// msgVec.clear();
// }
totalSize = 0; totalSize = 0;
more = 0; more = 0;
repeat = false; repeat = false;
@ -306,8 +302,8 @@ int64_t FairMQSocketZMQ::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, cons
return nbytes; return nbytes;
} }
size_t more_size = sizeof(more); size_t moreSize = sizeof(more);
zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &more_size); zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &moreSize);
} }
while (more); while (more);
@ -327,7 +323,7 @@ void FairMQSocketZMQ::Close()
{ {
// LOG(DEBUG) << "Closing socket " << fId; // LOG(DEBUG) << "Closing socket " << fId;
if (fSocket == NULL) if (fSocket == nullptr)
{ {
return; return;
} }
@ -337,7 +333,7 @@ void FairMQSocketZMQ::Close()
LOG(ERROR) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno); LOG(ERROR) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno);
} }
fSocket = NULL; fSocket = nullptr;
} }
void FairMQSocketZMQ::Interrupt() void FairMQSocketZMQ::Interrupt()

View File

@ -1,8 +1,8 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
/** /**