diff --git a/fairmq/FairMQBenchmarkSampler.cxx b/fairmq/FairMQBenchmarkSampler.cxx index 847a1329..fdbe7ba1 100644 --- a/fairmq/FairMQBenchmarkSampler.cxx +++ b/fairmq/FairMQBenchmarkSampler.cxx @@ -39,10 +39,10 @@ void FairMQBenchmarkSampler::Run() boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this)); void* buffer = operator new[](fEventSize); - FairMQMessage* base_event = new FairMQMessageZMQ(buffer, fEventSize); + FairMQMessage* base_event = fTransportFactory->CreateMessage(buffer, fEventSize); while ( fState == RUNNING ) { - FairMQMessage* event = new FairMQMessageZMQ(); + FairMQMessage* event = fTransportFactory->CreateMessage(); event->Copy(base_event); fPayloadOutputs->at(0)->Send(event); diff --git a/fairmq/FairMQBuffer.cxx b/fairmq/FairMQBuffer.cxx index be5fdeef..c170e1c4 100644 --- a/fairmq/FairMQBuffer.cxx +++ b/fairmq/FairMQBuffer.cxx @@ -26,7 +26,7 @@ void FairMQBuffer::Run() bool received = false; while ( fState == RUNNING ) { - FairMQMessage* msg = new FairMQMessageZMQ(); + FairMQMessage* msg = fTransportFactory->CreateMessage(); received = fPayloadInputs->at(0)->Receive(msg); diff --git a/fairmq/FairMQMerger.cxx b/fairmq/FairMQMerger.cxx index 81902e3d..e09dc882 100644 --- a/fairmq/FairMQMerger.cxx +++ b/fairmq/FairMQMerger.cxx @@ -38,7 +38,7 @@ void FairMQMerger::Run() bool received = false; while ( fState == RUNNING ) { - FairMQMessage* msg = new FairMQMessageZMQ(); + FairMQMessage* msg = fTransportFactory->CreateMessage(); zmq_poll(items, fNumInputs, 100); diff --git a/fairmq/FairMQProcessor.cxx b/fairmq/FairMQProcessor.cxx index 7b091cdc..844522f4 100644 --- a/fairmq/FairMQProcessor.cxx +++ b/fairmq/FairMQProcessor.cxx @@ -45,7 +45,7 @@ void FairMQProcessor::Run() bool received = false; while ( fState == RUNNING ) { - FairMQMessage* msg = new FairMQMessageZMQ(); + FairMQMessage* msg = fTransportFactory->CreateMessage(); received = fPayloadInputs->at(0)->Receive(msg); receivedMsgs++; diff --git a/fairmq/FairMQProxy.cxx b/fairmq/FairMQProxy.cxx index 776469b2..267883d9 100644 --- a/fairmq/FairMQProxy.cxx +++ b/fairmq/FairMQProxy.cxx @@ -25,7 +25,7 @@ void FairMQProxy::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - FairMQMessage* msg = new FairMQMessageZMQ(); + FairMQMessage* msg = fTransportFactory->CreateMessage(); while ( fState == RUNNING ) { fPayloadInputs->at(0)->Receive(msg); diff --git a/fairmq/FairMQSampler.cxx b/fairmq/FairMQSampler.cxx index 95ef2bf3..275ac57f 100644 --- a/fairmq/FairMQSampler.cxx +++ b/fairmq/FairMQSampler.cxx @@ -45,6 +45,7 @@ void FairMQSampler::Init() FairMQDevice::Init(); fSamplerTask->SetBranch(fBranch); + fSamplerTask->SetTransport(fTransportFactory); // TODO: simplify message creation for sampler task? fFairRunAna->SetInputFile(TString(fInputFile)); TString output = fInputFile; @@ -139,7 +140,7 @@ void FairMQSampler::ListenToCommands() while ( true ) { try { - FairMQMessage* msg = new FairMQMessageZMQ(); + FairMQMessage* msg = fTransportFactory->CreateMessage(); received = fPayloadInputs->at(0)->Receive(msg); diff --git a/fairmq/FairMQSamplerTask.cxx b/fairmq/FairMQSamplerTask.cxx index aa860640..7fd26d18 100644 --- a/fairmq/FairMQSamplerTask.cxx +++ b/fairmq/FairMQSamplerTask.cxx @@ -12,7 +12,7 @@ FairMQSamplerTask::FairMQSamplerTask(const Text_t* name, int iVerbose) : FairTask(name, iVerbose), fInput(NULL), fBranch(""), - fOutput(new FairMQMessageZMQ) + fOutput(NULL) { } @@ -20,7 +20,7 @@ FairMQSamplerTask::FairMQSamplerTask() : FairTask( "Abstract base task used for loading a branch from a root file into memory"), fInput(NULL), fBranch(""), - fOutput(new FairMQMessageZMQ) + fOutput(NULL) { } @@ -48,4 +48,7 @@ FairMQMessage* FairMQSamplerTask::GetOutput() return fOutput; } - +void FairMQSamplerTask::SetTransport(FairMQTransportFactory* factory) +{ + fTransportFactory = factory; +} diff --git a/fairmq/FairMQSamplerTask.h b/fairmq/FairMQSamplerTask.h index 05fcc92f..a3a646ef 100644 --- a/fairmq/FairMQSamplerTask.h +++ b/fairmq/FairMQSamplerTask.h @@ -13,7 +13,7 @@ #include "TClonesArray.h" #include #include "FairMQMessage.h" -#include "FairMQMessageZMQ.h" +#include "FairMQTransportFactory.h" #include "TString.h" @@ -27,10 +27,13 @@ class FairMQSamplerTask: public FairTask virtual void Exec(Option_t* opt) = 0; void SetBranch(TString branch); FairMQMessage* GetOutput(); + void SetTransport(FairMQTransportFactory* factory); + protected: TClonesArray* fInput; TString fBranch; FairMQMessage* fOutput; + FairMQTransportFactory* fTransportFactory; }; #endif /* FAIRMQSAMPLERTASK_H_ */ diff --git a/fairmq/FairMQSink.cxx b/fairmq/FairMQSink.cxx index 43865e55..edc7af4f 100644 --- a/fairmq/FairMQSink.cxx +++ b/fairmq/FairMQSink.cxx @@ -22,7 +22,7 @@ void FairMQSink::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); while ( fState == RUNNING ) { - FairMQMessage* msg = new FairMQMessageZMQ(); + FairMQMessage* msg = fTransportFactory->CreateMessage(); fPayloadInputs->at(0)->Receive(msg); diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index 7cbaa30b..73b3f2bd 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -11,7 +11,6 @@ #include #include "FairMQContext.h" #include "FairMQMessage.h" -#include "FairMQMessageZMQ.h" class FairMQSocket diff --git a/fairmq/FairMQSocketZMQ.h b/fairmq/FairMQSocketZMQ.h index 76ba4012..389e264a 100644 --- a/fairmq/FairMQSocketZMQ.h +++ b/fairmq/FairMQSocketZMQ.h @@ -12,7 +12,6 @@ #include "FairMQSocket.h" #include "FairMQContext.h" -#include "FairMQMessageZMQ.h" class FairMQSocketZMQ : public FairMQSocket diff --git a/fairmq/FairMQSplitter.cxx b/fairmq/FairMQSplitter.cxx index 3ee2703c..c8f841de 100644 --- a/fairmq/FairMQSplitter.cxx +++ b/fairmq/FairMQSplitter.cxx @@ -30,7 +30,7 @@ void FairMQSplitter::Run() int direction = 0; while ( fState == RUNNING ) { - FairMQMessage* msg = new FairMQMessageZMQ(); + FairMQMessage* msg = fTransportFactory->CreateMessage(); received = fPayloadInputs->at(0)->Receive(msg); diff --git a/fairmq/FairMQTransportFactory.cxx b/fairmq/FairMQTransportFactory.cxx index 331cb10c..9cfc033e 100644 --- a/fairmq/FairMQTransportFactory.cxx +++ b/fairmq/FairMQTransportFactory.cxx @@ -3,4 +3,4 @@ * * @since 2014-01-20 * @author: A. Rybalchenko - */ \ No newline at end of file + */ diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 50178fda..dbfe8045 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -15,6 +15,8 @@ class FairMQTransportFactory { public: virtual FairMQMessage* CreateMessage() = 0; + virtual FairMQMessage* CreateMessage(size_t size) = 0; + virtual FairMQMessage* CreateMessage(void* data, size_t size) = 0; virtual FairMQSocket* CreateSocket(FairMQContext* context, int type, int num) = 0; virtual ~FairMQTransportFactory() {}; diff --git a/fairmq/FairMQTransportFactoryZMQ.cxx b/fairmq/FairMQTransportFactoryZMQ.cxx index 87d3bef6..8fbf2a84 100644 --- a/fairmq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/FairMQTransportFactoryZMQ.cxx @@ -17,7 +17,17 @@ FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage() return new FairMQMessageZMQ(); } +FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(size_t size) +{ + return new FairMQMessageZMQ(size); +} + +FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size) +{ + return new FairMQMessageZMQ(data, size); +} + FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(FairMQContext* context, int type, int num) { return new FairMQSocketZMQ(context, type, num); -} \ No newline at end of file +} diff --git a/fairmq/FairMQTransportFactoryZMQ.h b/fairmq/FairMQTransportFactoryZMQ.h index d90760b1..f41c9f86 100644 --- a/fairmq/FairMQTransportFactoryZMQ.h +++ b/fairmq/FairMQTransportFactoryZMQ.h @@ -19,6 +19,8 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory FairMQTransportFactoryZMQ(); virtual FairMQMessage* CreateMessage(); + virtual FairMQMessage* CreateMessage(size_t size); + virtual FairMQMessage* CreateMessage(void* data, size_t size); virtual FairMQSocket* CreateSocket(FairMQContext* context, int type, int num); virtual ~FairMQTransportFactoryZMQ() {};