- Make FairMQSampler generic.

- Remove fairroot dependency from fairmq.
This commit is contained in:
NicolasWinckler 2014-01-15 16:52:48 +01:00
parent 23b88e9e91
commit dfc1dd6a06
19 changed files with 62 additions and 579 deletions

View File

@ -1,8 +1,6 @@
set(INCLUDE_DIRECTORIES set(INCLUDE_DIRECTORIES
${BASE_INCLUDE_DIRECTORIES}
${CMAKE_SOURCE_DIR}/fairmq ${CMAKE_SOURCE_DIR}/fairmq
${Boost_INCLUDE_DIR} ${Boost_INCLUDE_DIR}
${ROOT_INCLUDE_DIR}
) )
if(NANOMSG_FOUND) if(NANOMSG_FOUND)
@ -20,7 +18,6 @@ endif(NANOMSG_FOUND)
include_directories(${INCLUDE_DIRECTORIES}) include_directories(${INCLUDE_DIRECTORIES})
set(LINK_DIRECTORIES set(LINK_DIRECTORIES
${ROOT_LIBRARY_DIR}
${Boost_LIBRARY_DIRS} ${Boost_LIBRARY_DIRS}
) )
@ -34,17 +31,13 @@ set(SRCS
"FairMQMessage.cxx" "FairMQMessage.cxx"
"FairMQSocket.cxx" "FairMQSocket.cxx"
"FairMQDevice.cxx" "FairMQDevice.cxx"
"FairMQSampler.cxx"
"FairMQBenchmarkSampler.cxx" "FairMQBenchmarkSampler.cxx"
"FairMQProcessor.cxx"
"FairMQSink.cxx" "FairMQSink.cxx"
"FairMQBuffer.cxx" "FairMQBuffer.cxx"
"FairMQProxy.cxx" "FairMQProxy.cxx"
"FairMQSplitter.cxx" "FairMQSplitter.cxx"
"FairMQMerger.cxx" "FairMQMerger.cxx"
"FairMQPoller.cxx" "FairMQPoller.cxx"
"FairMQSamplerTask.cxx"
"FairMQProcessorTask.cxx"
) )
if(NANOMSG_FOUND) if(NANOMSG_FOUND)
@ -75,7 +68,7 @@ endif(NANOMSG_FOUND)
set(DEPENDENCIES set(DEPENDENCIES
${DEPENDENCIES} ${DEPENDENCIES}
${CMAKE_THREAD_LIBS_INIT} ${CMAKE_THREAD_LIBS_INIT}
Base ParBase FairTools GeoBase boost_thread boost_timer boost_system boost_thread boost_timer boost_system
) )
set(LIBRARY_NAME FairMQ) set(LIBRARY_NAME FairMQ)

View File

@ -25,11 +25,11 @@ class FairMQLogger
DEBUG, INFO, ERROR, STATE DEBUG, INFO, ERROR, STATE
}; };
FairMQLogger(); FairMQLogger();
FairMQLogger(const string& bindAdress); // TODO: check this for const ref FairMQLogger(const string& bindAdress);
virtual ~FairMQLogger(); virtual ~FairMQLogger();
void Log(int type, const string& logmsg); void Log(int type, const string& logmsg);
static FairMQLogger* GetInstance(); static FairMQLogger* GetInstance();
static FairMQLogger* InitInstance(const string& bindAddress); // TODO: check this for const ref static FairMQLogger* InitInstance(const string& bindAddress);
}; };
typedef unsigned long long timestamp_t; typedef unsigned long long timestamp_t;

View File

@ -16,13 +16,14 @@ class FairMQMessage
public: public:
virtual void Rebuild() = 0; virtual void Rebuild() = 0;
virtual void Rebuild(size_t size) = 0; virtual void Rebuild(size_t size) = 0;
virtual void Rebuild(void* data, size_t site) = 0; virtual void Rebuild(void* data, size_t size) = 0;
virtual void* GetMessage() = 0; virtual void* GetMessage() = 0;
virtual void* GetData() = 0; virtual void* GetData() = 0;
virtual size_t GetSize() = 0; virtual size_t GetSize() = 0;
virtual void SetMessage(void* data, size_t size) = 0; virtual void SetMessage(void* data, size_t size) = 0;
virtual void CloseMessage() = 0;
virtual void Copy(FairMQMessage* msg) = 0; virtual void Copy(FairMQMessage* msg) = 0;
virtual ~FairMQMessage() {}; virtual ~FairMQMessage() {};

View File

@ -1,71 +0,0 @@
/**
* FairMQProcessor.cxx
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQProcessor.h"
#include "FairMQLogger.h"
FairMQProcessor::FairMQProcessor() :
fProcessorTask(NULL)
{
}
FairMQProcessor::~FairMQProcessor()
{
delete fProcessorTask;
}
void FairMQProcessor::SetTask(FairMQProcessorTask* task)
{
fProcessorTask = task;
}
void FairMQProcessor::Init()
{
FairMQDevice::Init();
fProcessorTask->InitTask();
}
void FairMQProcessor::Run()
{
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
int receivedMsgs = 0;
int sentMsgs = 0;
bool received = false;
while ( fState == RUNNING ) {
FairMQMessage* msg = fTransportFactory->CreateMessage();
received = fPayloadInputs->at(0)->Receive(msg);
receivedMsgs++;
if (received) {
fProcessorTask->Exec(msg, NULL);
fPayloadOutputs->at(0)->Send(msg);
sentMsgs++;
received = false;
}
delete msg;
}
cout << "I've received " << receivedMsgs << " and sent " << sentMsgs << " messages!" << endl;
boost::this_thread::sleep(boost::posix_time::milliseconds(5000));
rateLogger.interrupt();
rateLogger.join();
}

View File

@ -1,28 +0,0 @@
/**
* FairMQProcessor.h
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQPROCESSOR_H_
#define FAIRMQPROCESSOR_H_
#include "FairMQDevice.h"
#include "FairMQProcessorTask.h"
class FairMQProcessor: public FairMQDevice
{
public:
FairMQProcessor();
virtual ~FairMQProcessor();
void SetTask(FairMQProcessorTask* task);
protected:
virtual void Init();
virtual void Run();
private:
FairMQProcessorTask* fProcessorTask;
};
#endif /* FAIRMQPROCESSOR_H_ */

View File

@ -1,17 +0,0 @@
/**
* FairMQProcessorTask.cxx
*
* @since Dec 6, 2012-12-06
* @author: D. Klein, A. Rybalchenko
*/
#include "FairMQProcessorTask.h"
FairMQProcessorTask::FairMQProcessorTask()
{
}
FairMQProcessorTask::~FairMQProcessorTask()
{
}

View File

@ -1,25 +0,0 @@
/**
* FairMQProcessorTask.h
*
* @since Dec 6, 2012-12-06
* @author: D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQPROCESSORTASK_H_
#define FAIRMQPROCESSORTASK_H_
#include <vector>
#include "FairMQMessage.h"
#include "FairTask.h"
class FairMQProcessorTask : public FairTask
{
public:
FairMQProcessorTask();
virtual ~FairMQProcessorTask();
virtual void Exec(FairMQMessage* msg, Option_t* opt) = 0;
};
#endif /* FAIRMQPROCESSORTASK_H_ */

View File

@ -1,217 +0,0 @@
/**
* FairMQSampler.cpp
*
* @since 2012-09-27
* @author D. Klein, A. Rybalchenko
*/
#include <vector>
#include <iostream>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/timer/timer.hpp>
#include "TList.h"
#include "TObjString.h"
#include "TClonesArray.h"
#include "FairParRootFileIo.h"
#include "FairRuntimeDb.h"
#include "TROOT.h"
#include "FairMQSampler.h"
#include "FairMQLogger.h"
FairMQSampler::FairMQSampler() :
fFairRunAna(new FairRunAna()),
fSamplerTask(NULL),
fInputFile(""),
fParFile(""),
fBranch(""),
fEventRate(1)
{
}
FairMQSampler::~FairMQSampler()
{
if(fFairRunAna) {
fFairRunAna->TerminateRun();
}
}
void FairMQSampler::Init()
{
FairMQDevice::Init();
fSamplerTask->SetBranch(fBranch);
fSamplerTask->SetTransport(fTransportFactory);
fFairRunAna->SetInputFile(TString(fInputFile));
TString output = fInputFile;
output.Append(".out.root");
fFairRunAna->SetOutputFile(output.Data());
fFairRunAna->AddTask(fSamplerTask);
FairRuntimeDb* rtdb = fFairRunAna->GetRuntimeDb();
FairParRootFileIo* parInput1 = new FairParRootFileIo();
parInput1->open(TString(fParFile).Data());
rtdb->setFirstInput(parInput1);
rtdb->print();
fFairRunAna->Init();
//fFairRunAna->Run(0, 0);
FairRootManager* ioman = FairRootManager::Instance();
fNumEvents = int((ioman->GetInChain())->GetEntries());
}
void FairMQSampler::Run()
{
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
boost::thread resetEventCounter(boost::bind(&FairMQSampler::ResetEventCounter, this));
//boost::thread commandListener(boost::bind(&FairMQSampler::ListenToCommands, this));
int sentMsgs = 0;
boost::timer::auto_cpu_timer timer;
cout << "Number of events to process: " << fNumEvents << endl;
Long64_t eventNr = 0;
// while ( fState == RUNNING ) {
for ( /* eventNr */ ; eventNr < fNumEvents; eventNr++ ) {
fFairRunAna->RunMQ(eventNr);
fPayloadOutputs->at(0)->Send(fSamplerTask->GetOutput());
sentMsgs++;
--fEventCounter;
while (fEventCounter == 0) {
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
}
if( fState != RUNNING ) { break; }
}
boost::this_thread::interruption_point();
// }
boost::timer::cpu_times const elapsed_time(timer.elapsed());
cout << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2) << endl;
cout << "Sent " << sentMsgs << " messages!" << endl;
//boost::this_thread::sleep(boost::posix_time::milliseconds(5000));
rateLogger.interrupt();
rateLogger.join();
resetEventCounter.interrupt();
resetEventCounter.join();
//commandListener.interrupt();
//commandListener.join();
}
void FairMQSampler::ResetEventCounter()
{
while ( true ) {
try {
fEventCounter = fEventRate / 100;
boost::this_thread::sleep(boost::posix_time::milliseconds(10));
} catch (boost::thread_interrupted&) {
cout << "resetEventCounter interrupted" << endl;
break;
}
}
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, ">>>>>>> stopping resetEventCounter <<<<<<<");
}
void FairMQSampler::ListenToCommands()
{
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> ListenToCommands <<<<<<<");
bool received = false;
while ( true ) {
try {
FairMQMessage* msg = fTransportFactory->CreateMessage();
received = fPayloadInputs->at(0)->Receive(msg);
if (received) {
//command handling goes here.
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, "> received command <");
received = false;
}
delete msg;
boost::this_thread::interruption_point();
} catch (boost::thread_interrupted&) {
cout << "commandListener interrupted" << endl;
break;
}
}
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, ">>>>>>> stopping commandListener <<<<<<<");
}
void FairMQSampler::SetProperty(const int key, const string& value, const int slot/*= 0*/)
{
switch (key) {
case InputFile:
fInputFile = value;
break;
case ParFile:
fParFile = value;
break;
case Branch:
fBranch = value;
break;
default:
FairMQDevice::SetProperty(key, value, slot);
break;
}
}
string FairMQSampler::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/)
{
switch (key) {
case InputFile:
return fInputFile;
case ParFile:
return fParFile;
case Branch:
return fBranch;
default:
return FairMQDevice::GetProperty(key, default_, slot);
}
}
void FairMQSampler::SetProperty(const int key, const int value, const int slot/*= 0*/)
{
switch (key) {
case EventRate:
fEventRate = value;
break;
default:
FairMQDevice::SetProperty(key, value, slot);
break;
}
}
int FairMQSampler::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/)
{
switch (key) {
case EventRate:
return fEventRate;
default:
return FairMQDevice::GetProperty(key, default_, slot);
}
}

View File

@ -1,60 +0,0 @@
/**
* FairMQSampler.h
*
* @since 2012-09-27
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQSAMPLER_H_
#define FAIRMQSAMPLER_H_
#include "FairRunAna.h"
#include "FairTask.h"
#include "FairMQDevice.h"
#include "FairMQSamplerTask.h"
/**
* Reads simulated digis from a root file and samples the digi as a time-series UDP stream.
* Must be initialized with the filename to the root file and the name of the sub-detector
* branch, whose digis should be streamed.
*
* The purpose of this class is to provide a data source of digis very similar to the
* future detector output at the point where the detector is connected to the online
* computing farm. For the development of online analysis algorithms, it is very important
* to simulate the future detector output as realistic as possible to evaluate the
* feasibility and quality of the various possible online analysis features.
*/
class FairMQSampler: public FairMQDevice
{
public:
enum {
InputFile = FairMQDevice::Last,
Branch,
ParFile,
EventRate
};
FairMQSampler();
virtual ~FairMQSampler();
void ResetEventCounter();
virtual void ListenToCommands();
virtual void SetProperty(const int key, const string& value, const int slot = 0);
virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0);
virtual void SetProperty(const int key, const int value, const int slot = 0);
virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0);
protected:
FairRunAna* fFairRunAna;
int fNumEvents;
FairMQSamplerTask* fSamplerTask;
string fInputFile; // Filename of a root file containing the simulated digis.
string fParFile;
string fBranch; // The name of the sub-detector branch to stream the digis from.
int fEventRate;
int fEventCounter;
virtual void Init();
virtual void Run();
};
#endif /* FAIRMQSAMPLER_H_ */

View File

@ -1,54 +0,0 @@
/**
* FairMQSamplerTask.cxx
*
* @since 2012-11-22
* @author D. Klein, A. Rybalchenko
*/
#include "FairMQSamplerTask.h"
FairMQSamplerTask::FairMQSamplerTask(const Text_t* name, int iVerbose) :
FairTask(name, iVerbose),
fInput(NULL),
fBranch(""),
fOutput(NULL)
{
}
FairMQSamplerTask::FairMQSamplerTask() :
FairTask( "Abstract base task used for loading a branch from a root file into memory"),
fInput(NULL),
fBranch(""),
fOutput(NULL)
{
}
FairMQSamplerTask::~FairMQSamplerTask()
{
delete fInput;
//delete fOutput; // leave fOutput in memory, because it is needed even after FairMQSamplerTask is terminated. ClearOutput will clean it when it is no longer needed.
}
InitStatus FairMQSamplerTask::Init()
{
FairRootManager* ioman = FairRootManager::Instance();
fInput = (TClonesArray*) ioman->GetObject(fBranch.Data());
return kSUCCESS;
}
void FairMQSamplerTask::SetBranch(TString branch)
{
fBranch = branch;
}
FairMQMessage* FairMQSamplerTask::GetOutput()
{
return fOutput;
}
void FairMQSamplerTask::SetTransport(FairMQTransportFactory* factory)
{
fTransportFactory = factory;
}

View File

@ -1,39 +0,0 @@
/**
* FairMQSamplerTask.h
*
* @since 2012-11-22
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQSAMPLERTASK_H_
#define FAIRMQSAMPLERTASK_H_
#include "FairTask.h"
#include <vector>
#include "TClonesArray.h"
#include <string>
#include "FairMQMessage.h"
#include "FairMQTransportFactory.h"
#include "TString.h"
class FairMQSamplerTask: public FairTask
{
public:
FairMQSamplerTask();
FairMQSamplerTask(const Text_t* name, int iVerbose=1);
virtual ~FairMQSamplerTask();
virtual InitStatus Init();
virtual void Exec(Option_t* opt) = 0;
void SetBranch(TString branch);
FairMQMessage* GetOutput();
void SetTransport(FairMQTransportFactory* factory);
protected:
TClonesArray* fInput;
TString fBranch;
FairMQMessage* fOutput;
FairMQTransportFactory* fTransportFactory;
};
#endif /* FAIRMQSAMPLERTASK_H_ */

View File

@ -13,6 +13,7 @@
#include "FairMQMessage.h" #include "FairMQMessage.h"
#include "FairMQSocket.h" #include "FairMQSocket.h"
#include "FairMQPoller.h" #include "FairMQPoller.h"
#include "FairMQLogger.h"
using std::vector; using std::vector;

View File

@ -12,7 +12,6 @@
#include "FairMQMessageNN.h" #include "FairMQMessageNN.h"
#include "FairMQLogger.h" #include "FairMQLogger.h"
FairMQMessageNN::FairMQMessageNN() : FairMQMessageNN::FairMQMessageNN() :
fSize(0), fSize(0),
fMessage(NULL) fMessage(NULL)
@ -22,7 +21,7 @@ FairMQMessageNN::FairMQMessageNN() :
FairMQMessageNN::FairMQMessageNN(size_t size) FairMQMessageNN::FairMQMessageNN(size_t size)
{ {
fMessage = nn_allocmsg(size, 0); fMessage = nn_allocmsg(size, 0);
if(!fMessage){ if (!fMessage){
stringstream logmsg; stringstream logmsg;
logmsg << "failed allocating message, reason: " << nn_strerror(errno); logmsg << "failed allocating message, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
@ -33,7 +32,7 @@ FairMQMessageNN::FairMQMessageNN(size_t size)
FairMQMessageNN::FairMQMessageNN(void* data, size_t size) FairMQMessageNN::FairMQMessageNN(void* data, size_t size)
{ {
fMessage = nn_allocmsg(size, 0); fMessage = nn_allocmsg(size, 0);
if(!fMessage){ if (!fMessage){
stringstream logmsg; stringstream logmsg;
logmsg << "failed allocating message, reason: " << nn_strerror(errno); logmsg << "failed allocating message, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
@ -53,7 +52,7 @@ void FairMQMessageNN::Rebuild(size_t size)
{ {
Clear(); Clear();
fMessage = nn_allocmsg(size, 0); fMessage = nn_allocmsg(size, 0);
if(!fMessage){ if (!fMessage){
stringstream logmsg; stringstream logmsg;
logmsg << "failed allocating message, reason: " << nn_strerror(errno); logmsg << "failed allocating message, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
@ -65,11 +64,12 @@ void FairMQMessageNN::Rebuild(void* data, size_t size)
{ {
Clear(); Clear();
fMessage = nn_allocmsg(size, 0); fMessage = nn_allocmsg(size, 0);
if(!fMessage){ if (!fMessage){
stringstream logmsg; stringstream logmsg;
logmsg << "failed allocating message, reason: " << nn_strerror(errno); logmsg << "failed allocating message, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
memcpy (fMessage, data, size);
fSize = size; fSize = size;
} }
@ -96,9 +96,9 @@ void FairMQMessageNN::SetMessage(void* data, size_t size)
void FairMQMessageNN::Copy(FairMQMessage* msg) void FairMQMessageNN::Copy(FairMQMessage* msg)
{ {
if(fMessage){ if (fMessage){
int rc = nn_freemsg(fMessage); int rc = nn_freemsg(fMessage);
if( rc < 0 ){ if ( rc < 0 ){
stringstream logmsg; stringstream logmsg;
logmsg << "failed freeing message, reason: " << nn_strerror(errno); logmsg << "failed freeing message, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
@ -108,7 +108,7 @@ void FairMQMessageNN::Copy(FairMQMessage* msg)
size_t size = msg->GetSize(); size_t size = msg->GetSize();
fMessage = nn_allocmsg(size, 0); fMessage = nn_allocmsg(size, 0);
if(!fMessage){ if (!fMessage){
stringstream logmsg; stringstream logmsg;
logmsg << "failed allocating message, reason: " << nn_strerror(errno); logmsg << "failed allocating message, reason: " << nn_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
@ -117,7 +117,7 @@ void FairMQMessageNN::Copy(FairMQMessage* msg)
fSize = size; fSize = size;
} }
void FairMQMessageNN::Clear() inline void FairMQMessageNN::Clear()
{ {
int rc = nn_freemsg(fMessage); int rc = nn_freemsg(fMessage);
if (rc < 0) { if (rc < 0) {

View File

@ -28,16 +28,18 @@ class FairMQMessageNN : public FairMQMessage
virtual void* GetData(); virtual void* GetData();
virtual size_t GetSize(); virtual size_t GetSize();
virtual void Copy(FairMQMessage* msg); virtual void SetMessage(void* data, size_t size);
void SetMessage(void* data, size_t size); virtual void CloseMessage() {};
void Clear(); virtual void Copy(FairMQMessage* msg);
virtual ~FairMQMessageNN(); virtual ~FairMQMessageNN();
private: private:
void* fMessage; void* fMessage;
size_t fSize; size_t fSize;
void Clear();
}; };
#endif /* FAIRMQMESSAGENN_H_ */ #endif /* FAIRMQMESSAGENN_H_ */

View File

@ -81,7 +81,7 @@ size_t FairMQSocketNN::Send(FairMQMessage* msg)
size_t FairMQSocketNN::Receive(FairMQMessage* msg) size_t FairMQSocketNN::Receive(FairMQMessage* msg)
{ {
void* ptr = msg->GetMessage(); void* ptr = NULL;
int rc = nn_recv(fSocket, &ptr, NN_MSG, 0); int rc = nn_recv(fSocket, &ptr, NN_MSG, 0);
if (rc < 0) { if (rc < 0) {
stringstream logmsg; stringstream logmsg;
@ -98,7 +98,7 @@ size_t FairMQSocketNN::Receive(FairMQMessage* msg)
void* FairMQSocketNN::GetSocket() void* FairMQSocketNN::GetSocket()
{ {
return NULL;// dummy method to compy with the interface. functionality not possible in zeromq. return NULL; // dummy method to comply with the interface. functionality not possible in zeromq.
} }
int FairMQSocketNN::GetSocket(int nothing) int FairMQSocketNN::GetSocket(int nothing)

View File

@ -9,7 +9,7 @@
FairMQTransportFactoryNN::FairMQTransportFactoryNN() FairMQTransportFactoryNN::FairMQTransportFactoryNN()
{ {
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, "Using NanoMsg library");
} }
FairMQMessage* FairMQTransportFactoryNN::CreateMessage() FairMQMessage* FairMQTransportFactoryNN::CreateMessage()

View File

@ -2,7 +2,7 @@
* FairMQMessageZMQ.cxx * FairMQMessageZMQ.cxx
* *
* @since 2012-12-05 * @since 2012-12-05
* @author D. Klein, A. Rybalchenko * @author D. Klein, A. Rybalchenko, N. Winckler
*/ */
#include <cstdlib> #include <cstdlib>
@ -11,9 +11,10 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
FairMQMessageZMQ::FairMQMessageZMQ() FairMQMessageZMQ::FairMQMessageZMQ() :
fMessage(new zmq_msg_t())
{ {
int rc = zmq_msg_init (&fMessage); int rc = zmq_msg_init (fMessage);
if (rc != 0) { if (rc != 0) {
stringstream logmsg; stringstream logmsg;
logmsg << "failed initializing message, reason: " << zmq_strerror(errno); logmsg << "failed initializing message, reason: " << zmq_strerror(errno);
@ -21,9 +22,10 @@ FairMQMessageZMQ::FairMQMessageZMQ()
} }
} }
FairMQMessageZMQ::FairMQMessageZMQ(size_t size) FairMQMessageZMQ::FairMQMessageZMQ(size_t size) :
fMessage(new zmq_msg_t())
{ {
int rc = zmq_msg_init_size (&fMessage, size); int rc = zmq_msg_init_size (fMessage, size);
if (rc != 0) { if (rc != 0) {
stringstream logmsg; stringstream logmsg;
logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno); logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno);
@ -31,9 +33,10 @@ FairMQMessageZMQ::FairMQMessageZMQ(size_t size)
} }
} }
FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) :
fMessage(new zmq_msg_t())
{ {
int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); int rc = zmq_msg_init_data (fMessage, data, size, &CleanUp, NULL);
if (rc != 0) { if (rc != 0) {
stringstream logmsg; stringstream logmsg;
logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno); logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno);
@ -43,14 +46,8 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size)
void FairMQMessageZMQ::Rebuild() void FairMQMessageZMQ::Rebuild()
{ {
int rc = zmq_msg_close (&fMessage); CloseMessage();
if (rc != 0) { int rc = zmq_msg_init (fMessage);
stringstream logmsg;
logmsg << "failed closing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
rc = zmq_msg_init (&fMessage);
if (rc != 0) { if (rc != 0) {
stringstream logmsg; stringstream logmsg;
logmsg << "failed initializing message, reason: " << zmq_strerror(errno); logmsg << "failed initializing message, reason: " << zmq_strerror(errno);
@ -60,14 +57,8 @@ void FairMQMessageZMQ::Rebuild()
void FairMQMessageZMQ::Rebuild(size_t size) void FairMQMessageZMQ::Rebuild(size_t size)
{ {
int rc = zmq_msg_close (&fMessage); CloseMessage();
if (rc != 0) { int rc = zmq_msg_init_size (fMessage, size);
stringstream logmsg;
logmsg << "failed closing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
rc = zmq_msg_init_size (&fMessage, size);
if (rc != 0) { if (rc != 0) {
stringstream logmsg; stringstream logmsg;
logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno); logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno);
@ -77,34 +68,28 @@ void FairMQMessageZMQ::Rebuild(size_t size)
void FairMQMessageZMQ::Rebuild(void* data, size_t size) void FairMQMessageZMQ::Rebuild(void* data, size_t size)
{ {
int rc = zmq_msg_close (&fMessage); CloseMessage();
int rc = zmq_msg_init_data (fMessage, data, size, &CleanUp, NULL);
if (rc != 0) { if (rc != 0) {
stringstream logmsg; stringstream logmsg;
logmsg << "failed closing message, reason: " << zmq_strerror(errno); logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
} }
rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL);
if (rc != 0) {
stringstream logmsg2;
logmsg2 << "failed initializing message with data, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
}
} }
void* FairMQMessageZMQ::GetMessage() void* FairMQMessageZMQ::GetMessage()
{ {
return &fMessage; return fMessage;
} }
void* FairMQMessageZMQ::GetData() void* FairMQMessageZMQ::GetData()
{ {
return zmq_msg_data (&fMessage); return zmq_msg_data (fMessage);
} }
size_t FairMQMessageZMQ::GetSize() size_t FairMQMessageZMQ::GetSize()
{ {
return zmq_msg_size (&fMessage); return zmq_msg_size (fMessage);
} }
void FairMQMessageZMQ::SetMessage(void* data, size_t size) void FairMQMessageZMQ::SetMessage(void* data, size_t size)
@ -112,9 +97,19 @@ void FairMQMessageZMQ::SetMessage(void* data, size_t size)
// dummy method to comply with the interface. functionality not allowed in zeromq. // dummy method to comply with the interface. functionality not allowed in zeromq.
} }
inline void FairMQMessageZMQ::CloseMessage()
{
int rc = zmq_msg_close (fMessage);
if (rc != 0) {
stringstream logmsg;
logmsg << "failed closing message, reason: " << zmq_strerror(errno);
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
}
}
void FairMQMessageZMQ::Copy(FairMQMessage* msg) void FairMQMessageZMQ::Copy(FairMQMessage* msg)
{ {
int rc = zmq_msg_copy (&fMessage, &(static_cast<FairMQMessageZMQ*>(msg)->fMessage)); int rc = zmq_msg_copy (fMessage, (static_cast<FairMQMessageZMQ*>(msg)->fMessage));
if (rc != 0) { if (rc != 0) {
stringstream logmsg; stringstream logmsg;
logmsg << "failed copying message, reason: " << zmq_strerror(errno); logmsg << "failed copying message, reason: " << zmq_strerror(errno);
@ -129,7 +124,7 @@ void FairMQMessageZMQ::CleanUp(void* data, void* hint)
FairMQMessageZMQ::~FairMQMessageZMQ() FairMQMessageZMQ::~FairMQMessageZMQ()
{ {
int rc = zmq_msg_close (&fMessage); int rc = zmq_msg_close (fMessage);
if (rc != 0) { if (rc != 0) {
stringstream logmsg; stringstream logmsg;
logmsg << "failed closing message with data, reason: " << zmq_strerror(errno); logmsg << "failed closing message with data, reason: " << zmq_strerror(errno);

View File

@ -24,13 +24,15 @@ class FairMQMessageZMQ : public FairMQMessage
virtual void Rebuild(); virtual void Rebuild();
virtual void Rebuild(size_t size); virtual void Rebuild(size_t size);
virtual void Rebuild(void* data, size_t site); virtual void Rebuild(void* data, size_t size);
virtual void* GetMessage(); virtual void* GetMessage();
virtual void* GetData(); virtual void* GetData();
virtual size_t GetSize(); virtual size_t GetSize();
virtual void SetMessage(void* data, size_t size); virtual void SetMessage(void* data, size_t size);
virtual void CloseMessage();
virtual void Copy(FairMQMessage* msg); virtual void Copy(FairMQMessage* msg);
static void CleanUp(void* data, void* hint); static void CleanUp(void* data, void* hint);
@ -38,7 +40,7 @@ class FairMQMessageZMQ : public FairMQMessage
virtual ~FairMQMessageZMQ(); virtual ~FairMQMessageZMQ();
private: private:
zmq_msg_t fMessage; zmq_msg_t* fMessage;
}; };
#endif /* FAIRMQMESSAGEZMQ_H_ */ #endif /* FAIRMQMESSAGEZMQ_H_ */

View File

@ -9,7 +9,7 @@
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ() FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ()
{ {
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, "Using ZeroMQ library");
} }
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage() FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage()