diff --git a/examples/dds/Sampler.cxx b/examples/dds/Sampler.cxx index f6c3c065..3f4c90e3 100644 --- a/examples/dds/Sampler.cxx +++ b/examples/dds/Sampler.cxx @@ -26,6 +26,12 @@ Sampler::Sampler() { } +void Sampler::InitTask() +{ + fIterations = fConfig->GetValue("iterations"); + fCounter = 0; +} + bool Sampler::ConditionalRun() { std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -43,6 +49,14 @@ bool Sampler::ConditionalRun() return false; } + if (fIterations > 0) { + ++fCounter; + if (fCounter >= fIterations) { + LOG(info) << "Sent " << fCounter << " messages. Finished."; + return false; + } + } + return true; } diff --git a/examples/dds/Sampler.h b/examples/dds/Sampler.h index cb4556d4..b7856768 100644 --- a/examples/dds/Sampler.h +++ b/examples/dds/Sampler.h @@ -25,9 +25,14 @@ class Sampler : public FairMQDevice public: Sampler(); virtual ~Sampler(); + void InitTask() override; protected: virtual bool ConditionalRun(); + + private: + uint64_t fIterations; + uint64_t fCounter; }; } // namespace example_dds diff --git a/examples/dds/Sink.cxx b/examples/dds/Sink.cxx index 62958ea1..db19e873 100644 --- a/examples/dds/Sink.cxx +++ b/examples/dds/Sink.cxx @@ -25,11 +25,24 @@ Sink::Sink() OnData("data2", &Sink::HandleData); } +void Sink::InitTask() +{ + fIterations = fConfig->GetValue("iterations"); + fCounter = 0; +} + // handler is called whenever a message arrives on "data2", with a reference to the message and a sub-channel index (here 0) bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/) { LOG(info) << "Received: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; + if (fIterations > 0) { + ++fCounter; + if (fCounter >= fIterations) { + LOG(info) << "Received " << fCounter << " messages. Finished."; + return false; + } + } // return true if want to be called again (otherwise go to IDLE state) return true; } diff --git a/examples/dds/Sink.h b/examples/dds/Sink.h index 88a345f9..5a5e9344 100644 --- a/examples/dds/Sink.h +++ b/examples/dds/Sink.h @@ -25,9 +25,14 @@ class Sink : public FairMQDevice public: Sink(); virtual ~Sink(); + void InitTask() override; protected: bool HandleData(FairMQMessagePtr&, int); + + private: + uint64_t fIterations; + uint64_t fCounter; }; } // namespace example_dds diff --git a/examples/dds/runSampler.cxx b/examples/dds/runSampler.cxx index a511f992..5374d5a4 100644 --- a/examples/dds/runSampler.cxx +++ b/examples/dds/runSampler.cxx @@ -11,8 +11,12 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /*options*/) +void addCustomOptions(bpo::options_description& options) { + options.add_options()( + "iterations,i", + bpo::value()->default_value(1000), + "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) diff --git a/examples/dds/runSink.cxx b/examples/dds/runSink.cxx index 0c6cf710..63cad938 100644 --- a/examples/dds/runSink.cxx +++ b/examples/dds/runSink.cxx @@ -11,8 +11,12 @@ namespace bpo = boost::program_options; -void addCustomOptions(bpo::options_description& /*options*/) +void addCustomOptions(bpo::options_description& options) { + options.add_options()( + "iterations,i", + bpo::value()->default_value(1000), + "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); } FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)