use factory for messages

This commit is contained in:
Alexey Rybalchenko 2014-01-21 16:41:07 +01:00
parent 88fee245b8
commit 5989845e31
16 changed files with 36 additions and 17 deletions

View File

@ -39,10 +39,10 @@ void FairMQBenchmarkSampler::Run()
boost::thread resetEventCounter(boost::bind(&FairMQBenchmarkSampler::ResetEventCounter, this));
void* buffer = operator new[](fEventSize);
FairMQMessage* base_event = new FairMQMessageZMQ(buffer, fEventSize);
FairMQMessage* base_event = fTransportFactory->CreateMessage(buffer, fEventSize);
while ( fState == RUNNING ) {
FairMQMessage* event = new FairMQMessageZMQ();
FairMQMessage* event = fTransportFactory->CreateMessage();
event->Copy(base_event);
fPayloadOutputs->at(0)->Send(event);

View File

@ -26,7 +26,7 @@ void FairMQBuffer::Run()
bool received = false;
while ( fState == RUNNING ) {
FairMQMessage* msg = new FairMQMessageZMQ();
FairMQMessage* msg = fTransportFactory->CreateMessage();
received = fPayloadInputs->at(0)->Receive(msg);

View File

@ -38,7 +38,7 @@ void FairMQMerger::Run()
bool received = false;
while ( fState == RUNNING ) {
FairMQMessage* msg = new FairMQMessageZMQ();
FairMQMessage* msg = fTransportFactory->CreateMessage();
zmq_poll(items, fNumInputs, 100);

View File

@ -45,7 +45,7 @@ void FairMQProcessor::Run()
bool received = false;
while ( fState == RUNNING ) {
FairMQMessage* msg = new FairMQMessageZMQ();
FairMQMessage* msg = fTransportFactory->CreateMessage();
received = fPayloadInputs->at(0)->Receive(msg);
receivedMsgs++;

View File

@ -25,7 +25,7 @@ void FairMQProxy::Run()
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
FairMQMessage* msg = new FairMQMessageZMQ();
FairMQMessage* msg = fTransportFactory->CreateMessage();
while ( fState == RUNNING ) {
fPayloadInputs->at(0)->Receive(msg);

View File

@ -45,6 +45,7 @@ void FairMQSampler::Init()
FairMQDevice::Init();
fSamplerTask->SetBranch(fBranch);
fSamplerTask->SetTransport(fTransportFactory); // TODO: simplify message creation for sampler task?
fFairRunAna->SetInputFile(TString(fInputFile));
TString output = fInputFile;
@ -139,7 +140,7 @@ void FairMQSampler::ListenToCommands()
while ( true ) {
try {
FairMQMessage* msg = new FairMQMessageZMQ();
FairMQMessage* msg = fTransportFactory->CreateMessage();
received = fPayloadInputs->at(0)->Receive(msg);

View File

@ -12,7 +12,7 @@ FairMQSamplerTask::FairMQSamplerTask(const Text_t* name, int iVerbose) :
FairTask(name, iVerbose),
fInput(NULL),
fBranch(""),
fOutput(new FairMQMessageZMQ)
fOutput(NULL)
{
}
@ -20,7 +20,7 @@ FairMQSamplerTask::FairMQSamplerTask() :
FairTask( "Abstract base task used for loading a branch from a root file into memory"),
fInput(NULL),
fBranch(""),
fOutput(new FairMQMessageZMQ)
fOutput(NULL)
{
}
@ -48,4 +48,7 @@ FairMQMessage* FairMQSamplerTask::GetOutput()
return fOutput;
}
void FairMQSamplerTask::SetTransport(FairMQTransportFactory* factory)
{
fTransportFactory = factory;
}

View File

@ -13,7 +13,7 @@
#include "TClonesArray.h"
#include <string>
#include "FairMQMessage.h"
#include "FairMQMessageZMQ.h"
#include "FairMQTransportFactory.h"
#include "TString.h"
@ -27,10 +27,13 @@ class FairMQSamplerTask: public FairTask
virtual void Exec(Option_t* opt) = 0;
void SetBranch(TString branch);
FairMQMessage* GetOutput();
void SetTransport(FairMQTransportFactory* factory);
protected:
TClonesArray* fInput;
TString fBranch;
FairMQMessage* fOutput;
FairMQTransportFactory* fTransportFactory;
};
#endif /* FAIRMQSAMPLERTASK_H_ */

View File

@ -22,7 +22,7 @@ void FairMQSink::Run()
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
while ( fState == RUNNING ) {
FairMQMessage* msg = new FairMQMessageZMQ();
FairMQMessage* msg = fTransportFactory->CreateMessage();
fPayloadInputs->at(0)->Receive(msg);

View File

@ -11,7 +11,6 @@
#include <string>
#include "FairMQContext.h"
#include "FairMQMessage.h"
#include "FairMQMessageZMQ.h"
class FairMQSocket

View File

@ -12,7 +12,6 @@
#include "FairMQSocket.h"
#include "FairMQContext.h"
#include "FairMQMessageZMQ.h"
class FairMQSocketZMQ : public FairMQSocket

View File

@ -30,7 +30,7 @@ void FairMQSplitter::Run()
int direction = 0;
while ( fState == RUNNING ) {
FairMQMessage* msg = new FairMQMessageZMQ();
FairMQMessage* msg = fTransportFactory->CreateMessage();
received = fPayloadInputs->at(0)->Receive(msg);

View File

@ -15,6 +15,8 @@ class FairMQTransportFactory
{
public:
virtual FairMQMessage* CreateMessage() = 0;
virtual FairMQMessage* CreateMessage(size_t size) = 0;
virtual FairMQMessage* CreateMessage(void* data, size_t size) = 0;
virtual FairMQSocket* CreateSocket(FairMQContext* context, int type, int num) = 0;
virtual ~FairMQTransportFactory() {};

View File

@ -17,6 +17,16 @@ FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage()
return new FairMQMessageZMQ();
}
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(size_t size)
{
return new FairMQMessageZMQ(size);
}
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size)
{
return new FairMQMessageZMQ(data, size);
}
FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(FairMQContext* context, int type, int num)
{
return new FairMQSocketZMQ(context, type, num);

View File

@ -19,6 +19,8 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
FairMQTransportFactoryZMQ();
virtual FairMQMessage* CreateMessage();
virtual FairMQMessage* CreateMessage(size_t size);
virtual FairMQMessage* CreateMessage(void* data, size_t size);
virtual FairMQSocket* CreateSocket(FairMQContext* context, int type, int num);
virtual ~FairMQTransportFactoryZMQ() {};