Add -i option as exit condition

After -i# iterations sampler and sink will exit RUNNING state.
This commit is contained in:
Dennis Klein 2019-01-30 18:32:39 +01:00 committed by Dennis Klein
parent 14980d7486
commit b53691c8ad
6 changed files with 47 additions and 2 deletions

View File

@ -26,6 +26,12 @@ Sampler::Sampler()
{ {
} }
void Sampler::InitTask()
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
fCounter = 0;
}
bool Sampler::ConditionalRun() bool Sampler::ConditionalRun()
{ {
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
@ -43,6 +49,14 @@ bool Sampler::ConditionalRun()
return false; return false;
} }
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Sent " << fCounter << " messages. Finished.";
return false;
}
}
return true; return true;
} }

View File

@ -25,9 +25,14 @@ class Sampler : public FairMQDevice
public: public:
Sampler(); Sampler();
virtual ~Sampler(); virtual ~Sampler();
void InitTask() override;
protected: protected:
virtual bool ConditionalRun(); virtual bool ConditionalRun();
private:
uint64_t fIterations;
uint64_t fCounter;
}; };
} // namespace example_dds } // namespace example_dds

View File

@ -25,11 +25,24 @@ Sink::Sink()
OnData("data2", &Sink::HandleData); OnData("data2", &Sink::HandleData);
} }
void Sink::InitTask()
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
fCounter = 0;
}
// handler is called whenever a message arrives on "data2", with a reference to the message and a sub-channel index (here 0) // handler is called whenever a message arrives on "data2", with a reference to the message and a sub-channel index (here 0)
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/) bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{ {
LOG(info) << "Received: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\""; LOG(info) << "Received: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Received " << fCounter << " messages. Finished.";
return false;
}
}
// return true if want to be called again (otherwise go to IDLE state) // return true if want to be called again (otherwise go to IDLE state)
return true; return true;
} }

View File

@ -25,9 +25,14 @@ class Sink : public FairMQDevice
public: public:
Sink(); Sink();
virtual ~Sink(); virtual ~Sink();
void InitTask() override;
protected: protected:
bool HandleData(FairMQMessagePtr&, int); bool HandleData(FairMQMessagePtr&, int);
private:
uint64_t fIterations;
uint64_t fCounter;
}; };
} // namespace example_dds } // namespace example_dds

View File

@ -11,8 +11,12 @@
namespace bpo = boost::program_options; namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/) void addCustomOptions(bpo::options_description& options)
{ {
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
} }
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)

View File

@ -11,8 +11,12 @@
namespace bpo = boost::program_options; namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/) void addCustomOptions(bpo::options_description& options)
{ {
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
} }
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)