From 4123ebc9d4b700eacb059f4f125b83f7d1ee0d42 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 12 Sep 2018 11:37:36 +0200 Subject: [PATCH] Add interruptable FairMQDevice::WaitFor(duration) method --- fairmq/FairMQDevice.cxx | 13 +++++++++++++ fairmq/FairMQDevice.h | 15 +++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 2f1436f9..7f0763e2 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -72,6 +72,9 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver , fVersion(version) , fRate(0.) , fRawCmdLineArgs() + , fInterrupted(false) + , fInterruptedCV() + , fInterruptedMtx() { } @@ -491,6 +494,10 @@ void FairMQDevice::RunWrapper() thread rateLogger(&FairMQDevice::LogSocketRates, this); // notify transports to resume transfers + { + lock_guard guard(fInterruptedMtx); + fInterrupted = false; + } for (auto& t : fTransports) { t.second->Resume(); @@ -909,6 +916,7 @@ void FairMQDevice::LogSocketRates() t0 = t1; this_thread::sleep_for(chrono::milliseconds(1000)); + // WaitFor(chrono::milliseconds(1000)); TODO: enable this when nanomsg linger is fixed } } } @@ -919,6 +927,11 @@ void FairMQDevice::Unblock() { t.second->Interrupt(); } + { + lock_guard guard(fInterruptedMtx); + fInterrupted = true; + } + fInterruptedCV.notify_all(); } void FairMQDevice::ResetTaskWrapper() diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 6d1ccc0c..3e4aadbe 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -25,6 +25,7 @@ #include // unique_ptr #include // std::sort() #include +#include #include #include #include @@ -433,6 +434,16 @@ class FairMQDevice : public FairMQStateMachine void RunStateMachine() { ProcessWork(); }; + /// Wait for the supplied amount of time or for interruption. + /// If interrupted, returns false, otherwise true. + /// @param duration wait duration + template + bool WaitFor(std::chrono::duration const& duration) + { + std::unique_lock lock(fInterruptedMtx); + return !fInterruptedCV.wait_for(lock, duration, [&] { return fInterrupted.load(); }); // return true if no interruption happened + } + protected: std::shared_ptr fTransportFactory; ///< Default transport factory std::unordered_map> fTransports; ///< Container for transports @@ -551,6 +562,10 @@ class FairMQDevice : public FairMQStateMachine const fair::mq::tools::Version fVersion; float fRate; ///< Rate limiting for ConditionalRun std::vector fRawCmdLineArgs; + + std::atomic fInterrupted; + std::condition_variable fInterruptedCV; + std::mutex fInterruptedMtx; }; #endif /* FAIRMQDEVICE_H_ */