diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 7f902017..658d935b 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -127,7 +127,6 @@ Set(FairMQHDRFiles devices/GenericSampler.tpl devices/GenericProcessor.h devices/GenericFileSink.h - devices/GenericFileSink.tpl tools/FairMQTools.h ) install(FILES ${FairMQHDRFiles} DESTINATION include) diff --git a/fairmq/devices/GenericFileSink.h b/fairmq/devices/GenericFileSink.h index 74c57499..d1f48269 100644 --- a/fairmq/devices/GenericFileSink.h +++ b/fairmq/devices/GenericFileSink.h @@ -1,3 +1,11 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + /* * File: GenericFileSink.h * Author: winckler @@ -8,39 +16,109 @@ #ifndef GENERICFILESINK_H #define GENERICFILESINK_H +#include "FairMQDevice.h" #include #include - -#include "FairMQDevice.h" #include "FairMQLogger.h" -template -class GenericFileSink : public FairMQDevice, public InputPolicy, public OutputPolicy -{ - //using InputPolicy::message; - //using OutputPolicy::InitOutFile; - //using OutputPolicy::AddToFile; - public: - GenericFileSink(); - virtual ~GenericFileSink(); + +/********************************************************************* + * -------------- NOTES ----------------------- + * All policies must have a default constructor + * Function to define in (parent) policy classes : + * + * -------- INPUT POLICY -------- + * InputPolicy::InitContainer(...) + * CONTAINER_TYPE InputPolicy::DeSerializeMsg(FairMQMessage* msg) + * + * + * -------- OUTPUT POLICY -------- + * OutputPolicy::AddToFile(CONTAINER_TYPE); + * OutputPolicy::InitOutputFile() + **********************************************************************/ + + + +#include "FairMQDevice.h" + +template < typename InputPolicy, + typename OutputPolicy> +class GenericFileSink : public FairMQDevice, + public InputPolicy, + public OutputPolicy +{ +public: + GenericFileSink(): + InputPolicy(), + OutputPolicy() + {} + + virtual ~GenericFileSink() + {} + + + void SetTransport(FairMQTransportFactory* transport) + { + FairMQDevice::SetTransport(transport); + } template - void InitInputPolicyContainer(Args... args) + void InitInputContainer(Args... args) { InputPolicy::InitContainer(std::forward(args)...); } +protected: - virtual void SetTransport(FairMQTransportFactory* transport); - virtual void InitOutputFile(); + virtual void Init() + { + FairMQDevice::Init(); + OutputPolicy::InitOutputFile(); + } protected: - virtual void Run(); - virtual void Init(); + virtual void Run() + { + MQLOG(INFO) << ">>>>>>> Run <<<<<<<"; + + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + + int received = 0; + int receivedMsg = 0; + + while (fState == RUNNING) + { + FairMQMessage* msg = fTransportFactory->CreateMessage(); + received = fPayloadInputs->at(0)->Receive(msg); + if(received>0) + { + OutputPolicy::AddToFile(InputPolicy::DeSerializeMsg(msg)); + receivedMsg++; + } + delete msg; + } + + MQLOG(INFO) << "Received " << receivedMsg << " messages!"; + try + { + rateLogger.interrupt(); + rateLogger.join(); + } + catch(boost::thread_resource_error& e) + { + MQLOG(ERROR) << e.what(); + } + + FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); + } + }; -#include "GenericFileSink.tpl" - #endif /* GENERICFILESINK_H */ diff --git a/fairmq/devices/GenericMerger.h b/fairmq/devices/GenericMerger.h new file mode 100644 index 00000000..6ac036df --- /dev/null +++ b/fairmq/devices/GenericMerger.h @@ -0,0 +1,107 @@ +/* + * File: GenericMerger.h + * Author: winckler + * + * Created on April 9, 2015, 1:37 PM + */ + +#ifndef GENERICMERGER_H +#define GENERICMERGER_H + + +#include +#include + +#include "FairMQDevice.h" +#include "FairMQLogger.h" +#include "FairMQPoller.h" + + +template < typename MergerPolicy, + typename InputPolicy, + typename OutputPolicy + > +class GenericMerger : public FairMQDevice, + public MergerPolicy, + public InputPolicy, + public OutputPolicy +{ + public: + GenericMerger() : fBlockingTime(100) + {} + + virtual ~GenericMerger() + {} + + void SetTransport(FairMQTransportFactory* transport) + { + FairMQDevice::SetTransport(transport); + } + + + + + protected: + + int fBlockingTime; + + + virtual void Run() + { + MQLOG(INFO) << ">>>>>>> Run <<<<<<<"; + + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + + FairMQPoller* poller = fTransportFactory->CreatePoller(*fPayloadInputs); + + int received = 0; + + while (fState == RUNNING) + { + FairMQMessage* msg = fTransportFactory->CreateMessage(); + //MergerPolicy:: + poller->Poll(fBlockingTime); + + for (int i = 0; i < fNumInputs; i++) + { + if (poller->CheckInput(i)) + { + received = fPayloadInputs->at(i)->Receive(msg); + MergerPolicy::Merge(InputPolicy::DeSerializeMsg(msg)); + } + + OutputPolicy::SetMessage(msg); + + if ( received > 0 && MergerPolicy::ReadyToSend() ) + { + fPayloadOutputs->at(0)->Send(OutputPolicy::SerializeMsg(MergerPolicy::GetOutputData())); + received = 0; + } + } + + delete msg; + } + + delete poller; + + try + { + rateLogger.interrupt(); + rateLogger.join(); + } + catch(boost::thread_resource_error& e) + { + MQLOG(ERROR) << e.what(); + } + + FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); + } +}; + +#endif /* GENERICMERGER_H */ + diff --git a/fairmq/devices/GenericProcessor.h b/fairmq/devices/GenericProcessor.h index 725f8704..0cd67276 100644 --- a/fairmq/devices/GenericProcessor.h +++ b/fairmq/devices/GenericProcessor.h @@ -1,3 +1,10 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ /* * File: GenericProcessor.h * Author: winckler @@ -10,7 +17,34 @@ #include "FairMQDevice.h" -template + +/********************************************************************* + * -------------- NOTES ----------------------- + * All policies must have a default constructor + * Function to define in (parent) policy classes : + * + * -------- INPUT POLICY -------- + * InputPolicy::InitContainer(...) + * CONTAINER_TYPE InputPolicy::DeSerializeMsg(FairMQMessage* msg) + * InputPolicy::InitContainer(...) // if GenericProcessor::InitInputContainer(...) is used + * + * + * -------- OUTPUT POLICY -------- + * OutputPolicy::SerializeMsg(CONTAINER_TYPE) + * OutputPolicy::SetMessage(FairMQMessage* msg) + * OutputPolicy::InitContainer(...) // if GenericProcessor::InitOutputContainer(...) is used + * + * -------- TASK POLICY -------- + * CONTAINER_TYPE TaskPolicy::GetOutputData() + * TaskPolicy::ExecuteTask(CONTAINER_TYPE container) + * TaskPolicy::InitTask(...) // if GenericProcessor::InitTask(...) is used + * + **********************************************************************/ + + +template < typename InputPolicy, + typename OutputPolicy, + typename TaskPolicy> class GenericProcessor: public FairMQDevice, public InputPolicy, public OutputPolicy, @@ -22,14 +56,15 @@ class GenericProcessor: public FairMQDevice, virtual ~GenericProcessor() {} - + + // the four following methods ensure + // that the correct policy method is called + void SetTransport(FairMQTransportFactory* transport) { FairMQDevice::SetTransport(transport); - //InputPolicy::SetTransport(transport); - //OutputPolicy::SetTransport(transport); } - + template void InitTask(Args... args) { @@ -48,16 +83,20 @@ class GenericProcessor: public FairMQDevice, OutputPolicy::InitContainer(std::forward(args)...); } - //void SendPart(); - //bool ReceivePart(); + + + /* + * + // *********************** TODO: implement multipart features void SendPart() { fPayloadOutputs->at(0)->Send(OutputPolicy::SerializeMsg(TaskPolicy::GetData()), "snd-more"); OutputPolicy::CloseMessage(); } - /* + //void SendPart(); + //bool ReceivePart(); bool ReceivePart() { int64_t more = 0; @@ -81,7 +120,7 @@ class GenericProcessor: public FairMQDevice, virtual void Init() { FairMQDevice::Init(); - // TODO: implement the code below with the new design + // TODO: implement multipart features //fProcessorTask->InitTask(); //fProcessorTask->SetSendPart(boost::bind(&FairMQProcessor::SendPart, this)); //fProcessorTask->SetReceivePart(boost::bind(&FairMQProcessor::ReceivePart, this)); @@ -121,7 +160,6 @@ class GenericProcessor: public FairMQDevice, if(msg) msg->CloseMessage(); - //OutputPolicy::CloseMessage(); } MQLOG(INFO) << "Received " << receivedMsgs << " and sent " << sentMsgs << " messages!"; @@ -145,8 +183,5 @@ class GenericProcessor: public FairMQDevice, }; -//#include "GenericSampler.tpl" - - #endif /* GENERICPROCESSOR_H */ diff --git a/fairmq/devices/GenericSampler.h b/fairmq/devices/GenericSampler.h index 1d6426f2..97c2c332 100644 --- a/fairmq/devices/GenericSampler.h +++ b/fairmq/devices/GenericSampler.h @@ -1,3 +1,10 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ /* * File: GenericSampler.h * Author: winckler @@ -5,11 +12,13 @@ * Created on November 24, 2014, 3:30 PM */ + #ifndef GENERICSAMPLER_H #define GENERICSAMPLER_H #include #include +#include #include #include @@ -18,31 +27,40 @@ #include "FairMQDevice.h" #include "FairMQLogger.h" -/** - * Reads simulated digis from a root file and samples the digi as a time-series UDP stream. - * Must be initialized with the filename to the root file and the name of the sub-detector - * branch, whose digis should be streamed. - * - * The purpose of this class is to provide a data source of digis very similar to the - * future detector output at the point where the detector is connected to the online - * computing farm. For the development of online analysis algorithms, it is very important - * to simulate the future detector output as realistic as possible to evaluate the - * feasibility and quality of the various possible online analysis features. - */ - -template -class GenericSampler: public FairMQDevice, public SamplerPolicy, public OutputPolicy -{ - //using SamplerPolicy::GetDataBranch; // get data from file - //using OutputPolicy::message; // serialize method +/* GENERIC SAMPLER (data source) MQ-DEVICE */ +/********************************************************************* + * -------------- NOTES ----------------------- + * All policies must have a default constructor + * Function to define in (parent) policy classes : + * + * -------- INPUT POLICY (SAMPLER POLICY) -------- + * SamplerPolicy::InitSampler() + * int64_t SamplerPolicy::GetNumberOfEvent() + * CONTAINER_TYPE SamplerPolicy::GetDataBranch(int64_t eventNr) + * SamplerPolicy::SetFileProperties(Args&... args) + * + * -------- OUTPUT POLICY -------- + * OutputPolicy::SerializeMsg(CONTAINER_TYPE) + * OutputPolicy::SetMessage(FairMQMessage* msg) + * + **********************************************************************/ +template +class GenericSampler: public FairMQDevice, + public SamplerPolicy, + public OutputPolicy +{ public: - enum { - InputFile = FairMQDevice::Last, - Branch, - ParFile, - EventRate + + enum + { + InputFile = FairMQDevice::Last, + Branch, + ParFile, + EventRate }; + GenericSampler(); virtual ~GenericSampler(); virtual void SetTransport(FairMQTransportFactory* factory); @@ -66,7 +84,8 @@ class GenericSampler: public FairMQDevice, public SamplerPolicy, public OutputPo * This method can be given as a callback to the SamplerTask. * The final message part must be sent with normal Send method. */ - void SendPart(); + // temporary disabled + //void SendPart(); void SetContinuous(bool flag) { fContinuous = flag; } @@ -78,7 +97,7 @@ protected: std::string fInputFile; // Filename of a root file containing the simulated digis. std::string fParFile; std::string fBranch; // The name of the sub-detector branch to stream the digis from. - int fNumEvents; + int64_t fNumEvents; int fEventRate; int fEventCounter; bool fContinuous; diff --git a/fairmq/devices/GenericSampler.tpl b/fairmq/devices/GenericSampler.tpl index 4a2d4453..75e4dd47 100644 --- a/fairmq/devices/GenericSampler.tpl +++ b/fairmq/devices/GenericSampler.tpl @@ -31,7 +31,7 @@ void GenericSampler::Init() { FairMQDevice::Init(); SamplerPolicy::InitSampler(); - fNumEvents=SamplerPolicy::GetDataBunchNumber(); + fNumEvents=SamplerPolicy::GetNumberOfEvent(); } template @@ -53,7 +53,7 @@ void GenericSampler::Run() do { - for ( unsigned long eventNr = 0 ; eventNr < fNumEvents; ++eventNr ) + for ( int64_t eventNr = 0 ; eventNr < fNumEvents; ++eventNr ) { //fSamplerTask->SetEventIndex(eventNr); FairMQMessage* msg = fTransportFactory->CreateMessage();