From 3e424354e7d927c7dac6c4bae00de540e258e435 Mon Sep 17 00:00:00 2001 From: NicolasWinckler Date: Mon, 10 Nov 2014 16:42:08 +0100 Subject: [PATCH] add 1) generic MQ-devices (Sampler, Processor, and FileSink) in fairmq, 2) policy classes in base/MQ and 3) a Tutorial7 in example --- fairmq/devices/GenericFileSink.h | 46 ++++++ fairmq/devices/GenericFileSink.tpl | 82 +++++++++++ fairmq/devices/GenericProcessor.h | 158 ++++++++++++++++++++ fairmq/devices/GenericSampler.h | 99 +++++++++++++ fairmq/devices/GenericSampler.tpl | 227 +++++++++++++++++++++++++++++ 5 files changed, 612 insertions(+) create mode 100644 fairmq/devices/GenericFileSink.h create mode 100644 fairmq/devices/GenericFileSink.tpl create mode 100644 fairmq/devices/GenericProcessor.h create mode 100644 fairmq/devices/GenericSampler.h create mode 100644 fairmq/devices/GenericSampler.tpl diff --git a/fairmq/devices/GenericFileSink.h b/fairmq/devices/GenericFileSink.h new file mode 100644 index 00000000..c58e6bb8 --- /dev/null +++ b/fairmq/devices/GenericFileSink.h @@ -0,0 +1,46 @@ +/* + * File: GenericFileSink.h + * Author: winckler + * + * Created on October 7, 2014, 6:06 PM + */ + +#ifndef GENERICFILESINK_H +#define GENERICFILESINK_H + +#include "FairMQDevice.h" +#include +#include +#include "FairMQLogger.h" + +template +class GenericFileSink : public FairMQDevice, public InputPolicy, public OutputPolicy +{ + using InputPolicy::message; + //using OutputPolicy::InitOutFile; + using OutputPolicy::AddToFile; + +public: + GenericFileSink(); + virtual ~GenericFileSink(); + + template + void InitInputPolicyContainer(Args... args) + { + InputPolicy::InitContainer(std::forward(args)...); + } + + + virtual void SetTransport(FairMQTransportFactory* transport); + virtual void InitOutputFile(); + +protected: + virtual void Run(); + virtual void Init(); + +}; + +#include "GenericFileSink.tpl" + +#endif /* GENERICFILESINK_H */ + diff --git a/fairmq/devices/GenericFileSink.tpl b/fairmq/devices/GenericFileSink.tpl new file mode 100644 index 00000000..33bf1556 --- /dev/null +++ b/fairmq/devices/GenericFileSink.tpl @@ -0,0 +1,82 @@ +/* + * File: GenericFileSink.tpl + * Author: winckler + * + * Created on October 7, 2014, 7:21 PM + */ + +template +GenericFileSink::GenericFileSink() : + InputPolicy(), + OutputPolicy() +{ +} + +template +GenericFileSink::~GenericFileSink() +{ +} + +template +void GenericFileSink::SetTransport(FairMQTransportFactory* transport) +{ + FairMQDevice::SetTransport(transport); + //InputPolicy::SetTransport(transport); +} + + +template +void GenericFileSink::Init() +{ + FairMQDevice::Init(); + InitOutputFile(); + //InputPolicy::Init(); + //OutputPolicy::Init(); +} + +template +void GenericFileSink::InitOutputFile() +{ + OutputPolicy::InitOutFile(); +} + +template +void GenericFileSink::Run() +{ + MQLOG(INFO) << ">>>>>>> Run <<<<<<<"; + + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + + int received = 0; + int receivedMsg = 0; + + while (fState == RUNNING) + { + FairMQMessage* msg = fTransportFactory->CreateMessage(); + received = fPayloadInputs->at(0)->Receive(msg); + if(received>0) + { + AddToFile(message(msg)); + receivedMsg++; + } + delete msg; + } + + MQLOG(INFO) << "Received " << receivedMsg << " messages!"; + try + { + rateLogger.interrupt(); + rateLogger.join(); + } + catch(boost::thread_resource_error& e) + { + MQLOG(ERROR) << e.what(); + } + + FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); +} diff --git a/fairmq/devices/GenericProcessor.h b/fairmq/devices/GenericProcessor.h new file mode 100644 index 00000000..96420316 --- /dev/null +++ b/fairmq/devices/GenericProcessor.h @@ -0,0 +1,158 @@ +/* + * File: GenericProcessor.h + * Author: winckler + * + * Created on December 1, 2014, 10:22 PM + */ + +#ifndef GENERICPROCESSOR_H +#define GENERICPROCESSOR_H + +#include "FairMQDevice.h" + +template +class GenericProcessor: public FairMQDevice, + public InputPolicy, + public OutputPolicy, + public TaskPolicy +{ + public: + + GenericProcessor() : InputPolicy(), OutputPolicy(), TaskPolicy() + {} + + virtual ~GenericProcessor() + {} + + void SetTransport(FairMQTransportFactory* transport) + { + FairMQDevice::SetTransport(transport); + //InputPolicy::SetTransport(transport); + //OutputPolicy::SetTransport(transport); + } + + + template + void InitTask(Args... args) + { + TaskPolicy::InitTask(std::forward(args)...); + } + + template + void InitInputContainer(Args... args) + { + InputPolicy::InitContainer(std::forward(args)...); + } + + template + void InitOutputContainer(Args... args) + { + OutputPolicy::InitContainer(std::forward(args)...); + } + + + //void SendPart(); + //bool ReceivePart(); + + + + void SendPart() + { + fPayloadOutputs->at(0)->Send(OutputPolicy::message(TaskPolicy::GetData()), "snd-more"); + OutputPolicy::CloseMessage(); + } + + /* + bool ReceivePart() + { + int64_t more = 0; + size_t more_size = sizeof(more); + fPayloadInputs->at(0)->GetOption("rcv-more", &more, &more_size); + if(more) + { + InputPolicy::CloseMessage(); + //fProcessorTask->GetPayload()->CloseMessage(); + fProcessorTask->SetPayload(fTransportFactory->CreateMessage()); + return fPayloadInputs->at(0)->Receive(fProcessorTask->GetPayload()); + } + else + { + return false; + } + } + */ + + protected: + virtual void Init() + { + FairMQDevice::Init(); + // TODO: implement the code below with the new design + //fProcessorTask->InitTask(); + //fProcessorTask->SetSendPart(boost::bind(&FairMQProcessor::SendPart, this)); + //fProcessorTask->SetReceivePart(boost::bind(&FairMQProcessor::ReceivePart, this)); + } + + virtual void Run() + { + MQLOG(INFO) << ">>>>>>> Run <<<<<<<"; + + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + int receivedMsgs = 0; + int sentMsgs = 0; + int received = 0; + + while ( fState == RUNNING ) + { + FairMQMessage* msg = fTransportFactory->CreateMessage(); + + received = fPayloadInputs->at(0)->Receive(msg); + receivedMsgs++; + + // InputPolicy::message(msg) --> deserialize data of msg and fill output container + // TaskPolicy::ExecuteTask( ... ) --> process output container + TaskPolicy::ExecuteTask(InputPolicy::message(msg)); + + // OutputPolicy::fMessage point to msg + OutputPolicy::SetMessage(msg); + + if (received > 0) + { + // TaskPolicy::GetOutputData() --> Get processed output container + // OutputPolicy::message(...) --> Serialize output container and fill fMessage + fPayloadOutputs->at(0)->Send(OutputPolicy::message(TaskPolicy::GetOutputData())); + sentMsgs++; + received = 0; + } + + if(msg) + msg->CloseMessage(); + //OutputPolicy::CloseMessage(); + } + + MQLOG(INFO) << "Received " << receivedMsgs << " and sent " << sentMsgs << " messages!"; + + try + { + rateLogger.interrupt(); + rateLogger.join(); + } + catch(boost::thread_resource_error& e) + { + MQLOG(ERROR) << e.what(); + } + + FairMQDevice::Shutdown(); + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); + } + + +}; + +//#include "GenericSampler.tpl" + + +#endif /* GENERICPROCESSOR_H */ + diff --git a/fairmq/devices/GenericSampler.h b/fairmq/devices/GenericSampler.h new file mode 100644 index 00000000..56b74f03 --- /dev/null +++ b/fairmq/devices/GenericSampler.h @@ -0,0 +1,99 @@ +/* + * File: GenericSampler.h + * Author: winckler + * + * Created on November 24, 2014, 3:30 PM + */ + +#ifndef GENERICSAMPLER_H +#define GENERICSAMPLER_H + + + + +#include +#include + +#include +#include +#include + +#include "TList.h" +#include "TObjString.h" +#include "TClonesArray.h" +#include "TROOT.h" + + +#include "FairMQDevice.h" +#include "FairMQLogger.h" + +/** + * Reads simulated digis from a root file and samples the digi as a time-series UDP stream. + * Must be initialized with the filename to the root file and the name of the sub-detector + * branch, whose digis should be streamed. + * + * The purpose of this class is to provide a data source of digis very similar to the + * future detector output at the point where the detector is connected to the online + * computing farm. For the development of online analysis algorithms, it is very important + * to simulate the future detector output as realistic as possible to evaluate the + * feasibility and quality of the various possible online analysis features. + */ + +template +class GenericSampler: public FairMQDevice, public SamplerPolicy, public OutputPolicy +{ + using SamplerPolicy::GetDataBranch; // get data from file + using OutputPolicy::message; // serialize method + + public: + enum { + InputFile = FairMQDevice::Last, + Branch, + ParFile, + EventRate + }; + GenericSampler(); + virtual ~GenericSampler(); + virtual void SetTransport(FairMQTransportFactory* factory); + void ResetEventCounter(); + virtual void ListenToCommands(); + + template + void SetFileProperties(Args&... args) + { + SamplerPolicy::SetFileProperties(args...); + } + + virtual void SetProperty(const int key, const string& value, const int slot = 0); + virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0); + virtual void SetProperty(const int key, const int value, const int slot = 0); + virtual int GetProperty(const int key, const int default_ = 0, const int slot = 0); + + /** + * Sends the currently available output of the Sampler Task as part of a multipart message + * and reinitializes the message to be filled with the next part. + * This method can be given as a callback to the SamplerTask. + * The final message part must be sent with normal Send method. + */ + void SendPart(); + + void SetContinuous(bool flag) { fContinuous = flag; } + +protected: + virtual void Init(); + virtual void Run(); + +protected: + string fInputFile; // Filename of a root file containing the simulated digis. + string fParFile; + string fBranch; // The name of the sub-detector branch to stream the digis from. + int fNumEvents; + int fEventRate; + int fEventCounter; + bool fContinuous; +}; + +#include "GenericSampler.tpl" + +#endif /* GENERICSAMPLER_H */ + diff --git a/fairmq/devices/GenericSampler.tpl b/fairmq/devices/GenericSampler.tpl new file mode 100644 index 00000000..fd01a05a --- /dev/null +++ b/fairmq/devices/GenericSampler.tpl @@ -0,0 +1,227 @@ +/* + * File: GenericSampler.tpl + * Author: winckler + * + * Created on November 24, 2014, 3:59 PM + */ + +template + GenericSampler::GenericSampler() : + fNumEvents(0), + fEventRate(1), + fEventCounter(0), + fContinuous(false) + { + } + + +template + GenericSampler::~GenericSampler() + { + } + + + + +template + void GenericSampler::SetTransport(FairMQTransportFactory* factory) + { + FairMQDevice::SetTransport(factory); + //OutputPolicy::SetTransport(factory); + } + +template +void GenericSampler::Init() +{ + FairMQDevice::Init(); + SamplerPolicy::InitSampler(); + fNumEvents=SamplerPolicy::GetDataBunchNumber(); +} + +template +void GenericSampler::Run() +{ + LOG(INFO) << ">>>>>>> Run <<<<<<<"; + + boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + // boost::thread resetEventCounter(boost::bind(&GenericSampler::ResetEventCounter, this)); + // boost::thread commandListener(boost::bind(&GenericSampler::ListenToCommands, this)); + + int sentMsgs = 0; + + boost::timer::auto_cpu_timer timer; + + LOG(INFO) << "Number of events to process: " << fNumEvents; + +// while ( fState == RUNNING ) { + + do + { + for ( Long64_t eventNr = 0 ; eventNr < fNumEvents; ++eventNr ) + { + //fSamplerTask->SetEventIndex(eventNr); + FairMQMessage* msg = fTransportFactory->CreateMessage(); + OutputPolicy::SetMessage(msg); + fPayloadOutputs->at(0)->Send(message(GetDataBranch(eventNr))); + ++sentMsgs; + + if(msg) + msg->CloseMessage(); + + // Optional event rate limiting + // --fEventCounter; + // while (fEventCounter == 0) { + // boost::this_thread::sleep(boost::posix_time::milliseconds(1)); + // } + + if( fState != RUNNING ) { break; } + } + } + while ( fState == RUNNING && fContinuous ); + +// } + + boost::timer::cpu_times const elapsed_time(timer.elapsed()); + LOG(INFO) << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2); + LOG(INFO) << "Sent " << sentMsgs << " messages!"; + + try { + rateLogger.interrupt(); + rateLogger.join(); + // resetEventCounter.interrupt(); + // resetEventCounter.join(); + // commandListener.interrupt(); + // commandListener.join(); + } + catch (boost::thread_resource_error &e) { + LOG(ERROR) << e.what(); + } + + FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); +} + +/* +template +void GenericSampler::SendPart() +{ + fPayloadOutputs->at(0)->Send(OutputPolicy::GetMessage(), "snd-more"); + OutputPolicy::CloseMessage(); +} +*/ + +template +void GenericSampler::ResetEventCounter() +{ + while (true) + { + try + { + fEventCounter = fEventRate / 100; + boost::this_thread::sleep(boost::posix_time::milliseconds(10)); + } + catch (boost::thread_interrupted &) + { + LOG(DEBUG) << "resetEventCounter interrupted"; + break; + } + } + LOG(DEBUG) << ">>>>>>> stopping resetEventCounter <<<<<<<"; +} + +template +void GenericSampler::ListenToCommands() +{ + LOG(INFO) << ">>>>>>> ListenToCommands <<<<<<<"; + + int received = 0; + + while (true) { + try { + FairMQMessage *msg = fTransportFactory->CreateMessage(); + + received = fPayloadInputs->at(0)->Receive(msg); + + if (received > 0) { + // command handling goes here. + LOG(INFO) << "> received command <"; + received = 0; + } + + delete msg; + + boost::this_thread::interruption_point(); + } + catch (boost::thread_interrupted &) { + LOG(DEBUG) << "commandListener interrupted"; + break; + } + } + LOG(DEBUG) << ">>>>>>> stopping commandListener <<<<<<<"; +} + +template +void GenericSampler::SetProperty(const int key, const string& value, const int slot/*= 0*/) +{ + switch (key) + { + case InputFile: + fInputFile = value; + break; + case ParFile: + fParFile = value; + break; + case Branch: + fBranch = value; + break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +template +string GenericSampler::GetProperty(const int key, const string& default_/*= ""*/, const int slot/*= 0*/) +{ + switch (key) + { + case InputFile: + return fInputFile; + case ParFile: + return fParFile; + case Branch: + return fBranch; + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +} + +template +void GenericSampler::SetProperty(const int key, const int value, const int slot/*= 0*/) +{ + switch (key) + { + case EventRate: + fEventRate = value; + break; + default: + FairMQDevice::SetProperty(key, value, slot); + break; + } +} + +template +int GenericSampler::GetProperty(const int key, const int default_/*= 0*/, const int slot/*= 0*/) +{ + switch (key) + { + case EventRate: + return fEventRate; + default: + return FairMQDevice::GetProperty(key, default_, slot); + } +}