From b53691c8adce74bbf5fbea0ccb56cf09258916ae Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 30 Jan 2019 18:32:39 +0100 Subject: [PATCH] Add -i option as exit condition After -i# iterations sampler and sink will exit RUNNING state. --- examples/dds/Sampler.cxx | 14 ++++++++++++++ examples/dds/Sampler.h | 5 +++++ examples/dds/Sink.cxx | 13 +++++++++++++ examples/dds/Sink.h | 5 +++++ examples/dds/runSampler.cxx | 6 +++++- examples/dds/runSink.cxx | 6 +++++- 6 files changed, 47 insertions(+), 2 deletions(-) diff --git a/examples/dds/Sampler.cxx b/examples/dds/Sampler.cxx index f6c3c065..3f4c90e3 100644 --- a/examples/dds/Sampler.cxx +++ b/examples/dds/Sampler.cxx @@ -26,6 +26,12 @@ Sampler::Sampler() { } +void Sampler::InitTask() +{ + fIterations = fConfig->GetValue("iterations"); + fCounter = 0; +} + bool Sampler::ConditionalRun() { std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -43,6 +49,14 @@ bool Sampler::ConditionalRun() return false; } + if (fIterations > 0) { + ++fCounter; + if (fCounter >= fIterations) { + LOG(info) << "Sent " << fCounter << " messages. Finished."; + return false; + } + } + return true; } diff --git a/examples/dds/Sampler.h b/examples/dds/Sampler.h index cb4556d4..b7856768 100644 --- a/examples/dds/Sampler.h +++ b/examples/dds/Sampler.h @@ -25,9 +25,14 @@ class Sampler : public FairMQDevice public: Sampler(); virtual ~Sampler(); + void InitTask() override; protected: virtual bool ConditionalRun(); + + private: + uint64_t fIterations; + uint64_t fCounter; }; } // namespace example_dds diff --git a/examples/dds/Sink.cxx b/examples/dds/Sink.cxx index 62958ea1..db19e873 100644 --- a/examples/dds/Sink.cxx +++ b/examples/dds/Sink.cxx @@ -25,11 +25,24 @@ Sink::Sink() OnData("data2", &Sink::HandleData); } +void Sink::InitTask() +{ + fIterations = fConfig->GetValue("iterations"); + fCounter = 0; +} + // handler is called whenever a message arrives on "data2", with a reference to the message and a sub-channel index (here 0) bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/) { LOG(info) << "Received: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; + if (fIterations > 0) { + ++fCounter; + if (fCounter >= fIterations) { + LOG(info) << "Received " << fCounter << " messages. Finished."; + return false; + } + } // return true if want to be called again (otherwise go to IDLE state) return true; } diff --git a/examples/dds/Sink.h b/examples/dds/Sink.h index 88a345f9..5a5e9344 100644 --- a/examples/dds/Sink.h +++ b/examples/dds/Sink.h @@ -25,9 +25,14 @@ class Sink : public FairMQDevice public: Sink(); virtual ~Sink(); + void InitTask() override; protected: bool HandleData(FairMQMessagePtr&, int); + + private: + uint64_t fIterations; + uint64_t fCounter; }; } // namespace example_dds diff --git a/examples/dds/runSampler.cxx b/examples/dds/runSampler.cxx index a511f992..5374d5a4 100644 --- a/examples/dds/runSampler.cxx +++ b/examples/dds/runSampler.cxx @@ -11,8 +11,12 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /*options*/) +void addCustomOptions(bpo::options_description& options) { + options.add_options()( + "iterations,i", + bpo::value()->default_value(1000), + "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) diff --git a/examples/dds/runSink.cxx b/examples/dds/runSink.cxx index 0c6cf710..63cad938 100644 --- a/examples/dds/runSink.cxx +++ b/examples/dds/runSink.cxx @@ -11,8 +11,12 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /*options*/) +void addCustomOptions(bpo::options_description& options) { + options.add_options()( + "iterations,i", + bpo::value()->default_value(1000), + "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)