From dfc1dd6a06a52d93bdecc66f19f3c153cd213fe2 Mon Sep 17 00:00:00 2001 From: NicolasWinckler Date: Wed, 15 Jan 2014 16:52:48 +0100 Subject: [PATCH] - Make FairMQSampler generic. - Remove fairroot dependency from fairmq. --- fairmq/CMakeLists.txt | 9 +- fairmq/FairMQLogger.h | 4 +- fairmq/FairMQMessage.h | 3 +- fairmq/FairMQProcessor.cxx | 71 ------- fairmq/FairMQProcessor.h | 28 --- fairmq/FairMQProcessorTask.cxx | 17 -- fairmq/FairMQProcessorTask.h | 25 --- fairmq/FairMQSampler.cxx | 217 -------------------- fairmq/FairMQSampler.h | 60 ------ fairmq/FairMQSamplerTask.cxx | 54 ----- fairmq/FairMQSamplerTask.h | 39 ---- fairmq/FairMQTransportFactory.h | 1 + fairmq/nanomsg/FairMQMessageNN.cxx | 18 +- fairmq/nanomsg/FairMQMessageNN.h | 10 +- fairmq/nanomsg/FairMQSocketNN.cxx | 4 +- fairmq/nanomsg/FairMQTransportFactoryNN.cxx | 2 +- fairmq/zeromq/FairMQMessageZMQ.cxx | 71 +++---- fairmq/zeromq/FairMQMessageZMQ.h | 6 +- fairmq/zeromq/FairMQTransportFactoryZMQ.cxx | 2 +- 19 files changed, 62 insertions(+), 579 deletions(-) delete mode 100644 fairmq/FairMQProcessor.cxx delete mode 100644 fairmq/FairMQProcessor.h delete mode 100644 fairmq/FairMQProcessorTask.cxx delete mode 100644 fairmq/FairMQProcessorTask.h delete mode 100644 fairmq/FairMQSampler.cxx delete mode 100644 fairmq/FairMQSampler.h delete mode 100644 fairmq/FairMQSamplerTask.cxx delete mode 100644 fairmq/FairMQSamplerTask.h diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index a9b2e084..4ce8b438 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -1,8 +1,6 @@ set(INCLUDE_DIRECTORIES - ${BASE_INCLUDE_DIRECTORIES} ${CMAKE_SOURCE_DIR}/fairmq ${Boost_INCLUDE_DIR} - ${ROOT_INCLUDE_DIR} ) if(NANOMSG_FOUND) @@ -20,7 +18,6 @@ endif(NANOMSG_FOUND) include_directories(${INCLUDE_DIRECTORIES}) set(LINK_DIRECTORIES - ${ROOT_LIBRARY_DIR} ${Boost_LIBRARY_DIRS} ) @@ -34,17 +31,13 @@ set(SRCS "FairMQMessage.cxx" "FairMQSocket.cxx" "FairMQDevice.cxx" - "FairMQSampler.cxx" "FairMQBenchmarkSampler.cxx" - "FairMQProcessor.cxx" "FairMQSink.cxx" "FairMQBuffer.cxx" "FairMQProxy.cxx" "FairMQSplitter.cxx" "FairMQMerger.cxx" "FairMQPoller.cxx" - "FairMQSamplerTask.cxx" - "FairMQProcessorTask.cxx" ) if(NANOMSG_FOUND) @@ -75,7 +68,7 @@ endif(NANOMSG_FOUND) set(DEPENDENCIES ${DEPENDENCIES} ${CMAKE_THREAD_LIBS_INIT} - Base ParBase FairTools GeoBase boost_thread boost_timer boost_system + boost_thread boost_timer boost_system ) set(LIBRARY_NAME FairMQ) diff --git a/fairmq/FairMQLogger.h b/fairmq/FairMQLogger.h index 9725c70f..4e9db54e 100644 --- a/fairmq/FairMQLogger.h +++ b/fairmq/FairMQLogger.h @@ -25,11 +25,11 @@ class FairMQLogger DEBUG, INFO, ERROR, STATE }; FairMQLogger(); - FairMQLogger(const string& bindAdress); // TODO: check this for const ref + FairMQLogger(const string& bindAdress); virtual ~FairMQLogger(); void Log(int type, const string& logmsg); static FairMQLogger* GetInstance(); - static FairMQLogger* InitInstance(const string& bindAddress); // TODO: check this for const ref + static FairMQLogger* InitInstance(const string& bindAddress); }; typedef unsigned long long timestamp_t; diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index 6139e737..e16d5066 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -16,13 +16,14 @@ class FairMQMessage public: virtual void Rebuild() = 0; virtual void Rebuild(size_t size) = 0; - virtual void Rebuild(void* data, size_t site) = 0; + virtual void Rebuild(void* data, size_t size) = 0; virtual void* GetMessage() = 0; virtual void* GetData() = 0; virtual size_t GetSize() = 0; virtual void SetMessage(void* data, size_t size) = 0; + virtual void CloseMessage() = 0; virtual void Copy(FairMQMessage* msg) = 0; virtual ~FairMQMessage() {}; diff --git a/fairmq/FairMQProcessor.cxx b/fairmq/FairMQProcessor.cxx deleted file mode 100644 index 2aa614e1..00000000 --- a/fairmq/FairMQProcessor.cxx +++ /dev/null @@ -1,71 +0,0 @@ -/** - * FairMQProcessor.cxx - * - * @since 2012-12-06 - * @author D. Klein, A. Rybalchenko - */ - -#include -#include - -#include "FairMQProcessor.h" -#include "FairMQLogger.h" - -FairMQProcessor::FairMQProcessor() : - fProcessorTask(NULL) -{ -} - -FairMQProcessor::~FairMQProcessor() -{ - delete fProcessorTask; -} - -void FairMQProcessor::SetTask(FairMQProcessorTask* task) -{ - fProcessorTask = task; -} - -void FairMQProcessor::Init() -{ - FairMQDevice::Init(); - - fProcessorTask->InitTask(); -} - -void FairMQProcessor::Run() -{ - FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); - - boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - - int receivedMsgs = 0; - int sentMsgs = 0; - - bool received = false; - - while ( fState == RUNNING ) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); - - received = fPayloadInputs->at(0)->Receive(msg); - receivedMsgs++; - - if (received) { - fProcessorTask->Exec(msg, NULL); - - fPayloadOutputs->at(0)->Send(msg); - sentMsgs++; - received = false; - } - - delete msg; - } - - cout << "I've received " << receivedMsgs << " and sent " << sentMsgs << " messages!" << endl; - - boost::this_thread::sleep(boost::posix_time::milliseconds(5000)); - - rateLogger.interrupt(); - rateLogger.join(); -} - diff --git a/fairmq/FairMQProcessor.h b/fairmq/FairMQProcessor.h deleted file mode 100644 index 47ded4cd..00000000 --- a/fairmq/FairMQProcessor.h +++ /dev/null @@ -1,28 +0,0 @@ -/** - * FairMQProcessor.h - * - * @since 2012-12-06 - * @author D. Klein, A. Rybalchenko - */ - -#ifndef FAIRMQPROCESSOR_H_ -#define FAIRMQPROCESSOR_H_ - -#include "FairMQDevice.h" -#include "FairMQProcessorTask.h" - - -class FairMQProcessor: public FairMQDevice -{ - public: - FairMQProcessor(); - virtual ~FairMQProcessor(); - void SetTask(FairMQProcessorTask* task); - protected: - virtual void Init(); - virtual void Run(); - private: - FairMQProcessorTask* fProcessorTask; -}; - -#endif /* FAIRMQPROCESSOR_H_ */ diff --git a/fairmq/FairMQProcessorTask.cxx b/fairmq/FairMQProcessorTask.cxx deleted file mode 100644 index ca0f6e06..00000000 --- a/fairmq/FairMQProcessorTask.cxx +++ /dev/null @@ -1,17 +0,0 @@ -/** - * FairMQProcessorTask.cxx - * - * @since Dec 6, 2012-12-06 - * @author: D. Klein, A. Rybalchenko - */ - -#include "FairMQProcessorTask.h" - - -FairMQProcessorTask::FairMQProcessorTask() -{ -} - -FairMQProcessorTask::~FairMQProcessorTask() -{ -} diff --git a/fairmq/FairMQProcessorTask.h b/fairmq/FairMQProcessorTask.h deleted file mode 100644 index 3d563d9e..00000000 --- a/fairmq/FairMQProcessorTask.h +++ /dev/null @@ -1,25 +0,0 @@ -/** - * FairMQProcessorTask.h - * - * @since Dec 6, 2012-12-06 - * @author: D. Klein, A. Rybalchenko - */ - -#ifndef FAIRMQPROCESSORTASK_H_ -#define FAIRMQPROCESSORTASK_H_ - -#include - -#include "FairMQMessage.h" -#include "FairTask.h" - - -class FairMQProcessorTask : public FairTask -{ - public: - FairMQProcessorTask(); - virtual ~FairMQProcessorTask(); - virtual void Exec(FairMQMessage* msg, Option_t* opt) = 0; -}; - -#endif /* FAIRMQPROCESSORTASK_H_ */ diff --git a/fairmq/FairMQSampler.cxx b/fairmq/FairMQSampler.cxx deleted file mode 100644 index 377d3dcc..00000000 --- a/fairmq/FairMQSampler.cxx +++ /dev/null @@ -1,217 +0,0 @@ -/** - * FairMQSampler.cpp - * - * @since 2012-09-27 - * @author D. Klein, A. Rybalchenko - */ - -#include -#include - -#include -#include -#include - -#include "TList.h" -#include "TObjString.h" -#include "TClonesArray.h" -#include "FairParRootFileIo.h" -#include "FairRuntimeDb.h" -#include "TROOT.h" - -#include "FairMQSampler.h" -#include "FairMQLogger.h" - - -FairMQSampler::FairMQSampler() : - fFairRunAna(new FairRunAna()), - fSamplerTask(NULL), - fInputFile(""), - fParFile(""), - fBranch(""), - fEventRate(1) -{ -} - -FairMQSampler::~FairMQSampler() -{ - if(fFairRunAna) { - fFairRunAna->TerminateRun(); - } -} - -void FairMQSampler::Init() -{ - FairMQDevice::Init(); - - fSamplerTask->SetBranch(fBranch); - fSamplerTask->SetTransport(fTransportFactory); - - fFairRunAna->SetInputFile(TString(fInputFile)); - TString output = fInputFile; - output.Append(".out.root"); - fFairRunAna->SetOutputFile(output.Data()); - - fFairRunAna->AddTask(fSamplerTask); - - FairRuntimeDb* rtdb = fFairRunAna->GetRuntimeDb(); - FairParRootFileIo* parInput1 = new FairParRootFileIo(); - parInput1->open(TString(fParFile).Data()); - rtdb->setFirstInput(parInput1); - rtdb->print(); - - fFairRunAna->Init(); - //fFairRunAna->Run(0, 0); - FairRootManager* ioman = FairRootManager::Instance(); - fNumEvents = int((ioman->GetInChain())->GetEntries()); -} - -void FairMQSampler::Run() -{ - FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<"); - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - - boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - boost::thread resetEventCounter(boost::bind(&FairMQSampler::ResetEventCounter, this)); - //boost::thread commandListener(boost::bind(&FairMQSampler::ListenToCommands, this)); - - int sentMsgs = 0; - - boost::timer::auto_cpu_timer timer; - - cout << "Number of events to process: " << fNumEvents << endl; - - Long64_t eventNr = 0; - -// while ( fState == RUNNING ) { - - for ( /* eventNr */ ; eventNr < fNumEvents; eventNr++ ) { - fFairRunAna->RunMQ(eventNr); - - fPayloadOutputs->at(0)->Send(fSamplerTask->GetOutput()); - sentMsgs++; - - --fEventCounter; - - while (fEventCounter == 0) { - boost::this_thread::sleep(boost::posix_time::milliseconds(1)); - } - - if( fState != RUNNING ) { break; } - } - - boost::this_thread::interruption_point(); -// } - - boost::timer::cpu_times const elapsed_time(timer.elapsed()); - - cout << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2) << endl; - cout << "Sent " << sentMsgs << " messages!" << endl; - - //boost::this_thread::sleep(boost::posix_time::milliseconds(5000)); - - rateLogger.interrupt(); - rateLogger.join(); - resetEventCounter.interrupt(); - resetEventCounter.join(); - //commandListener.interrupt(); - //commandListener.join(); -} - -void FairMQSampler::ResetEventCounter() -{ - while ( true ) { - try { - fEventCounter = fEventRate / 100; - boost::this_thread::sleep(boost::posix_time::milliseconds(10)); - } catch (boost::thread_interrupted&) { - cout << "resetEventCounter interrupted" << endl; - break; - } - } - FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, ">>>>>>> stopping resetEventCounter <<<<<<<"); -} - -void FairMQSampler::ListenToCommands() -{ - FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> ListenToCommands <<<<<<<"); - - bool received = false; - - while ( true ) { - try { - FairMQMessage* msg = fTransportFactory->CreateMessage(); - - received = fPayloadInputs->at(0)->Receive(msg); - - if (received) { - //command handling goes here. - FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, "> received command <"); - received = false; - } - - delete msg; - - boost::this_thread::interruption_point(); - } catch (boost::thread_interrupted&) { - cout << "commandListener interrupted" << endl; - break; - } - } - FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, ">>>>>>> stopping commandListener <<<<<<<"); -} - -void FairMQSampler::SetProperty(const int key, const string& value, const int slot/*= 0*/) -{ - switch (key) { - case InputFile: - fInputFile = value; - break; - case ParFile: - fParFile = value; - break; - case Branch: - fBranch = value; - break; - default: - FairMQDevice::SetProperty(key, value, slot); - break; - } -} - -string FairMQSampler::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/) -{ - switch (key) { - case InputFile: - return fInputFile; - case ParFile: - return fParFile; - case Branch: - return fBranch; - default: - return FairMQDevice::GetProperty(key, default_, slot); - } -} - -void FairMQSampler::SetProperty(const int key, const int value, const int slot/*= 0*/) -{ - switch (key) { - case EventRate: - fEventRate = value; - break; - default: - FairMQDevice::SetProperty(key, value, slot); - break; - } -} - -int FairMQSampler::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/) -{ - switch (key) { - case EventRate: - return fEventRate; - default: - return FairMQDevice::GetProperty(key, default_, slot); - } -} - diff --git a/fairmq/FairMQSampler.h b/fairmq/FairMQSampler.h deleted file mode 100644 index 8f541eca..00000000 --- a/fairmq/FairMQSampler.h +++ /dev/null @@ -1,60 +0,0 @@ -/** - * FairMQSampler.h - * - * @since 2012-09-27 - * @author D. Klein, A. Rybalchenko - */ - -#ifndef FAIRMQSAMPLER_H_ -#define FAIRMQSAMPLER_H_ - -#include "FairRunAna.h" -#include "FairTask.h" -#include "FairMQDevice.h" -#include "FairMQSamplerTask.h" - - -/** - * Reads simulated digis from a root file and samples the digi as a time-series UDP stream. - * Must be initialized with the filename to the root file and the name of the sub-detector - * branch, whose digis should be streamed. - * - * The purpose of this class is to provide a data source of digis very similar to the - * future detector output at the point where the detector is connected to the online - * computing farm. For the development of online analysis algorithms, it is very important - * to simulate the future detector output as realistic as possible to evaluate the - * feasibility and quality of the various possible online analysis features. - */ -class FairMQSampler: public FairMQDevice -{ - public: - enum { - InputFile = FairMQDevice::Last, - Branch, - ParFile, - EventRate - }; - FairMQSampler(); - virtual ~FairMQSampler(); - - void ResetEventCounter(); - virtual void ListenToCommands(); - virtual void SetProperty(const int key, const string& value, const int slot = 0); - virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0); - virtual void SetProperty(const int key, const int value, const int slot = 0); - virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0); - protected: - FairRunAna* fFairRunAna; - int fNumEvents; - FairMQSamplerTask* fSamplerTask; - string fInputFile; // Filename of a root file containing the simulated digis. - string fParFile; - string fBranch; // The name of the sub-detector branch to stream the digis from. - int fEventRate; - int fEventCounter; - virtual void Init(); - virtual void Run(); - -}; - -#endif /* FAIRMQSAMPLER_H_ */ diff --git a/fairmq/FairMQSamplerTask.cxx b/fairmq/FairMQSamplerTask.cxx deleted file mode 100644 index 7fd26d18..00000000 --- a/fairmq/FairMQSamplerTask.cxx +++ /dev/null @@ -1,54 +0,0 @@ -/** - * FairMQSamplerTask.cxx - * - * @since 2012-11-22 - * @author D. Klein, A. Rybalchenko - */ - -#include "FairMQSamplerTask.h" - - -FairMQSamplerTask::FairMQSamplerTask(const Text_t* name, int iVerbose) : - FairTask(name, iVerbose), - fInput(NULL), - fBranch(""), - fOutput(NULL) -{ -} - -FairMQSamplerTask::FairMQSamplerTask() : - FairTask( "Abstract base task used for loading a branch from a root file into memory"), - fInput(NULL), - fBranch(""), - fOutput(NULL) -{ -} - -FairMQSamplerTask::~FairMQSamplerTask() -{ - delete fInput; - //delete fOutput; // leave fOutput in memory, because it is needed even after FairMQSamplerTask is terminated. ClearOutput will clean it when it is no longer needed. -} - -InitStatus FairMQSamplerTask::Init() -{ - FairRootManager* ioman = FairRootManager::Instance(); - fInput = (TClonesArray*) ioman->GetObject(fBranch.Data()); - - return kSUCCESS; -} - -void FairMQSamplerTask::SetBranch(TString branch) -{ - fBranch = branch; -} - -FairMQMessage* FairMQSamplerTask::GetOutput() -{ - return fOutput; -} - -void FairMQSamplerTask::SetTransport(FairMQTransportFactory* factory) -{ - fTransportFactory = factory; -} diff --git a/fairmq/FairMQSamplerTask.h b/fairmq/FairMQSamplerTask.h deleted file mode 100644 index a3a646ef..00000000 --- a/fairmq/FairMQSamplerTask.h +++ /dev/null @@ -1,39 +0,0 @@ -/** - * FairMQSamplerTask.h - * - * @since 2012-11-22 - * @author D. Klein, A. Rybalchenko - */ - -#ifndef FAIRMQSAMPLERTASK_H_ -#define FAIRMQSAMPLERTASK_H_ - -#include "FairTask.h" -#include -#include "TClonesArray.h" -#include -#include "FairMQMessage.h" -#include "FairMQTransportFactory.h" -#include "TString.h" - - -class FairMQSamplerTask: public FairTask -{ - public: - FairMQSamplerTask(); - FairMQSamplerTask(const Text_t* name, int iVerbose=1); - virtual ~FairMQSamplerTask(); - virtual InitStatus Init(); - virtual void Exec(Option_t* opt) = 0; - void SetBranch(TString branch); - FairMQMessage* GetOutput(); - void SetTransport(FairMQTransportFactory* factory); - - protected: - TClonesArray* fInput; - TString fBranch; - FairMQMessage* fOutput; - FairMQTransportFactory* fTransportFactory; -}; - -#endif /* FAIRMQSAMPLERTASK_H_ */ diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 939573c8..3acb8ee4 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -13,6 +13,7 @@ #include "FairMQMessage.h" #include "FairMQSocket.h" #include "FairMQPoller.h" +#include "FairMQLogger.h" using std::vector; diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index a0ac641b..dd653f3e 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -12,7 +12,6 @@ #include "FairMQMessageNN.h" #include "FairMQLogger.h" - FairMQMessageNN::FairMQMessageNN() : fSize(0), fMessage(NULL) @@ -22,7 +21,7 @@ FairMQMessageNN::FairMQMessageNN() : FairMQMessageNN::FairMQMessageNN(size_t size) { fMessage = nn_allocmsg(size, 0); - if(!fMessage){ + if (!fMessage){ stringstream logmsg; logmsg << "failed allocating message, reason: " << nn_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); @@ -33,7 +32,7 @@ FairMQMessageNN::FairMQMessageNN(size_t size) FairMQMessageNN::FairMQMessageNN(void* data, size_t size) { fMessage = nn_allocmsg(size, 0); - if(!fMessage){ + if (!fMessage){ stringstream logmsg; logmsg << "failed allocating message, reason: " << nn_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); @@ -53,7 +52,7 @@ void FairMQMessageNN::Rebuild(size_t size) { Clear(); fMessage = nn_allocmsg(size, 0); - if(!fMessage){ + if (!fMessage){ stringstream logmsg; logmsg << "failed allocating message, reason: " << nn_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); @@ -65,11 +64,12 @@ void FairMQMessageNN::Rebuild(void* data, size_t size) { Clear(); fMessage = nn_allocmsg(size, 0); - if(!fMessage){ + if (!fMessage){ stringstream logmsg; logmsg << "failed allocating message, reason: " << nn_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } + memcpy (fMessage, data, size); fSize = size; } @@ -96,9 +96,9 @@ void FairMQMessageNN::SetMessage(void* data, size_t size) void FairMQMessageNN::Copy(FairMQMessage* msg) { - if(fMessage){ + if (fMessage){ int rc = nn_freemsg(fMessage); - if( rc < 0 ){ + if ( rc < 0 ){ stringstream logmsg; logmsg << "failed freeing message, reason: " << nn_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); @@ -108,7 +108,7 @@ void FairMQMessageNN::Copy(FairMQMessage* msg) size_t size = msg->GetSize(); fMessage = nn_allocmsg(size, 0); - if(!fMessage){ + if (!fMessage){ stringstream logmsg; logmsg << "failed allocating message, reason: " << nn_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); @@ -117,7 +117,7 @@ void FairMQMessageNN::Copy(FairMQMessage* msg) fSize = size; } -void FairMQMessageNN::Clear() +inline void FairMQMessageNN::Clear() { int rc = nn_freemsg(fMessage); if (rc < 0) { diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index b725a5d3..4c91e1b2 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -19,7 +19,7 @@ class FairMQMessageNN : public FairMQMessage FairMQMessageNN(); FairMQMessageNN(size_t size); FairMQMessageNN(void* data, size_t size); - + virtual void Rebuild(); virtual void Rebuild(size_t size); virtual void Rebuild(void* data, size_t site); @@ -28,16 +28,18 @@ class FairMQMessageNN : public FairMQMessage virtual void* GetData(); virtual size_t GetSize(); - virtual void Copy(FairMQMessage* msg); + virtual void SetMessage(void* data, size_t size); - void SetMessage(void* data, size_t size); - void Clear(); + virtual void CloseMessage() {}; + virtual void Copy(FairMQMessage* msg); virtual ~FairMQMessageNN(); private: void* fMessage; size_t fSize; + + void Clear(); }; #endif /* FAIRMQMESSAGENN_H_ */ diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 2d0a43f6..e5a6766a 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -81,7 +81,7 @@ size_t FairMQSocketNN::Send(FairMQMessage* msg) size_t FairMQSocketNN::Receive(FairMQMessage* msg) { - void* ptr = msg->GetMessage(); + void* ptr = NULL; int rc = nn_recv(fSocket, &ptr, NN_MSG, 0); if (rc < 0) { stringstream logmsg; @@ -98,7 +98,7 @@ size_t FairMQSocketNN::Receive(FairMQMessage* msg) void* FairMQSocketNN::GetSocket() { - return NULL;// dummy method to compy with the interface. functionality not possible in zeromq. + return NULL; // dummy method to comply with the interface. functionality not possible in zeromq. } int FairMQSocketNN::GetSocket(int nothing) diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 6a5eaa9f..c0facc7e 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -9,7 +9,7 @@ FairMQTransportFactoryNN::FairMQTransportFactoryNN() { - + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, "Using NanoMsg library"); } FairMQMessage* FairMQTransportFactoryNN::CreateMessage() diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index a30fd67f..af251b69 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -2,7 +2,7 @@ * FairMQMessageZMQ.cxx * * @since 2012-12-05 - * @author D. Klein, A. Rybalchenko + * @author D. Klein, A. Rybalchenko, N. Winckler */ #include @@ -11,9 +11,10 @@ #include "FairMQLogger.h" -FairMQMessageZMQ::FairMQMessageZMQ() +FairMQMessageZMQ::FairMQMessageZMQ() : + fMessage(new zmq_msg_t()) { - int rc = zmq_msg_init (&fMessage); + int rc = zmq_msg_init (fMessage); if (rc != 0) { stringstream logmsg; logmsg << "failed initializing message, reason: " << zmq_strerror(errno); @@ -21,9 +22,10 @@ FairMQMessageZMQ::FairMQMessageZMQ() } } -FairMQMessageZMQ::FairMQMessageZMQ(size_t size) +FairMQMessageZMQ::FairMQMessageZMQ(size_t size) : + fMessage(new zmq_msg_t()) { - int rc = zmq_msg_init_size (&fMessage, size); + int rc = zmq_msg_init_size (fMessage, size); if (rc != 0) { stringstream logmsg; logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno); @@ -31,9 +33,10 @@ FairMQMessageZMQ::FairMQMessageZMQ(size_t size) } } -FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) +FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) : + fMessage(new zmq_msg_t()) { - int rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); + int rc = zmq_msg_init_data (fMessage, data, size, &CleanUp, NULL); if (rc != 0) { stringstream logmsg; logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno); @@ -43,14 +46,8 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) void FairMQMessageZMQ::Rebuild() { - int rc = zmq_msg_close (&fMessage); - if (rc != 0) { - stringstream logmsg; - logmsg << "failed closing message, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - } - - rc = zmq_msg_init (&fMessage); + CloseMessage(); + int rc = zmq_msg_init (fMessage); if (rc != 0) { stringstream logmsg; logmsg << "failed initializing message, reason: " << zmq_strerror(errno); @@ -60,14 +57,8 @@ void FairMQMessageZMQ::Rebuild() void FairMQMessageZMQ::Rebuild(size_t size) { - int rc = zmq_msg_close (&fMessage); - if (rc != 0) { - stringstream logmsg; - logmsg << "failed closing message, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); - } - - rc = zmq_msg_init_size (&fMessage, size); + CloseMessage(); + int rc = zmq_msg_init_size (fMessage, size); if (rc != 0) { stringstream logmsg; logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno); @@ -77,34 +68,28 @@ void FairMQMessageZMQ::Rebuild(size_t size) void FairMQMessageZMQ::Rebuild(void* data, size_t size) { - int rc = zmq_msg_close (&fMessage); + CloseMessage(); + int rc = zmq_msg_init_data (fMessage, data, size, &CleanUp, NULL); if (rc != 0) { stringstream logmsg; - logmsg << "failed closing message, reason: " << zmq_strerror(errno); + logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } - - rc = zmq_msg_init_data (&fMessage, data, size, &CleanUp, NULL); - if (rc != 0) { - stringstream logmsg2; - logmsg2 << "failed initializing message with data, reason: " << zmq_strerror(errno); - FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str()); - } } void* FairMQMessageZMQ::GetMessage() { - return &fMessage; + return fMessage; } void* FairMQMessageZMQ::GetData() { - return zmq_msg_data (&fMessage); + return zmq_msg_data (fMessage); } size_t FairMQMessageZMQ::GetSize() { - return zmq_msg_size (&fMessage); + return zmq_msg_size (fMessage); } void FairMQMessageZMQ::SetMessage(void* data, size_t size) @@ -112,9 +97,19 @@ void FairMQMessageZMQ::SetMessage(void* data, size_t size) // dummy method to comply with the interface. functionality not allowed in zeromq. } +inline void FairMQMessageZMQ::CloseMessage() +{ + int rc = zmq_msg_close (fMessage); + if (rc != 0) { + stringstream logmsg; + logmsg << "failed closing message, reason: " << zmq_strerror(errno); + FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); + } +} + void FairMQMessageZMQ::Copy(FairMQMessage* msg) { - int rc = zmq_msg_copy (&fMessage, &(static_cast(msg)->fMessage)); + int rc = zmq_msg_copy (fMessage, (static_cast(msg)->fMessage)); if (rc != 0) { stringstream logmsg; logmsg << "failed copying message, reason: " << zmq_strerror(errno); @@ -129,10 +124,10 @@ void FairMQMessageZMQ::CleanUp(void* data, void* hint) FairMQMessageZMQ::~FairMQMessageZMQ() { - int rc = zmq_msg_close (&fMessage); + int rc = zmq_msg_close (fMessage); if (rc != 0) { stringstream logmsg; logmsg << "failed closing message with data, reason: " << zmq_strerror(errno); FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str()); } -} +} \ No newline at end of file diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index fc6a4a99..f21af547 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -24,13 +24,15 @@ class FairMQMessageZMQ : public FairMQMessage virtual void Rebuild(); virtual void Rebuild(size_t size); - virtual void Rebuild(void* data, size_t site); + virtual void Rebuild(void* data, size_t size); virtual void* GetMessage(); virtual void* GetData(); virtual size_t GetSize(); + virtual void SetMessage(void* data, size_t size); + virtual void CloseMessage(); virtual void Copy(FairMQMessage* msg); static void CleanUp(void* data, void* hint); @@ -38,7 +40,7 @@ class FairMQMessageZMQ : public FairMQMessage virtual ~FairMQMessageZMQ(); private: - zmq_msg_t fMessage; + zmq_msg_t* fMessage; }; #endif /* FAIRMQMESSAGEZMQ_H_ */ diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 170a23d7..92cd5111 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -9,7 +9,7 @@ FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ() { - + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, "Using ZeroMQ library"); } FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage()