diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 473d1556..5d8dd3c6 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -95,6 +95,7 @@ set(FAIRMQ_HEADER_FILES plugins/Builtin.h plugins/Control.h runFairMQDevice.h + StateMachine.h shmem/FairMQMessageSHM.h shmem/FairMQPollerSHM.h shmem/FairMQRegionSHM.h @@ -155,6 +156,7 @@ set(FAIRMQ_SOURCE_FILES PluginManager.cxx PluginServices.cxx plugins/Control.cxx + StateMachine.cxx shmem/FairMQMessageSHM.cxx shmem/FairMQPollerSHM.cxx shmem/FairMQRegionSHM.cxx diff --git a/fairmq/EventManager.h b/fairmq/EventManager.h index 62045cbc..972db58e 100644 --- a/fairmq/EventManager.h +++ b/fairmq/EventManager.h @@ -95,7 +95,7 @@ class EventManager } template - auto Emit(typename E::KeyType& key, Args&&... args) const -> void + auto Emit(typename E::KeyType& key, Args... args) const -> void { const std::type_index event_type_index{typeid(E)}; const std::type_index callback_type_index{typeid(Callback)}; diff --git a/fairmq/StateMachine.cxx b/fairmq/StateMachine.cxx new file mode 100644 index 00000000..d7fbe3fe --- /dev/null +++ b/fairmq/StateMachine.cxx @@ -0,0 +1,195 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "StateMachine.h" + +using namespace fair::mq; +using namespace std; + +const std::unordered_map StateMachine::fkStateStrMap = { + {"OK", State::Ok}, + {"ERROR", State::Error}, + {"IDLE", State::Idle}, + {"INITIALIZING DEVICE", State::InitializingDevice}, + {"DEVICE READY", State::DeviceReady}, + {"INITIALIZING TASK", State::InitializingTask}, + {"READY", State::Ready}, + {"RUNNING", State::Running}, + {"RESETTING TASK", State::ResettingTask}, + {"RESETTING DEVICE", State::ResettingDevice}, + {"EXITING", State::Exiting} +}; +const std::unordered_map> StateMachine::fkStrStateMap = { + {State::Ok, "OK"}, + {State::Error, "ERROR"}, + {State::Idle, "IDLE"}, + {State::InitializingDevice, "INITIALIZING DEVICE"}, + {State::DeviceReady, "DEVICE READY"}, + {State::InitializingTask, "INITIALIZING TASK"}, + {State::Ready, "READY"}, + {State::Running, "RUNNING"}, + {State::ResettingTask, "RESETTING TASK"}, + {State::ResettingDevice, "RESETTING DEVICE"}, + {State::Exiting, "EXITING"} +}; +const std::unordered_map StateMachine::fkStateTransitionStrMap = { + {"INIT DEVICE", StateTransition::InitDevice}, + {"INIT TASK", StateTransition::InitTask}, + {"RUN", StateTransition::Run}, + {"STOP", StateTransition::Stop}, + {"RESET TASK", StateTransition::ResetTask}, + {"RESET DEVICE", StateTransition::ResetDevice}, + {"END", StateTransition::End}, + {"ERROR FOUND", StateTransition::ErrorFound}, + {"AUTOMATIC", StateTransition::Automatic}, +}; +const std::unordered_map> StateMachine::fkStrStateTransitionMap = { + {StateTransition::InitDevice, "INIT DEVICE"}, + {StateTransition::InitTask, "INIT TASK"}, + {StateTransition::Run, "RUN"}, + {StateTransition::Stop, "STOP"}, + {StateTransition::ResetTask, "RESET TASK"}, + {StateTransition::ResetDevice, "RESET DEVICE"}, + {StateTransition::End, "END"}, + {StateTransition::ErrorFound, "ERROR FOUND"}, + {StateTransition::Automatic, "AUTOMATIC"}, +}; + +auto StateMachine::Run() -> void +{ + LOG(STATE) << "Starting FairMQ state machine"; + + LOG(DEBUG) << "Entering initial " << fErrorState << " state (orthogonal error state machine)"; + LOG(STATE) << "Entering initial " << fState << " state"; + + std::unique_lock lock{fMutex}; + while (true) + { + while (fNextStates.empty()) + { + fNewState.wait(lock); + } + + State lastState; + + if (fNextStates.front() == State::Error) + { + // advance error FSM + lastState = fErrorState; + fErrorState = fNextStates.front(); + fNextStates.pop_front(); + LOG(ERROR) << "Entering " << fErrorState << " state (orthogonal error state machine)"; + } + else + { + // advance regular FSM + lastState = fState; + fState = fNextStates.front(); + fNextStates.pop_front(); + LOG(STATE) << "Entering " << fState << " state"; + } + lock.unlock(); + + fCallbacks.Emit(fState, lastState); + + lock.lock(); + if (fState == State::Exiting || fErrorState == State::Error) break; + } + + LOG(STATE) << "Exiting FairMQ state machine"; +} + +auto StateMachine::ChangeState(StateTransition transition) -> void +{ + State lastState; + + std::unique_lock lock{fMutex}; + + if (transition == StateTransition::ErrorFound) + { + lastState = fErrorState; + } + else if (fNextStates.empty()) + { + lastState = fState; + } + else + { + lastState = fNextStates.back(); + } + + const State nextState{Transition(lastState, transition)}; + fNextStates.push_back(nextState); + lock.unlock(); + + fCallbacks.Emit(nextState, lastState); + fNewState.notify_one(); +} + +auto StateMachine::Transition(const State currentState, const StateTransition transition) -> State +{ + switch (currentState) { + case State::Idle: + if (transition == StateTransition::InitDevice ) return State::InitializingDevice; + if (transition == StateTransition::End ) return State::Exiting; + break; + case State::InitializingDevice: + if (transition == StateTransition::Automatic ) return State::DeviceReady; + break; + case State::DeviceReady: + if (transition == StateTransition::InitTask ) return State::InitializingTask; + if (transition == StateTransition::ResetDevice) return State::ResettingDevice; + break; + case State::InitializingTask: + if (transition == StateTransition::Automatic ) return State::Ready; + break; + case State::Ready: + if (transition == StateTransition::Run ) return State::Running; + if (transition == StateTransition::ResetTask ) return State::ResettingTask; + break; + case State::Running: + if (transition == StateTransition::Stop ) return State::Ready; + break; + case State::ResettingTask: + if (transition == StateTransition::Automatic ) return State::DeviceReady; + break; + case State::ResettingDevice: + if (transition == StateTransition::Automatic ) return State::Idle; + break; + case State::Exiting: + break; + case State::Ok: + if (transition == StateTransition::ErrorFound ) return State::Error; + break; + case State::Error: + break; + } + throw IllegalTransition{tools::ToString("No transition ", transition, " from state ", currentState, ".")}; +} + +StateMachine::StateMachine() +: fState{State::Idle} +, fErrorState{State::Ok} +{ +} + +auto StateMachine::Reset() -> void +{ + std::unique_lock lock{fMutex}; + + fState = State::Idle; + fErrorState = State::Ok; + fNextStates.clear(); +} + +auto StateMachine::NextStatePending() -> bool +{ + std::unique_lock lock{fMutex}; + + return fNextStates.size() > 0; +} diff --git a/fairmq/StateMachine.h b/fairmq/StateMachine.h new file mode 100644 index 00000000..ecc36e96 --- /dev/null +++ b/fairmq/StateMachine.h @@ -0,0 +1,132 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_STATEMACHINE_H +#define FAIR_MQ_STATEMACHINE_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace fair +{ +namespace mq +{ + +/** + * @class StateMachine StateMachine.h + * @brief Implements the state machine for FairMQ devices + * + * See https://github.com/FairRootGroup/FairRoot/blob/dev/fairmq/docs/Device.md#13-state-machine + */ +class StateMachine +{ + public: + enum class State : int + { + Ok, + Error, + Idle, + InitializingDevice, + DeviceReady, + InitializingTask, + Ready, + Running, + ResettingTask, + ResettingDevice, + Exiting + }; + + enum class StateTransition : int // transition event between States + { + InitDevice, + InitTask, + Run, + Stop, + ResetTask, + ResetDevice, + End, + ErrorFound, + Automatic + }; + + /// @brief Convert string to State + /// @param state to convert + /// @return State enum entry + /// @throw std::out_of_range if a string cannot be resolved to a State + static auto ToState(const std::string& state) -> State { return fkStateStrMap.at(state); } + + /// @brief Convert string to StateTransition + /// @param transition to convert + /// @return StateTransition enum entry + /// @throw std::out_of_range if a string cannot be resolved to a StateTransition + static auto ToStateTransition(const std::string& transition) -> StateTransition { return fkStateTransitionStrMap.at(transition); } + + /// @brief Convert State to string + /// @param state to convert + /// @return string representation of State enum entry + static auto ToStr(State state) -> std::string { return fkStrStateMap.at(state); } + + /// @brief Convert StateTransition to string + /// @param transition to convert + /// @return string representation of StateTransition enum entry + static auto ToStr(StateTransition transition) -> std::string { return fkStrStateTransitionMap.at(transition); } + + friend auto operator<<(std::ostream& os, const State& state) -> std::ostream& { return os << ToStr(state); } + friend auto operator<<(std::ostream& os, const StateTransition& transition) -> std::ostream& { return os << ToStr(transition); } + + StateMachine(); + + struct IllegalTransition : std::runtime_error { using std::runtime_error::runtime_error; }; + + struct StateChange : Event {}; + struct StateQueued : Event {}; + auto SubscribeToStateChange(const std::string& subscriber, std::function callback) -> void { fCallbacks.Subscribe(subscriber, callback); } + auto UnsubscribeFromStateChange(const std::string& subscriber) -> void { fCallbacks.Unsubscribe(subscriber); } + auto SubscribeToStateQueued(const std::string& subscriber, std::function callback) -> void { fCallbacks.Subscribe(subscriber, callback); } + auto UnsubscribeFromStateQueued(const std::string& subscriber) -> void { fCallbacks.Unsubscribe(subscriber); } + + auto GetCurrentState() const -> State { std::lock_guard lock{fMutex}; return fState; } + auto GetCurrentErrorState() const -> State { std::lock_guard lock{fMutex}; return fErrorState; } + auto GetLastQueuedState() const -> State { std::lock_guard lock{fMutex}; return fNextStates.back(); } + + auto ChangeState(StateTransition transition) -> void; + + auto Run() -> void; + auto Reset() -> void; + + auto NextStatePending() -> bool; + + private: + State fState; + State fErrorState; + std::deque fNextStates; + EventManager fCallbacks; + + static const std::unordered_map fkStateStrMap; + static const std::unordered_map> fkStrStateMap; + static const std::unordered_map fkStateTransitionStrMap; + static const std::unordered_map> fkStrStateTransitionMap; + + mutable std::mutex fMutex; + std::condition_variable fNewState; + + static auto Transition(const State currentState, const StateTransition transition) -> State; +}; /* class StateMachine */ + +} /* namespace mq */ +} /* namespace fair */ + +#endif /* FAIR_MQ_STATEMACHINE_H */ diff --git a/fairmq/test/CMakeLists.txt b/fairmq/test/CMakeLists.txt index a96828be..ad6adf8a 100644 --- a/fairmq/test/CMakeLists.txt +++ b/fairmq/test/CMakeLists.txt @@ -151,6 +151,15 @@ add_testsuite(FairMQ.EventManager TIMEOUT 10 ) +add_testsuite(FairMQ.StateMachine + SOURCES + state_machine/runner.cxx + state_machine/_state_machine.cxx + + LINKS FairMQ + TIMEOUT 10 +) + ############################## # Aggregate all test targets # ############################## diff --git a/fairmq/test/state_machine/_state_machine.cxx b/fairmq/test/state_machine/_state_machine.cxx new file mode 100644 index 00000000..8bbf3538 --- /dev/null +++ b/fairmq/test/state_machine/_state_machine.cxx @@ -0,0 +1,136 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include +#include +#include +#include + +namespace +{ + +using namespace std; +using namespace fair::mq; +using S = StateMachine::State; +using T = StateMachine::StateTransition; + +TEST(StateMachine, RegularFSM) +{ + StateMachine fsm; + + ASSERT_FALSE(fsm.NextStatePending()); + + ASSERT_NO_THROW(fsm.ChangeState(T::InitDevice)); + ASSERT_THROW(fsm.ChangeState(T::InitDevice), StateMachine::IllegalTransition); + + ASSERT_NO_THROW(fsm.ChangeState(T::Automatic)); + ASSERT_NO_THROW(fsm.ChangeState(T::InitTask)); + ASSERT_NO_THROW(fsm.ChangeState(T::Automatic)); + ASSERT_NO_THROW(fsm.ChangeState(T::Run)); + ASSERT_NO_THROW(fsm.ChangeState(T::Stop)); + ASSERT_NO_THROW(fsm.ChangeState(T::ResetTask)); + ASSERT_NO_THROW(fsm.ChangeState(T::Automatic)); + + int cnt{0}; + fsm.SubscribeToStateQueued("test", [&](S newState, S lastState){ + ++cnt; + }); + + fsm.SubscribeToStateChange("test", [&](S newState, S lastState){ + if (newState == S::Idle && lastState == S::ResettingDevice) + ASSERT_NO_THROW(fsm.ChangeState(T::End)); + }); + + ASSERT_NO_THROW(fsm.ChangeState(T::ResetDevice)); + ASSERT_NO_THROW(fsm.ChangeState(T::Automatic)); + + fsm.UnsubscribeFromStateQueued("test"); + + ASSERT_TRUE(fsm.NextStatePending()); + + fsm.Run(); + + EXPECT_EQ(cnt, 2); +} + +TEST(StateMachine, ErrorFSM) +{ + StateMachine fsm; + + ASSERT_NO_THROW(fsm.ChangeState(T::InitDevice)); + ASSERT_NO_THROW(fsm.ChangeState(T::Automatic)); + ASSERT_NO_THROW(fsm.ChangeState(T::ErrorFound)); + + fsm.Run(); +} + +TEST(StateMachine, Reset) +{ + StateMachine fsm; + + ASSERT_NO_THROW(fsm.ChangeState(T::End)); + fsm.Run(); + + fsm.Reset(); + + ASSERT_NO_THROW(fsm.ChangeState(T::End)); + fsm.Run(); +} + +TEST(StateMachine, StateConversions) +{ + StateMachine fsm; + EXPECT_NO_THROW(fsm.ToState("OK")); + EXPECT_NO_THROW(fsm.ToState("ERROR")); + EXPECT_NO_THROW(fsm.ToState("IDLE")); + EXPECT_NO_THROW(fsm.ToState("INITIALIZING DEVICE")); + EXPECT_NO_THROW(fsm.ToState("DEVICE READY")); + EXPECT_NO_THROW(fsm.ToState("INITIALIZING TASK")); + EXPECT_NO_THROW(fsm.ToState("READY")); + EXPECT_NO_THROW(fsm.ToState("RUNNING")); + EXPECT_NO_THROW(fsm.ToState("RESETTING TASK")); + EXPECT_NO_THROW(fsm.ToState("RESETTING DEVICE")); + EXPECT_NO_THROW(fsm.ToState("EXITING")); + EXPECT_NO_THROW(fsm.ToStr(S::Ok)); + EXPECT_NO_THROW(fsm.ToStr(S::Error)); + EXPECT_NO_THROW(fsm.ToStr(S::Idle)); + EXPECT_NO_THROW(fsm.ToStr(S::InitializingDevice)); + EXPECT_NO_THROW(fsm.ToStr(S::DeviceReady)); + EXPECT_NO_THROW(fsm.ToStr(S::InitializingTask)); + EXPECT_NO_THROW(fsm.ToStr(S::Ready)); + EXPECT_NO_THROW(fsm.ToStr(S::Running)); + EXPECT_NO_THROW(fsm.ToStr(S::ResettingTask)); + EXPECT_NO_THROW(fsm.ToStr(S::ResettingDevice)); + EXPECT_NO_THROW(fsm.ToStr(S::Exiting)); +} + +TEST(StateMachine, StateTransitionConversions) +{ + StateMachine fsm; + EXPECT_NO_THROW(fsm.ToStateTransition("INIT DEVICE")); + EXPECT_NO_THROW(fsm.ToStateTransition("INIT TASK")); + EXPECT_NO_THROW(fsm.ToStateTransition("RUN")); + EXPECT_NO_THROW(fsm.ToStateTransition("STOP")); + EXPECT_NO_THROW(fsm.ToStateTransition("RESET TASK")); + EXPECT_NO_THROW(fsm.ToStateTransition("RESET DEVICE")); + EXPECT_NO_THROW(fsm.ToStateTransition("END")); + EXPECT_NO_THROW(fsm.ToStateTransition("ERROR FOUND")); + EXPECT_NO_THROW(fsm.ToStateTransition("AUTOMATIC")); + EXPECT_NO_THROW(fsm.ToStr(T::InitDevice)); + EXPECT_NO_THROW(fsm.ToStr(T::InitTask)); + EXPECT_NO_THROW(fsm.ToStr(T::Run)); + EXPECT_NO_THROW(fsm.ToStr(T::Stop)); + EXPECT_NO_THROW(fsm.ToStr(T::ResetTask)); + EXPECT_NO_THROW(fsm.ToStr(T::ResetDevice)); + EXPECT_NO_THROW(fsm.ToStr(T::End)); + EXPECT_NO_THROW(fsm.ToStr(T::ErrorFound)); + EXPECT_NO_THROW(fsm.ToStr(T::Automatic)); +} + +} // namespace diff --git a/fairmq/test/state_machine/runner.cxx b/fairmq/test/state_machine/runner.cxx new file mode 100644 index 00000000..5442845e --- /dev/null +++ b/fairmq/test/state_machine/runner.cxx @@ -0,0 +1,16 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include + +auto main(int argc, char** argv) -> int +{ + ::testing::InitGoogleTest(&argc, argv); + ::testing::FLAGS_gtest_death_test_style = "threadsafe"; + return RUN_ALL_TESTS(); +}