FairMQ: Rewrite the state machine

* Simplify the code
* Drop Boost.MSM dependency
* Drop threaded execution
* Support deferred state changes
* Monitor state changes/queues via callbacks
This commit is contained in:
Dennis Klein 2017-09-22 20:46:53 +02:00 committed by Mohammad Al-Turany
parent 179968db1e
commit 2589ca5ced
7 changed files with 491 additions and 1 deletions

View File

@ -95,6 +95,7 @@ set(FAIRMQ_HEADER_FILES
plugins/Builtin.h plugins/Builtin.h
plugins/Control.h plugins/Control.h
runFairMQDevice.h runFairMQDevice.h
StateMachine.h
shmem/FairMQMessageSHM.h shmem/FairMQMessageSHM.h
shmem/FairMQPollerSHM.h shmem/FairMQPollerSHM.h
shmem/FairMQRegionSHM.h shmem/FairMQRegionSHM.h
@ -155,6 +156,7 @@ set(FAIRMQ_SOURCE_FILES
PluginManager.cxx PluginManager.cxx
PluginServices.cxx PluginServices.cxx
plugins/Control.cxx plugins/Control.cxx
StateMachine.cxx
shmem/FairMQMessageSHM.cxx shmem/FairMQMessageSHM.cxx
shmem/FairMQPollerSHM.cxx shmem/FairMQPollerSHM.cxx
shmem/FairMQRegionSHM.cxx shmem/FairMQRegionSHM.cxx

View File

@ -95,7 +95,7 @@ class EventManager
} }
template<typename E, typename ...Args> template<typename E, typename ...Args>
auto Emit(typename E::KeyType& key, Args&&... args) const -> void auto Emit(typename E::KeyType& key, Args... args) const -> void
{ {
const std::type_index event_type_index{typeid(E)}; const std::type_index event_type_index{typeid(E)};
const std::type_index callback_type_index{typeid(Callback<E, Args...>)}; const std::type_index callback_type_index{typeid(Callback<E, Args...>)};

195
fairmq/StateMachine.cxx Normal file
View File

@ -0,0 +1,195 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "StateMachine.h"
using namespace fair::mq;
using namespace std;
const std::unordered_map<std::string, StateMachine::State> StateMachine::fkStateStrMap = {
{"OK", State::Ok},
{"ERROR", State::Error},
{"IDLE", State::Idle},
{"INITIALIZING DEVICE", State::InitializingDevice},
{"DEVICE READY", State::DeviceReady},
{"INITIALIZING TASK", State::InitializingTask},
{"READY", State::Ready},
{"RUNNING", State::Running},
{"RESETTING TASK", State::ResettingTask},
{"RESETTING DEVICE", State::ResettingDevice},
{"EXITING", State::Exiting}
};
const std::unordered_map<StateMachine::State, std::string, tools::HashEnum<StateMachine::State>> StateMachine::fkStrStateMap = {
{State::Ok, "OK"},
{State::Error, "ERROR"},
{State::Idle, "IDLE"},
{State::InitializingDevice, "INITIALIZING DEVICE"},
{State::DeviceReady, "DEVICE READY"},
{State::InitializingTask, "INITIALIZING TASK"},
{State::Ready, "READY"},
{State::Running, "RUNNING"},
{State::ResettingTask, "RESETTING TASK"},
{State::ResettingDevice, "RESETTING DEVICE"},
{State::Exiting, "EXITING"}
};
const std::unordered_map<std::string, StateMachine::StateTransition> StateMachine::fkStateTransitionStrMap = {
{"INIT DEVICE", StateTransition::InitDevice},
{"INIT TASK", StateTransition::InitTask},
{"RUN", StateTransition::Run},
{"STOP", StateTransition::Stop},
{"RESET TASK", StateTransition::ResetTask},
{"RESET DEVICE", StateTransition::ResetDevice},
{"END", StateTransition::End},
{"ERROR FOUND", StateTransition::ErrorFound},
{"AUTOMATIC", StateTransition::Automatic},
};
const std::unordered_map<StateMachine::StateTransition, std::string, tools::HashEnum<StateMachine::StateTransition>> StateMachine::fkStrStateTransitionMap = {
{StateTransition::InitDevice, "INIT DEVICE"},
{StateTransition::InitTask, "INIT TASK"},
{StateTransition::Run, "RUN"},
{StateTransition::Stop, "STOP"},
{StateTransition::ResetTask, "RESET TASK"},
{StateTransition::ResetDevice, "RESET DEVICE"},
{StateTransition::End, "END"},
{StateTransition::ErrorFound, "ERROR FOUND"},
{StateTransition::Automatic, "AUTOMATIC"},
};
auto StateMachine::Run() -> void
{
LOG(STATE) << "Starting FairMQ state machine";
LOG(DEBUG) << "Entering initial " << fErrorState << " state (orthogonal error state machine)";
LOG(STATE) << "Entering initial " << fState << " state";
std::unique_lock<std::mutex> lock{fMutex};
while (true)
{
while (fNextStates.empty())
{
fNewState.wait(lock);
}
State lastState;
if (fNextStates.front() == State::Error)
{
// advance error FSM
lastState = fErrorState;
fErrorState = fNextStates.front();
fNextStates.pop_front();
LOG(ERROR) << "Entering " << fErrorState << " state (orthogonal error state machine)";
}
else
{
// advance regular FSM
lastState = fState;
fState = fNextStates.front();
fNextStates.pop_front();
LOG(STATE) << "Entering " << fState << " state";
}
lock.unlock();
fCallbacks.Emit<StateChange, State>(fState, lastState);
lock.lock();
if (fState == State::Exiting || fErrorState == State::Error) break;
}
LOG(STATE) << "Exiting FairMQ state machine";
}
auto StateMachine::ChangeState(StateTransition transition) -> void
{
State lastState;
std::unique_lock<std::mutex> lock{fMutex};
if (transition == StateTransition::ErrorFound)
{
lastState = fErrorState;
}
else if (fNextStates.empty())
{
lastState = fState;
}
else
{
lastState = fNextStates.back();
}
const State nextState{Transition(lastState, transition)};
fNextStates.push_back(nextState);
lock.unlock();
fCallbacks.Emit<StateQueued, State>(nextState, lastState);
fNewState.notify_one();
}
auto StateMachine::Transition(const State currentState, const StateTransition transition) -> State
{
switch (currentState) {
case State::Idle:
if (transition == StateTransition::InitDevice ) return State::InitializingDevice;
if (transition == StateTransition::End ) return State::Exiting;
break;
case State::InitializingDevice:
if (transition == StateTransition::Automatic ) return State::DeviceReady;
break;
case State::DeviceReady:
if (transition == StateTransition::InitTask ) return State::InitializingTask;
if (transition == StateTransition::ResetDevice) return State::ResettingDevice;
break;
case State::InitializingTask:
if (transition == StateTransition::Automatic ) return State::Ready;
break;
case State::Ready:
if (transition == StateTransition::Run ) return State::Running;
if (transition == StateTransition::ResetTask ) return State::ResettingTask;
break;
case State::Running:
if (transition == StateTransition::Stop ) return State::Ready;
break;
case State::ResettingTask:
if (transition == StateTransition::Automatic ) return State::DeviceReady;
break;
case State::ResettingDevice:
if (transition == StateTransition::Automatic ) return State::Idle;
break;
case State::Exiting:
break;
case State::Ok:
if (transition == StateTransition::ErrorFound ) return State::Error;
break;
case State::Error:
break;
}
throw IllegalTransition{tools::ToString("No transition ", transition, " from state ", currentState, ".")};
}
StateMachine::StateMachine()
: fState{State::Idle}
, fErrorState{State::Ok}
{
}
auto StateMachine::Reset() -> void
{
std::unique_lock<std::mutex> lock{fMutex};
fState = State::Idle;
fErrorState = State::Ok;
fNextStates.clear();
}
auto StateMachine::NextStatePending() -> bool
{
std::unique_lock<std::mutex> lock{fMutex};
return fNextStates.size() > 0;
}

132
fairmq/StateMachine.h Normal file
View File

@ -0,0 +1,132 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_STATEMACHINE_H
#define FAIR_MQ_STATEMACHINE_H
#include <utility>
#include <FairMQLogger.h>
#include <fairmq/Tools.h>
#include <fairmq/EventManager.h>
#include <deque>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <unordered_map>
namespace fair
{
namespace mq
{
/**
* @class StateMachine StateMachine.h <fairmq/StateMachine.h>
* @brief Implements the state machine for FairMQ devices
*
* See https://github.com/FairRootGroup/FairRoot/blob/dev/fairmq/docs/Device.md#13-state-machine
*/
class StateMachine
{
public:
enum class State : int
{
Ok,
Error,
Idle,
InitializingDevice,
DeviceReady,
InitializingTask,
Ready,
Running,
ResettingTask,
ResettingDevice,
Exiting
};
enum class StateTransition : int // transition event between States
{
InitDevice,
InitTask,
Run,
Stop,
ResetTask,
ResetDevice,
End,
ErrorFound,
Automatic
};
/// @brief Convert string to State
/// @param state to convert
/// @return State enum entry
/// @throw std::out_of_range if a string cannot be resolved to a State
static auto ToState(const std::string& state) -> State { return fkStateStrMap.at(state); }
/// @brief Convert string to StateTransition
/// @param transition to convert
/// @return StateTransition enum entry
/// @throw std::out_of_range if a string cannot be resolved to a StateTransition
static auto ToStateTransition(const std::string& transition) -> StateTransition { return fkStateTransitionStrMap.at(transition); }
/// @brief Convert State to string
/// @param state to convert
/// @return string representation of State enum entry
static auto ToStr(State state) -> std::string { return fkStrStateMap.at(state); }
/// @brief Convert StateTransition to string
/// @param transition to convert
/// @return string representation of StateTransition enum entry
static auto ToStr(StateTransition transition) -> std::string { return fkStrStateTransitionMap.at(transition); }
friend auto operator<<(std::ostream& os, const State& state) -> std::ostream& { return os << ToStr(state); }
friend auto operator<<(std::ostream& os, const StateTransition& transition) -> std::ostream& { return os << ToStr(transition); }
StateMachine();
struct IllegalTransition : std::runtime_error { using std::runtime_error::runtime_error; };
struct StateChange : Event<State> {};
struct StateQueued : Event<State> {};
auto SubscribeToStateChange(const std::string& subscriber, std::function<void(typename StateChange::KeyType newState, State lastState)> callback) -> void { fCallbacks.Subscribe<StateChange, State>(subscriber, callback); }
auto UnsubscribeFromStateChange(const std::string& subscriber) -> void { fCallbacks.Unsubscribe<StateChange, State>(subscriber); }
auto SubscribeToStateQueued(const std::string& subscriber, std::function<void(typename StateChange::KeyType newState, State lastState)> callback) -> void { fCallbacks.Subscribe<StateQueued, State>(subscriber, callback); }
auto UnsubscribeFromStateQueued(const std::string& subscriber) -> void { fCallbacks.Unsubscribe<StateQueued, State>(subscriber); }
auto GetCurrentState() const -> State { std::lock_guard<std::mutex> lock{fMutex}; return fState; }
auto GetCurrentErrorState() const -> State { std::lock_guard<std::mutex> lock{fMutex}; return fErrorState; }
auto GetLastQueuedState() const -> State { std::lock_guard<std::mutex> lock{fMutex}; return fNextStates.back(); }
auto ChangeState(StateTransition transition) -> void;
auto Run() -> void;
auto Reset() -> void;
auto NextStatePending() -> bool;
private:
State fState;
State fErrorState;
std::deque<State> fNextStates;
EventManager fCallbacks;
static const std::unordered_map<std::string, State> fkStateStrMap;
static const std::unordered_map<State, std::string, tools::HashEnum<State>> fkStrStateMap;
static const std::unordered_map<std::string, StateTransition> fkStateTransitionStrMap;
static const std::unordered_map<StateTransition, std::string, tools::HashEnum<StateTransition>> fkStrStateTransitionMap;
mutable std::mutex fMutex;
std::condition_variable fNewState;
static auto Transition(const State currentState, const StateTransition transition) -> State;
}; /* class StateMachine */
} /* namespace mq */
} /* namespace fair */
#endif /* FAIR_MQ_STATEMACHINE_H */

View File

@ -151,6 +151,15 @@ add_testsuite(FairMQ.EventManager
TIMEOUT 10 TIMEOUT 10
) )
add_testsuite(FairMQ.StateMachine
SOURCES
state_machine/runner.cxx
state_machine/_state_machine.cxx
LINKS FairMQ
TIMEOUT 10
)
############################## ##############################
# Aggregate all test targets # # Aggregate all test targets #
############################## ##############################

View File

@ -0,0 +1,136 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <gtest/gtest.h>
#include <fairmq/StateMachine.h>
#include <FairMQLogger.h>
#include <string>
#include <thread>
namespace
{
using namespace std;
using namespace fair::mq;
using S = StateMachine::State;
using T = StateMachine::StateTransition;
TEST(StateMachine, RegularFSM)
{
StateMachine fsm;
ASSERT_FALSE(fsm.NextStatePending());
ASSERT_NO_THROW(fsm.ChangeState(T::InitDevice));
ASSERT_THROW(fsm.ChangeState(T::InitDevice), StateMachine::IllegalTransition);
ASSERT_NO_THROW(fsm.ChangeState(T::Automatic));
ASSERT_NO_THROW(fsm.ChangeState(T::InitTask));
ASSERT_NO_THROW(fsm.ChangeState(T::Automatic));
ASSERT_NO_THROW(fsm.ChangeState(T::Run));
ASSERT_NO_THROW(fsm.ChangeState(T::Stop));
ASSERT_NO_THROW(fsm.ChangeState(T::ResetTask));
ASSERT_NO_THROW(fsm.ChangeState(T::Automatic));
int cnt{0};
fsm.SubscribeToStateQueued("test", [&](S newState, S lastState){
++cnt;
});
fsm.SubscribeToStateChange("test", [&](S newState, S lastState){
if (newState == S::Idle && lastState == S::ResettingDevice)
ASSERT_NO_THROW(fsm.ChangeState(T::End));
});
ASSERT_NO_THROW(fsm.ChangeState(T::ResetDevice));
ASSERT_NO_THROW(fsm.ChangeState(T::Automatic));
fsm.UnsubscribeFromStateQueued("test");
ASSERT_TRUE(fsm.NextStatePending());
fsm.Run();
EXPECT_EQ(cnt, 2);
}
TEST(StateMachine, ErrorFSM)
{
StateMachine fsm;
ASSERT_NO_THROW(fsm.ChangeState(T::InitDevice));
ASSERT_NO_THROW(fsm.ChangeState(T::Automatic));
ASSERT_NO_THROW(fsm.ChangeState(T::ErrorFound));
fsm.Run();
}
TEST(StateMachine, Reset)
{
StateMachine fsm;
ASSERT_NO_THROW(fsm.ChangeState(T::End));
fsm.Run();
fsm.Reset();
ASSERT_NO_THROW(fsm.ChangeState(T::End));
fsm.Run();
}
TEST(StateMachine, StateConversions)
{
StateMachine fsm;
EXPECT_NO_THROW(fsm.ToState("OK"));
EXPECT_NO_THROW(fsm.ToState("ERROR"));
EXPECT_NO_THROW(fsm.ToState("IDLE"));
EXPECT_NO_THROW(fsm.ToState("INITIALIZING DEVICE"));
EXPECT_NO_THROW(fsm.ToState("DEVICE READY"));
EXPECT_NO_THROW(fsm.ToState("INITIALIZING TASK"));
EXPECT_NO_THROW(fsm.ToState("READY"));
EXPECT_NO_THROW(fsm.ToState("RUNNING"));
EXPECT_NO_THROW(fsm.ToState("RESETTING TASK"));
EXPECT_NO_THROW(fsm.ToState("RESETTING DEVICE"));
EXPECT_NO_THROW(fsm.ToState("EXITING"));
EXPECT_NO_THROW(fsm.ToStr(S::Ok));
EXPECT_NO_THROW(fsm.ToStr(S::Error));
EXPECT_NO_THROW(fsm.ToStr(S::Idle));
EXPECT_NO_THROW(fsm.ToStr(S::InitializingDevice));
EXPECT_NO_THROW(fsm.ToStr(S::DeviceReady));
EXPECT_NO_THROW(fsm.ToStr(S::InitializingTask));
EXPECT_NO_THROW(fsm.ToStr(S::Ready));
EXPECT_NO_THROW(fsm.ToStr(S::Running));
EXPECT_NO_THROW(fsm.ToStr(S::ResettingTask));
EXPECT_NO_THROW(fsm.ToStr(S::ResettingDevice));
EXPECT_NO_THROW(fsm.ToStr(S::Exiting));
}
TEST(StateMachine, StateTransitionConversions)
{
StateMachine fsm;
EXPECT_NO_THROW(fsm.ToStateTransition("INIT DEVICE"));
EXPECT_NO_THROW(fsm.ToStateTransition("INIT TASK"));
EXPECT_NO_THROW(fsm.ToStateTransition("RUN"));
EXPECT_NO_THROW(fsm.ToStateTransition("STOP"));
EXPECT_NO_THROW(fsm.ToStateTransition("RESET TASK"));
EXPECT_NO_THROW(fsm.ToStateTransition("RESET DEVICE"));
EXPECT_NO_THROW(fsm.ToStateTransition("END"));
EXPECT_NO_THROW(fsm.ToStateTransition("ERROR FOUND"));
EXPECT_NO_THROW(fsm.ToStateTransition("AUTOMATIC"));
EXPECT_NO_THROW(fsm.ToStr(T::InitDevice));
EXPECT_NO_THROW(fsm.ToStr(T::InitTask));
EXPECT_NO_THROW(fsm.ToStr(T::Run));
EXPECT_NO_THROW(fsm.ToStr(T::Stop));
EXPECT_NO_THROW(fsm.ToStr(T::ResetTask));
EXPECT_NO_THROW(fsm.ToStr(T::ResetDevice));
EXPECT_NO_THROW(fsm.ToStr(T::End));
EXPECT_NO_THROW(fsm.ToStr(T::ErrorFound));
EXPECT_NO_THROW(fsm.ToStr(T::Automatic));
}
} // namespace

View File

@ -0,0 +1,16 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <gtest/gtest.h>
auto main(int argc, char** argv) -> int
{
::testing::InitGoogleTest(&argc, argv);
::testing::FLAGS_gtest_death_test_style = "threadsafe";
return RUN_ALL_TESTS();
}