From f515eb1100c0ced37f1728216cf633871c84b5b8 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 17 Jul 2019 15:19:08 +0200 Subject: [PATCH] Extract state queue into own class. Use in device, plugins --- fairmq/CMakeLists.txt | 1 + fairmq/FairMQDevice.cxx | 59 +++++++++++------------- fairmq/FairMQDevice.h | 11 ++--- fairmq/StateQueue.h | 94 ++++++++++++++++++++++++++++++++++++++ fairmq/States.h | 3 ++ fairmq/plugins/Control.cxx | 70 +++++++--------------------- fairmq/plugins/Control.h | 9 +--- fairmq/plugins/DDS/DDS.cxx | 35 ++++---------- fairmq/plugins/DDS/DDS.h | 6 +-- 9 files changed, 159 insertions(+), 129 deletions(-) create mode 100644 fairmq/StateQueue.h diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 9e558707..91c094e7 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -92,6 +92,7 @@ if(BUILD_FAIRMQ OR BUILD_SDK) set(FSM_PUBLIC_HEADER_FILES StateMachine.h States.h + StateQueue.h ) set(FSM_SOURCE_FILES diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index a904d227..af27cdbf 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -66,6 +66,27 @@ constexpr uint64_t FairMQDevice::DefaultMaxRunTime; constexpr float FairMQDevice::DefaultRate; constexpr const char* FairMQDevice::DefaultSession; +struct StateSubscription +{ + fair::mq::StateMachine& fStateMachine; + fair::mq::StateQueue& fStateQueue; + string fId; + + explicit StateSubscription(const string& id, fair::mq::StateMachine& stateMachine, fair::mq::StateQueue& stateQueue) + : fStateMachine(stateMachine) + , fStateQueue(stateQueue) + , fId(id) + { + fStateMachine.SubscribeToStateChange(fId, [&](fair::mq::State state) { + fStateQueue.Push(state); + }); + } + + ~StateSubscription() { + fStateMachine.UnsubscribeFromStateChange(fId); + } +}; + FairMQDevice::FairMQDevice() : FairMQDevice(nullptr, {0, 0, 0}) {} @@ -106,9 +127,6 @@ FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version) , fMaxRunRuntimeInS(DefaultMaxRunTime) , fInitializationTimeoutInS(DefaultInitTimeout) , fRawCmdLineArgs() - , fStates() - , fStatesMtx() - , fStatesCV() , fTransitionMtx() , fTransitioning(false) { @@ -127,11 +145,7 @@ FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version) fStateMachine.HandleStates([&](fair::mq::State state) { LOG(trace) << "device notified on new state: " << state; - { - lock_guard lock(fStatesMtx); - fStates.push(state); - } - fStatesCV.notify_all(); + fStateQueue.Push(state); switch (state) { case fair::mq::State::InitializingDevice: @@ -167,29 +181,6 @@ FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version) fStateMachine.Start(); } -fair::mq::State FairMQDevice::WaitForNextState() -{ - unique_lock lock(fStatesMtx); - while (fStates.empty()) { - fStatesCV.wait_for(lock, chrono::milliseconds(50)); - } - - auto state = fStates.front(); - - if (state == fair::mq::State::Error) { - throw DeviceStateError("Device transitioned to error state."); - } - - fStates.pop(); - - return state; -} - -void FairMQDevice::WaitForState(fair::mq::State state) -{ - while (WaitForNextState() != state) {} -} - void FairMQDevice::TransitionTo(const fair::mq::State s) { { @@ -202,6 +193,10 @@ void FairMQDevice::TransitionTo(const fair::mq::State s) } using fair::mq::State; + + StateQueue sq; + StateSubscription ss(tools::ToString(fId, ".TransitionTo"), fStateMachine, sq); + State currentState = GetCurrentState(); while (s != currentState) { @@ -244,7 +239,7 @@ void FairMQDevice::TransitionTo(const fair::mq::State s) break; } - currentState = WaitForNextState(); + currentState = sq.WaitForNext(); } { diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index f98a8321..cfb08d2d 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -498,8 +499,8 @@ class FairMQDevice void WaitForEndOfState(const fair::mq::Transition transition) __attribute__((deprecated("Use WaitForState(fair::mq::State expectedState)."))); void WaitForEndOfState(const std::string& transition) __attribute__((deprecated("Use WaitForState(fair::mq::State expectedState)."))) { WaitForState(transition); } - fair::mq::State WaitForNextState(); - void WaitForState(fair::mq::State state); + fair::mq::State WaitForNextState() { return fStateQueue.WaitForNext(); } + void WaitForState(fair::mq::State state) { fStateQueue.WaitForState(state); } void WaitForState(const std::string& state) { WaitForState(fair::mq::GetState(state)); } void TransitionTo(const fair::mq::State state); @@ -522,8 +523,6 @@ class FairMQDevice static std::string GetStateName(const fair::mq::State state) { return fair::mq::GetStateName(state); } static std::string GetTransitionName(const fair::mq::Transition transition) { return fair::mq::GetTransitionName(transition); } - struct DeviceStateError : std::runtime_error { using std::runtime_error::runtime_error; }; - static constexpr const char* DefaultId = ""; static constexpr int DefaultIOThreads = 1; static constexpr const char* DefaultTransportName = "zeromq"; @@ -589,9 +588,7 @@ class FairMQDevice int fInitializationTimeoutInS; std::vector fRawCmdLineArgs; - std::queue fStates; - std::mutex fStatesMtx; - std::condition_variable fStatesCV; + fair::mq::StateQueue fStateQueue; std::mutex fTransitionMtx; bool fTransitioning; diff --git a/fairmq/StateQueue.h b/fairmq/StateQueue.h new file mode 100644 index 00000000..b65dc295 --- /dev/null +++ b/fairmq/StateQueue.h @@ -0,0 +1,94 @@ +/******************************************************************************** + * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIRMQSTATEQUEUE_H_ +#define FAIRMQSTATEQUEUE_H_ + +#include + +#include +#include +#include +#include // pair +#include + +namespace fair +{ +namespace mq +{ + +class StateQueue +{ + public: + StateQueue() {} + ~StateQueue() {} + + fair::mq::State WaitForNext() + { + std::unique_lock lock(fMtx); + while (fStates.empty()) { + fCV.wait_for(lock, std::chrono::milliseconds(50)); + } + + fair::mq::State state = fStates.front(); + + if (state == fair::mq::State::Error) { + throw DeviceErrorState("Controlled device transitioned to error state."); + } + + fStates.pop(); + return state; + } + + template + std::pair WaitForNext(std::chrono::duration const& duration) + { + std::unique_lock lock(fMtx); + fCV.wait_for(lock, duration); + + if (fStates.empty()) { + return { false, fair::mq::State::Ok }; + } + + fair::mq::State state = fStates.front(); + + if (state == fair::mq::State::Error) { + throw DeviceErrorState("Controlled device transitioned to error state."); + } + + fStates.pop(); + return { true, state }; + } + + void WaitForState(fair::mq::State state) { while (WaitForNext() != state) {} } + + void Push(fair::mq::State state) + { + { + std::lock_guard lock(fMtx); + fStates.push(state); + } + fCV.notify_all(); + } + + void Clear() + { + std::lock_guard lock(fMtx); + fStates = std::queue(); + } + + private: + std::queue fStates; + std::mutex fMtx; + std::condition_variable fCV; +}; + +} // namespace mq +} // namespace fair + +#endif /* FAIRMQSTATEQUEUE_H_ */ \ No newline at end of file diff --git a/fairmq/States.h b/fairmq/States.h index 9421bc0f..484abc85 100644 --- a/fairmq/States.h +++ b/fairmq/States.h @@ -11,6 +11,7 @@ #include #include +#include namespace fair { @@ -57,6 +58,8 @@ std::string GetTransitionName(const Transition); State GetState(const std::string& state); Transition GetTransition(const std::string& transition); +struct DeviceErrorState : std::runtime_error { using std::runtime_error::runtime_error; }; + inline std::ostream& operator<<(std::ostream& os, const State& state) { return os << GetStateName(state); } inline std::ostream& operator<<(std::ostream& os, const Transition& transition) { return os << GetTransitionName(transition); } diff --git a/fairmq/plugins/Control.cxx b/fairmq/plugins/Control.cxx index 2c86024e..3580ccfd 100644 --- a/fairmq/plugins/Control.cxx +++ b/fairmq/plugins/Control.cxx @@ -46,10 +46,7 @@ Control::Control(const string& name, const Plugin::Version version, const string : Plugin(name, version, maintainer, homepage, pluginServices) , fControllerThread() , fSignalHandlerThread() - , fEvents() - , fEventsMutex() , fControllerMutex() - , fNewEvent() , fDeviceShutdownRequested(false) , fDeviceHasShutdown(false) , fPluginShutdownRequested(false) @@ -57,16 +54,11 @@ Control::Control(const string& name, const Plugin::Version version, const string SubscribeToDeviceStateChange([&](DeviceState newState) { LOG(trace) << "control plugin notified on new state: " << newState; - { - lock_guard lock{fEventsMutex}; - fEvents.push(newState); - } - fNewEvent.notify_one(); + fStateQueue.Push(newState); if (newState == DeviceState::Error) { fPluginShutdownRequested = true; fDeviceShutdownRequested = true; - // throw DeviceErrorState("Controlled device transitioned to error state."); } }); @@ -103,36 +95,17 @@ Control::Control(const string& name, const Plugin::Version version, const string auto Control::RunStartupSequence() -> void { ChangeDeviceState(DeviceStateTransition::InitDevice); - while (WaitForNextState() != DeviceState::InitializingDevice) {} + while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {} ChangeDeviceState(DeviceStateTransition::CompleteInit); - while (WaitForNextState() != DeviceState::Initialized) {} + while (fStateQueue.WaitForNext() != DeviceState::Initialized) {} ChangeDeviceState(DeviceStateTransition::Bind); - while (WaitForNextState() != DeviceState::Bound) {} + while (fStateQueue.WaitForNext() != DeviceState::Bound) {} ChangeDeviceState(DeviceStateTransition::Connect); - while (WaitForNextState() != DeviceState::DeviceReady) {} + while (fStateQueue.WaitForNext() != DeviceState::DeviceReady) {} ChangeDeviceState(DeviceStateTransition::InitTask); - while (WaitForNextState() != DeviceState::Ready) {} + while (fStateQueue.WaitForNext() != DeviceState::Ready) {} ChangeDeviceState(DeviceStateTransition::Run); - while (WaitForNextState() != DeviceState::Running) {} -} - -auto Control::WaitForNextState() -> DeviceState -{ - unique_lock lock{fEventsMutex}; - while (fEvents.empty()) { - fNewEvent.wait_for(lock, chrono::milliseconds(50)); - } - - auto result = fEvents.front(); - - if (result == DeviceState::Error) { - ReleaseDeviceControl(); - throw DeviceErrorState("Controlled device transitioned to error state."); - } - - fEvents.pop(); - - return result; + while (fStateQueue.WaitForNext() != DeviceState::Running) {} } auto ControlPluginProgramOptions() -> Plugin::ProgOptions @@ -204,7 +177,7 @@ try { case 'i': cout << "\n --> [i] init device\n\n" << flush; if (ChangeDeviceState(DeviceStateTransition::InitDevice)) { - while (WaitForNextState() != DeviceState::InitializingDevice) {} + while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {} ChangeDeviceState(DeviceStateTransition::CompleteInit); } break; @@ -274,7 +247,6 @@ try { } if (GetCurrentDeviceState() == DeviceState::Error) { - ReleaseDeviceControl(); throw DeviceErrorState("Controlled device transitioned to error state."); } @@ -288,6 +260,7 @@ try { // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. LOG(debug) << e.what(); } catch (DeviceErrorState&) { + ReleaseDeviceControl(); } auto Control::PrintInteractiveHelpColor() -> void @@ -397,15 +370,10 @@ try { { // Wait for next state, which is DeviceState::Ready, // or for device shutdown request (Ctrl-C) - unique_lock lock{fEventsMutex}; - while (fEvents.empty() && !fDeviceShutdownRequested) { - fNewEvent.wait_for(lock, chrono::milliseconds(50)); - } - - if (fEvents.front() == DeviceState::Error) { - ReleaseDeviceControl(); - throw DeviceErrorState("Controlled device transitioned to error state."); - } + pair result; + do { + result = fStateQueue.WaitForNext(chrono::milliseconds(50)); + } while (result.first == false && !fDeviceShutdownRequested); } RunShutdownSequence(); @@ -413,6 +381,7 @@ try { // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. LOG(debug) << e.what(); } catch (DeviceErrorState&) { + ReleaseDeviceControl(); } auto Control::SignalHandler() -> void @@ -440,6 +409,7 @@ auto Control::SignalHandler() -> void } catch (PluginServices::DeviceControlError& e) { LOG(info) << "Graceful device shutdown failed: " << e.what() << " If hanging, hit Ctrl-C again to abort immediately."; } catch (...) { + ReleaseDeviceControl(); LOG(info) << "Graceful device shutdown failed. If hanging, hit Ctrl-C again to abort immediately."; } } @@ -450,7 +420,7 @@ auto Control::RunShutdownSequence() -> void { auto nextState = GetCurrentDeviceState(); if (nextState != DeviceState::Error) { - EmptyEventQueue(); + fStateQueue.Clear(); } while (nextState != DeviceState::Exiting && nextState != DeviceState::Error) { switch (nextState) { @@ -473,19 +443,13 @@ auto Control::RunShutdownSequence() -> void break; } - nextState = WaitForNextState(); + nextState = fStateQueue.WaitForNext(); } fDeviceHasShutdown = true; ReleaseDeviceControl(); } -auto Control::EmptyEventQueue() -> void -{ - lock_guard lock{fEventsMutex}; - fEvents = queue{}; -} - Control::~Control() { // Notify threads to exit diff --git a/fairmq/plugins/Control.h b/fairmq/plugins/Control.h index 08ec56cd..9d0695dc 100644 --- a/fairmq/plugins/Control.h +++ b/fairmq/plugins/Control.h @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -41,23 +42,17 @@ class Control : public Plugin static auto PrintStateMachineColor() -> void; static auto PrintStateMachine() -> void; auto StaticMode() -> void; - auto WaitForNextState() -> DeviceState; auto SignalHandler() -> void; auto RunShutdownSequence() -> void; auto RunStartupSequence() -> void; - auto EmptyEventQueue() -> void; std::thread fControllerThread; std::thread fSignalHandlerThread; - std::queue fEvents; - std::mutex fEventsMutex; std::mutex fControllerMutex; - std::condition_variable fNewEvent; std::atomic fDeviceShutdownRequested; std::atomic fDeviceHasShutdown; std::atomic fPluginShutdownRequested; - - struct DeviceErrorState : std::runtime_error { using std::runtime_error::runtime_error; }; + fair::mq::StateQueue fStateQueue; }; /* class Control */ auto ControlPluginProgramOptions() -> Plugin::ProgOptions; diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 8d11da6b..8648838a 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -42,9 +42,6 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta , fStopCondition() , fTransitions({ "BIND", "CONNECT", "INIT TASK", "RUN", "STOP", "RESET TASK", "RESET DEVICE" }) , fControllerThread() - , fEvents() - , fEventsMutex() - , fNewEvent() , fCurrentState(DeviceState::Idle) , fLastState(DeviceState::Idle) , fDeviceTerminationRequested(false) @@ -86,11 +83,7 @@ auto DDS::HandleControl() -> void // subscribe to device state changes, pushing new state changes into the event queue SubscribeToDeviceStateChange([&](DeviceState newState) { - { - lock_guard lock{fEventsMutex}; - fEvents.push(newState); - } - fNewEvent.notify_one(); + fStateQueue.Push(newState); if (newState == DeviceState::Exiting) { fDeviceTerminationRequested = true; } @@ -108,11 +101,11 @@ auto DDS::HandleControl() -> void }); ChangeDeviceState(DeviceStateTransition::InitDevice); - while (WaitForNextState() != DeviceState::InitializingDevice) {} + while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {} ChangeDeviceState(DeviceStateTransition::CompleteInit); - while (WaitForNextState() != DeviceState::Initialized) {} + while (fStateQueue.WaitForNext() != DeviceState::Initialized) {} ChangeDeviceState(DeviceStateTransition::Bind); - while (WaitForNextState() != DeviceState::Bound) {} + while (fStateQueue.WaitForNext() != DeviceState::Bound) {} // in the Initializing state subscribe to receive addresses of connecting channels from DDS // and propagate addresses of bound channels to DDS. @@ -126,10 +119,10 @@ auto DDS::HandleControl() -> void PublishBoundChannels(); ChangeDeviceState(DeviceStateTransition::Connect); - while (WaitForNextState() != DeviceState::DeviceReady) {} + while (fStateQueue.WaitForNext() != DeviceState::DeviceReady) {} ChangeDeviceState(DeviceStateTransition::InitTask); - while (WaitForNextState() != DeviceState::Ready) {} + while (fStateQueue.WaitForNext() != DeviceState::Ready) {} ChangeDeviceState(DeviceStateTransition::Run); // wait until stop signal @@ -138,6 +131,8 @@ auto DDS::HandleControl() -> void fStopCondition.wait_for(lock, chrono::seconds(1)); } LOG(debug) << "Stopping DDS control plugin"; + } catch (DeviceErrorState&) { + ReleaseDeviceControl(); } catch (exception& e) { LOG(error) << "Error: " << e.what() << endl; return; @@ -321,7 +316,7 @@ auto DDS::SubscribeForCustomCommands() -> void } else if (cmd == "INIT DEVICE") { if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { fDDSCustomCmd.send(id + ": queued " + cmd + " transition", to_string(senderId)); - while (WaitForNextState() != DeviceState::InitializingDevice) {} + while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {} ChangeDeviceState(DeviceStateTransition::CompleteInit); } else { fDDSCustomCmd.send(id + ": could not queue " + cmd + " transition", to_string(senderId)); @@ -391,18 +386,6 @@ auto DDS::SubscribeForCustomCommands() -> void }); } -auto DDS::WaitForNextState() -> DeviceState -{ - unique_lock lock{fEventsMutex}; - while (fEvents.empty()) { - fNewEvent.wait_for(lock, chrono::milliseconds(50)); - } - - auto result = fEvents.front(); - fEvents.pop(); - return result; -} - DDS::~DDS() { if (fControllerThread.joinable()) { diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 9f2eb91b..136a5e2c 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -11,6 +11,7 @@ #include #include +#include #include @@ -67,7 +68,6 @@ class DDS : public Plugin private: auto HandleControl() -> void; - auto WaitForNextState() -> DeviceState; auto FillChannelContainers() -> void; auto SubscribeForConnectingChannels() -> void; @@ -92,10 +92,8 @@ class DDS : public Plugin const std::set fTransitions; std::thread fControllerThread; - std::queue fEvents; - std::mutex fEventsMutex; - std::condition_variable fNewEvent; DeviceState fCurrentState, fLastState; + fair::mq::StateQueue fStateQueue; std::atomic fDeviceTerminationRequested; std::atomic fServiceStarted;