From a53ef79552086646b70bc99fd4d02266a3ab85d9 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 14 Jun 2018 13:38:21 +0200 Subject: [PATCH] Run state handlers on the main thread (breaking change for control). --- fairmq/DeviceRunner.cxx | 14 +- fairmq/FairMQDevice.h | 2 + fairmq/FairMQStateMachine.cxx | 407 +++++++++++------------ fairmq/FairMQStateMachine.h | 6 +- fairmq/PluginManager.h | 5 + fairmq/PluginServices.h | 5 + test/device/_multiple_devices.cxx | 55 +-- test/plugin_services/Fixture.h | 16 +- test/plugins/_plugin.cxx | 17 +- test/plugins/_plugin_manager.cxx | 25 +- test/plugins/_plugin_manager_prelink.cxx | 2 +- 11 files changed, 284 insertions(+), 270 deletions(-) diff --git a/fairmq/DeviceRunner.cxx b/fairmq/DeviceRunner.cxx index 04779bc4..65553b01 100644 --- a/fairmq/DeviceRunner.cxx +++ b/fairmq/DeviceRunner.cxx @@ -13,11 +13,12 @@ using namespace fair::mq; DeviceRunner::DeviceRunner(int argc, char* const argv[]) -: fRawCmdLineArgs{tools::ToStrVector(argc, argv, false)} -, fPluginManager{PluginManager::MakeFromCommandLineOptions(fRawCmdLineArgs)} -, fDevice{nullptr} -{ -} + : fRawCmdLineArgs(tools::ToStrVector(argc, argv, false)) + , fPluginManager(PluginManager::MakeFromCommandLineOptions(fRawCmdLineArgs)) + , fConfig() + , fDevice(nullptr) + , fEvents() +{} auto DeviceRunner::Run() -> int { @@ -87,6 +88,9 @@ auto DeviceRunner::Run() -> int // Instantiate and run plugins fPluginManager->InstantiatePlugins(); + // Run the device + fDevice->RunStateMachine(); + // Wait for control plugin to release device control fPluginManager->WaitForPluginsToReleaseDeviceControl(); diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 071cab02..1ae70ba7 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -416,6 +416,8 @@ class FairMQDevice : public FairMQStateMachine void SetRawCmdLineArgs(const std::vector& args) { fRawCmdLineArgs = args; } std::vector GetRawCmdLineArgs() const { return fRawCmdLineArgs; } + void RunStateMachine() { ProcessWork(); }; + protected: std::shared_ptr fTransportFactory; ///< Default transport factory std::unordered_map> fTransports; ///< Container for transports diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index c1f16a3b..3fa96fd4 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -13,6 +13,7 @@ */ #include "FairMQStateMachine.h" +#include // Increase maximum number of boost::msm states (default is 10) // This #define has to be before any msm header includes @@ -29,13 +30,20 @@ #include #include -#include #include +#include #include using namespace std; +using namespace boost::msm::front; -namespace msmf = boost::msm::front; +namespace std +{ + +template<> +struct hash : fair::mq::tools::HashEnum {}; + +} /* namespace std */ namespace fair { @@ -44,31 +52,105 @@ namespace mq namespace fsm { -// defining events for the boost MSM state machine -struct INIT_DEVICE_E { string name() const { return "INIT_DEVICE"; } }; -struct internal_DEVICE_READY_E { string name() const { return "internal_DEVICE_READY"; } }; -struct INIT_TASK_E { string name() const { return "INIT_TASK"; } }; -struct internal_READY_E { string name() const { return "internal_READY"; } }; -struct RUN_E { string name() const { return "RUN"; } }; -struct PAUSE_E { string name() const { return "PAUSE"; } }; -struct STOP_E { string name() const { return "STOP"; } }; -struct RESET_TASK_E { string name() const { return "RESET_TASK"; } }; -struct RESET_DEVICE_E { string name() const { return "RESET_DEVICE"; } }; -struct internal_IDLE_E { string name() const { return "internal_IDLE"; } }; -struct END_E { string name() const { return "END"; } }; -struct ERROR_FOUND_E { string name() const { return "ERROR_FOUND"; } }; +// list of FSM states +struct OK_FSM_STATE : public state<> { static string Name() { return "OK"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::OK; } }; +struct ERROR_FSM_STATE : public terminate_state<> { static string Name() { return "ERROR"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::Error; } }; -// deactivate the warning for non-virtual destructor thrown in the boost library -#if defined(__clang__) -_Pragma("clang diagnostic push") -_Pragma("clang diagnostic ignored \"-Wnon-virtual-dtor\"") -#elif defined(__GNUC__) || defined(__GNUG__) -_Pragma("GCC diagnostic push") -_Pragma("GCC diagnostic ignored \"-Wnon-virtual-dtor\"") -#endif +struct IDLE_FSM_STATE : public state<> { static string Name() { return "IDLE"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::IDLE; } }; +struct INITIALIZING_DEVICE_FSM_STATE : public state<> { static string Name() { return "INITIALIZING_DEVICE"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::INITIALIZING_DEVICE; } }; +struct DEVICE_READY_FSM_STATE : public state<> { static string Name() { return "DEVICE_READY"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::DEVICE_READY; } }; +struct INITIALIZING_TASK_FSM_STATE : public state<> { static string Name() { return "INITIALIZING_TASK"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::INITIALIZING_TASK; } }; +struct READY_FSM_STATE : public state<> { static string Name() { return "READY"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::READY; } }; +struct RUNNING_FSM_STATE : public state<> { static string Name() { return "RUNNING"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::RUNNING; } }; +struct PAUSED_FSM_STATE : public state<> { static string Name() { return "PAUSED"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::PAUSED; } }; +struct RESETTING_TASK_FSM_STATE : public state<> { static string Name() { return "RESETTING_TASK"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::RESETTING_TASK; } }; +struct RESETTING_DEVICE_FSM_STATE : public state<> { static string Name() { return "RESETTING_DEVICE"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::RESETTING_DEVICE; } }; +struct EXITING_FSM_STATE : public state<> { static string Name() { return "EXITING"; } static FairMQStateMachine::State Type() { return FairMQStateMachine::State::EXITING; } }; + +// list of FSM events +struct INIT_DEVICE_FSM_EVENT { static string Name() { return "INIT_DEVICE"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::INIT_DEVICE; } }; +struct internal_DEVICE_READY_FSM_EVENT { static string Name() { return "internal_DEVICE_READY"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::internal_DEVICE_READY; } }; +struct INIT_TASK_FSM_EVENT { static string Name() { return "INIT_TASK"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::INIT_TASK; } }; +struct internal_READY_FSM_EVENT { static string Name() { return "internal_READY"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::internal_READY; } }; +struct RUN_FSM_EVENT { static string Name() { return "RUN"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::RUN; } }; +struct PAUSE_FSM_EVENT { static string Name() { return "PAUSE"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::PAUSE; } }; +struct STOP_FSM_EVENT { static string Name() { return "STOP"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::STOP; } }; +struct RESET_TASK_FSM_EVENT { static string Name() { return "RESET_TASK"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::RESET_TASK; } }; +struct RESET_DEVICE_FSM_EVENT { static string Name() { return "RESET_DEVICE"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::RESET_DEVICE; } }; +struct internal_IDLE_FSM_EVENT { static string Name() { return "internal_IDLE"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::internal_IDLE; } }; +struct END_FSM_EVENT { static string Name() { return "END"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::END; } }; +struct ERROR_FOUND_FSM_EVENT { static string Name() { return "ERROR_FOUND"; } static FairMQStateMachine::Event Type() { return FairMQStateMachine::Event::ERROR_FOUND; } }; + +static array stateNames = +{ + { + "OK", + "Error", + "IDLE", + "INITIALIZING_DEVICE", + "DEVICE_READY", + "INITIALIZING_TASK", + "READY", + "RUNNING", + "PAUSED", + "RESETTING_TASK", + "RESETTING_DEVICE", + "EXITING" + } +}; + +static array eventNames = +{ + { + "INIT_DEVICE", + "internal_DEVICE_READY", + "INIT_TASK", + "internal_READY", + "RUN", + "PAUSE", + "STOP", + "RESET_TASK", + "RESET_DEVICE", + "internal_IDLE", + "END", + "ERROR_FOUND" + } +}; + +static map stateNumbers = +{ + { "OK", FairMQStateMachine::State::OK }, + { "Error", FairMQStateMachine::State::Error }, + { "IDLE", FairMQStateMachine::State::IDLE }, + { "INITIALIZING_DEVICE", FairMQStateMachine::State::INITIALIZING_DEVICE }, + { "DEVICE_READY", FairMQStateMachine::State::DEVICE_READY }, + { "INITIALIZING_TASK", FairMQStateMachine::State::INITIALIZING_TASK }, + { "READY", FairMQStateMachine::State::READY }, + { "RUNNING", FairMQStateMachine::State::RUNNING }, + { "PAUSED", FairMQStateMachine::State::PAUSED }, + { "RESETTING_TASK", FairMQStateMachine::State::RESETTING_TASK }, + { "RESETTING_DEVICE", FairMQStateMachine::State::RESETTING_DEVICE }, + { "EXITING", FairMQStateMachine::State::EXITING } +}; + +static map eventNumbers = +{ + { "INIT_DEVICE", FairMQStateMachine::Event::INIT_DEVICE }, + { "internal_DEVICE_READY", FairMQStateMachine::Event::internal_DEVICE_READY }, + { "INIT_TASK", FairMQStateMachine::Event::INIT_TASK }, + { "internal_READY", FairMQStateMachine::Event::internal_READY }, + { "RUN", FairMQStateMachine::Event::RUN }, + { "PAUSE", FairMQStateMachine::Event::PAUSE }, + { "STOP", FairMQStateMachine::Event::STOP }, + { "RESET_TASK", FairMQStateMachine::Event::RESET_TASK }, + { "RESET_DEVICE", FairMQStateMachine::Event::RESET_DEVICE }, + { "internal_IDLE", FairMQStateMachine::Event::internal_IDLE }, + { "END", FairMQStateMachine::Event::END }, + { "ERROR_FOUND", FairMQStateMachine::Event::ERROR_FOUND } +}; // defining the boost MSM state machine -struct Machine_ : public msmf::state_machine_def +struct Machine_ : public state_machine_def { public: Machine_() @@ -81,23 +163,22 @@ struct Machine_ : public msmf::state_machine_def , fWorkAvailable(false) , fStateChangeSignal() , fStateChangeSignalsMap() - , fTerminationRequested(false) , fState() - , fWorkerThread() {} virtual ~Machine_() {} + // initial states + using initial_state = boost::mpl::vector; + template void on_entry(Event const&, FSM& fsm) { LOG(state) << "Starting FairMQ state machine"; fState = FairMQStateMachine::IDLE; + LOG(state) << "Entering IDLE state"; fsm.CallStateChangeCallbacks(FairMQStateMachine::IDLE); - - // start a worker thread to execute user states in. - fsm.fWorkerThread = thread(&Machine_::Worker, &fsm); } template @@ -106,41 +187,23 @@ struct Machine_ : public msmf::state_machine_def LOG(state) << "Exiting FairMQ state machine"; } - // list of FSM states - struct OK_FSM : public msmf::state<> {}; - struct ERROR_FSM : public msmf::terminate_state<> {}; - - struct IDLE_FSM : public msmf::state<> {}; - struct INITIALIZING_DEVICE_FSM : public msmf::state<> {}; - struct DEVICE_READY_FSM : public msmf::state<> {}; - struct INITIALIZING_TASK_FSM : public msmf::state<> {}; - struct READY_FSM : public msmf::state<> {}; - struct RUNNING_FSM : public msmf::state<> {}; - struct PAUSED_FSM : public msmf::state<> {}; - struct RESETTING_TASK_FSM : public msmf::state<> {}; - struct RESETTING_DEVICE_FSM : public msmf::state<> {}; - struct EXITING_FSM : public msmf::state<> {}; - - // initial states - using initial_state = boost::mpl::vector; - // actions - struct IdleFct + struct AutomaticFct { template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) { - LOG(state) << "Entering IDLE state"; - fsm.fState = FairMQStateMachine::IDLE; + fsm.fState = ts.Type(); + LOG(state) << "Entering " << ts.Name() << " state"; } }; struct InitDeviceFct { template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) { - fsm.fState = FairMQStateMachine::INITIALIZING_DEVICE; + fsm.fState = ts.Type(); unique_lock lock(fsm.fWorkMutex); while (fsm.fWorkActive) @@ -148,28 +211,18 @@ struct Machine_ : public msmf::state_machine_def fsm.fWorkDoneCondition.wait(lock); } fsm.fWorkAvailable = true; - LOG(state) << "Entering INITIALIZING DEVICE state"; + LOG(state) << "Entering " << ts.Name() << " state"; fsm.fWork = fsm.fInitWrapperHandler; fsm.fWorkAvailableCondition.notify_one(); } }; - struct DeviceReadyFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - LOG(state) << "Entering DEVICE READY state"; - fsm.fState = FairMQStateMachine::DEVICE_READY; - } - }; - struct InitTaskFct { template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) { - fsm.fState = FairMQStateMachine::INITIALIZING_TASK; + fsm.fState = ts.Type(); unique_lock lock(fsm.fWorkMutex); while (fsm.fWorkActive) @@ -177,28 +230,18 @@ struct Machine_ : public msmf::state_machine_def fsm.fWorkDoneCondition.wait(lock); } fsm.fWorkAvailable = true; - LOG(state) << "Entering INITIALIZING TASK state"; + LOG(state) << "Entering " << ts.Name() << " state"; fsm.fWork = fsm.fInitTaskWrapperHandler; fsm.fWorkAvailableCondition.notify_one(); } }; - struct ReadyFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - LOG(state) << "Entering READY state"; - fsm.fState = FairMQStateMachine::READY; - } - }; - struct RunFct { template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) { - fsm.fState = FairMQStateMachine::RUNNING; + fsm.fState = ts.Type(); unique_lock lock(fsm.fWorkMutex); while (fsm.fWorkActive) @@ -206,7 +249,7 @@ struct Machine_ : public msmf::state_machine_def fsm.fWorkDoneCondition.wait(lock); } fsm.fWorkAvailable = true; - LOG(state) << "Entering RUNNING state"; + LOG(state) << "Entering " << ts.Name() << " state"; fsm.fWork = fsm.fRunWrapperHandler; fsm.fWorkAvailableCondition.notify_one(); } @@ -215,9 +258,9 @@ struct Machine_ : public msmf::state_machine_def struct PauseFct { template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) { - fsm.fState = FairMQStateMachine::PAUSED; + fsm.fState = ts.Type(); fsm.fUnblockHandler(); unique_lock lock(fsm.fWorkMutex); @@ -226,37 +269,18 @@ struct Machine_ : public msmf::state_machine_def fsm.fWorkDoneCondition.wait(lock); } fsm.fWorkAvailable = true; - LOG(state) << "Entering PAUSED state"; + LOG(state) << "Entering " << ts.Name() << " state"; fsm.fWork = fsm.fPauseWrapperHandler; fsm.fWorkAvailableCondition.notify_one(); } }; - struct ResumeFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - fsm.fState = FairMQStateMachine::RUNNING; - - unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering RUNNING state"; - fsm.fWork = fsm.fRunWrapperHandler; - fsm.fWorkAvailableCondition.notify_one(); - } - }; - struct StopFct { template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) { - fsm.fState = FairMQStateMachine::READY; + fsm.fState = ts.Type(); fsm.fUnblockHandler(); unique_lock lock(fsm.fWorkMutex); @@ -264,27 +288,27 @@ struct Machine_ : public msmf::state_machine_def { fsm.fWorkDoneCondition.wait(lock); } - LOG(state) << "Entering READY state"; + LOG(state) << "Entering " << ts.Name() << " state"; } }; struct InternalStopFct { template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) { - fsm.fState = FairMQStateMachine::READY; + fsm.fState = ts.Type(); fsm.fUnblockHandler(); - LOG(state) << "RUNNING state finished without an external event, entering READY state"; + LOG(state) << "RUNNING state finished without an external event, entering " << ts.Name() << " state"; } }; struct ResetTaskFct { template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) { - fsm.fState = FairMQStateMachine::RESETTING_TASK; + fsm.fState = ts.Type(); unique_lock lock(fsm.fWorkMutex); while (fsm.fWorkActive) @@ -292,7 +316,7 @@ struct Machine_ : public msmf::state_machine_def fsm.fWorkDoneCondition.wait(lock); } fsm.fWorkAvailable = true; - LOG(state) << "Entering RESETTING TASK state"; + LOG(state) << "Entering " << ts.Name() << " state"; fsm.fWork = fsm.fResetTaskWrapperHandler; fsm.fWorkAvailableCondition.notify_one(); } @@ -301,9 +325,9 @@ struct Machine_ : public msmf::state_machine_def struct ResetDeviceFct { template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) { - fsm.fState = FairMQStateMachine::RESETTING_DEVICE; + fsm.fState = ts.Type(); unique_lock lock(fsm.fWorkMutex); while (fsm.fWorkActive) @@ -311,7 +335,7 @@ struct Machine_ : public msmf::state_machine_def fsm.fWorkDoneCondition.wait(lock); } fsm.fWorkAvailable = true; - LOG(state) << "Entering RESETTING DEVICE state"; + LOG(state) << "Entering " << ts.Name() << " state"; fsm.fWork = fsm.fResetWrapperHandler; fsm.fWorkAvailableCondition.notify_one(); } @@ -320,26 +344,19 @@ struct Machine_ : public msmf::state_machine_def struct ExitingFct { template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) { - LOG(state) << "Entering EXITING state"; - fsm.fState = FairMQStateMachine::EXITING; - fsm.fTerminationRequested = true; + LOG(state) << "Entering " << ts.Name() << " state"; + fsm.fState = ts.Type(); fsm.CallStateChangeCallbacks(FairMQStateMachine::EXITING); - // terminate worker thread + // Stop ProcessWork() { lock_guard lock(fsm.fWorkMutex); fsm.fWorkerTerminated = true; fsm.fWorkAvailableCondition.notify_one(); } - // join the worker thread (executing user states) - if (fsm.fWorkerThread.joinable()) - { - fsm.fWorkerThread.join(); - } - fsm.fExitHandler(); } }; @@ -347,32 +364,32 @@ struct Machine_ : public msmf::state_machine_def struct ErrorFoundFct { template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + void operator()(EVT const&, FSM& fsm, SourceState& /* ss */, TargetState& ts) { - LOG(state) << "Entering ERROR state"; - fsm.fState = FairMQStateMachine::Error; + fsm.fState = ts.Type(); + LOG(state) << "Entering " << ts.Name() << " state"; fsm.CallStateChangeCallbacks(FairMQStateMachine::Error); } }; // Transition table for Machine_ struct transition_table : boost::mpl::vector< - // Start Event Next Action Guard - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row> + // Start Event Next Action Guard + Row, + Row, + Row, + Row, + Row, + Row, + Row, + Row, + Row, + Row, + Row, + Row, + Row, + Row, + Row> {}; // replaces the default no-transition response. @@ -391,45 +408,12 @@ struct Machine_ : public msmf::state_machine_def if (pos != string::npos) { stateName = stateName.substr(pos + 1); - stateName = stateName.substr(0, stateName.size() - 4); + stateName = stateName.substr(0, stateName.size() - 10); } if (stateName != "OK") { - LOG(state) << "No transition from state " << stateName << " on event " << e.name(); - } - } - - static string GetStateName(const int state) - { - switch(state) - { - case FairMQStateMachine::OK: - return "OK"; - case FairMQStateMachine::Error: - return "Error"; - case FairMQStateMachine::IDLE: - return "IDLE"; - case FairMQStateMachine::INITIALIZING_DEVICE: - return "INITIALIZING_DEVICE"; - case FairMQStateMachine::DEVICE_READY: - return "DEVICE_READY"; - case FairMQStateMachine::INITIALIZING_TASK: - return "INITIALIZING_TASK"; - case FairMQStateMachine::READY: - return "READY"; - case FairMQStateMachine::RUNNING: - return "RUNNING"; - case FairMQStateMachine::PAUSED: - return "PAUSED"; - case FairMQStateMachine::RESETTING_TASK: - return "RESETTING_TASK"; - case FairMQStateMachine::RESETTING_DEVICE: - return "RESETTING_DEVICE"; - case FairMQStateMachine::EXITING: - return "EXITING"; - default: - return "requested name for non-existent state..."; + LOG(state) << "No transition from state " << stateName << " on event " << e.Name(); } } @@ -461,12 +445,10 @@ struct Machine_ : public msmf::state_machine_def boost::signals2::signal fStateChangeSignal; unordered_map fStateChangeSignalsMap; - atomic fTerminationRequested; atomic fState; - private: - void Worker() + void ProcessWork() { while (true) { @@ -475,7 +457,7 @@ struct Machine_ : public msmf::state_machine_def // Wait for work to be done. while (!fWorkAvailable && !fWorkerTerminated) { - fWorkAvailableCondition.wait(lock); + fWorkAvailableCondition.wait_for(lock, chrono::milliseconds(300)); } if (fWorkerTerminated) @@ -497,20 +479,10 @@ struct Machine_ : public msmf::state_machine_def CallStateChangeCallbacks(fState); } } - - // run state handlers in a separate thread - thread fWorkerThread; }; // Machine_ using FairMQFSM = boost::msm::back::state_machine; -// reactivate the warning for non-virtual destructor -#if defined(__clang__) -_Pragma("clang diagnostic pop") -#elif defined(__GNUC__) || defined(__GNUG__) -_Pragma("GCC diagnostic pop") -#endif - } // namespace fsm } // namespace mq } // namespace fair @@ -552,79 +524,79 @@ bool FairMQStateMachine::ChangeState(int event) case INIT_DEVICE: { lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(INIT_DEVICE_E()); + static_pointer_cast(fFsm)->process_event(INIT_DEVICE_FSM_EVENT()); return true; } case internal_DEVICE_READY: { lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(internal_DEVICE_READY_E()); + static_pointer_cast(fFsm)->process_event(internal_DEVICE_READY_FSM_EVENT()); return true; } case INIT_TASK: { lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(INIT_TASK_E()); + static_pointer_cast(fFsm)->process_event(INIT_TASK_FSM_EVENT()); return true; } case internal_READY: { lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(internal_READY_E()); + static_pointer_cast(fFsm)->process_event(internal_READY_FSM_EVENT()); return true; } case RUN: { lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(RUN_E()); + static_pointer_cast(fFsm)->process_event(RUN_FSM_EVENT()); return true; } case PAUSE: { lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(PAUSE_E()); + static_pointer_cast(fFsm)->process_event(PAUSE_FSM_EVENT()); return true; } case STOP: { lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(STOP_E()); + static_pointer_cast(fFsm)->process_event(STOP_FSM_EVENT()); return true; } case RESET_DEVICE: { lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(RESET_DEVICE_E()); + static_pointer_cast(fFsm)->process_event(RESET_DEVICE_FSM_EVENT()); return true; } case RESET_TASK: { lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(RESET_TASK_E()); + static_pointer_cast(fFsm)->process_event(RESET_TASK_FSM_EVENT()); return true; } case internal_IDLE: { lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(internal_IDLE_E()); + static_pointer_cast(fFsm)->process_event(internal_IDLE_FSM_EVENT()); return true; } case END: { lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(END_E()); + static_pointer_cast(fFsm)->process_event(END_FSM_EVENT()); return true; } case ERROR_FOUND: { lock_guard lock(fChangeStateMutex); - static_pointer_cast(fFsm)->process_event(ERROR_FOUND_E()); + static_pointer_cast(fFsm)->process_event(ERROR_FOUND_FSM_EVENT()); return true; } default: { LOG(error) << "Requested state transition with an unsupported event: " << event << endl - << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND"; + << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND"; return false; } } @@ -738,7 +710,11 @@ void FairMQStateMachine::CallStateChangeCallbacks(const State state) const string FairMQStateMachine::GetCurrentStateName() const { - return static_pointer_cast(fFsm)->GetStateName(static_pointer_cast(fFsm)->fState); + return GetStateName(static_pointer_cast(fFsm)->fState); +} +string FairMQStateMachine::GetStateName(const State state) +{ + return stateNames.at(state); } int FairMQStateMachine::GetCurrentState() const { @@ -753,23 +729,12 @@ bool FairMQStateMachine::CheckCurrentState(string state) const return state == GetCurrentStateName(); } -bool FairMQStateMachine::Terminated() +void FairMQStateMachine::ProcessWork() { - return static_pointer_cast(fFsm)->fTerminationRequested; + static_pointer_cast(fFsm)->ProcessWork(); } int FairMQStateMachine::GetEventNumber(const string& event) { - if (event == "INIT_DEVICE") return INIT_DEVICE; - if (event == "INIT_TASK") return INIT_TASK; - if (event == "RUN") return RUN; - if (event == "PAUSE") return PAUSE; - if (event == "STOP") return STOP; - if (event == "RESET_DEVICE") return RESET_DEVICE; - if (event == "RESET_TASK") return RESET_TASK; - if (event == "END") return END; - if (event == "ERROR_FOUND") return ERROR_FOUND; - LOG(error) << "Requested number for non-existent event... " << event << endl - << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND"; - return -1; + return eventNumbers.at(event); } diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index d7f90bfd..2c149e43 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -79,10 +79,10 @@ class FairMQStateMachine void CallStateChangeCallbacks(const State state) const; std::string GetCurrentStateName() const; + static std::string GetStateName(const State); int GetCurrentState() const; bool CheckCurrentState(int state) const; bool CheckCurrentState(std::string state) const; - bool Terminated(); // actions to be overwritten by derived classes virtual void InitWrapper() {} @@ -94,8 +94,10 @@ class FairMQStateMachine virtual void Exit() {} virtual void Unblock() {} + void ProcessWork(); + private: - int GetEventNumber(const std::string& event); + static int GetEventNumber(const std::string& event); std::mutex fChangeStateMutex; diff --git a/fairmq/PluginManager.h b/fairmq/PluginManager.h index 8f361d01..36a65ad1 100644 --- a/fairmq/PluginManager.h +++ b/fairmq/PluginManager.h @@ -51,6 +51,11 @@ class PluginManager PluginManager(); + ~PluginManager() + { + LOG(debug) << "Shutting down Plugin Manager"; + } + auto SetSearchPaths(const std::vector&) -> void; auto AppendSearchPath(const boost::filesystem::path&) -> void; auto PrependSearchPath(const boost::filesystem::path&) -> void; diff --git a/fairmq/PluginServices.h b/fairmq/PluginServices.h index c4081394..3565b057 100644 --- a/fairmq/PluginServices.h +++ b/fairmq/PluginServices.h @@ -47,6 +47,11 @@ class PluginServices { } + ~PluginServices() + { + LOG(debug) << "Shutting down Plugin Services"; + } + PluginServices(const PluginServices&) = delete; PluginServices operator=(const PluginServices&) = delete; diff --git a/test/device/_multiple_devices.cxx b/test/device/_multiple_devices.cxx index af3269a2..43e84b0d 100644 --- a/test/device/_multiple_devices.cxx +++ b/test/device/_multiple_devices.cxx @@ -12,6 +12,7 @@ #include #include +#include #include // std::async, std::future namespace @@ -19,6 +20,24 @@ namespace using namespace std; +void control(FairMQDevice& device) +{ + device.ChangeState("INIT_DEVICE"); + device.WaitForEndOfState("INIT_DEVICE"); + device.ChangeState("INIT_TASK"); + device.WaitForEndOfState("INIT_TASK"); + + device.ChangeState("RUN"); + device.WaitForEndOfState("RUN"); + + device.ChangeState("RESET_TASK"); + device.WaitForEndOfState("RESET_TASK"); + device.ChangeState("RESET_DEVICE"); + device.WaitForEndOfState("RESET_DEVICE"); + + device.ChangeState("END"); +} + class MultipleDevices : public ::testing::Test { public: MultipleDevices() @@ -34,20 +53,14 @@ class MultipleDevices : public ::testing::Test { channel.UpdateRateLogging(0); sender.fChannels["data"].push_back(channel); - sender.ChangeState("INIT_DEVICE"); - sender.WaitForEndOfState("INIT_DEVICE"); - sender.ChangeState("INIT_TASK"); - sender.WaitForEndOfState("INIT_TASK"); + thread t(control, std::ref(sender)); - sender.ChangeState("RUN"); - sender.WaitForEndOfState("RUN"); + sender.RunStateMachine(); - sender.ChangeState("RESET_TASK"); - sender.WaitForEndOfState("RESET_TASK"); - sender.ChangeState("RESET_DEVICE"); - sender.WaitForEndOfState("RESET_DEVICE"); - - sender.ChangeState("END"); + if (t.joinable()) + { + t.join(); + } return true; } @@ -62,20 +75,14 @@ class MultipleDevices : public ::testing::Test { channel.UpdateRateLogging(0); receiver.fChannels["data"].push_back(channel); - receiver.ChangeState("INIT_DEVICE"); - receiver.WaitForEndOfState("INIT_DEVICE"); - receiver.ChangeState("INIT_TASK"); - receiver.WaitForEndOfState("INIT_TASK"); + thread t(control, std::ref(receiver)); - receiver.ChangeState("RUN"); - receiver.WaitForEndOfState("RUN"); + receiver.RunStateMachine(); - receiver.ChangeState("RESET_TASK"); - receiver.WaitForEndOfState("RESET_TASK"); - receiver.ChangeState("RESET_DEVICE"); - receiver.WaitForEndOfState("RESET_DEVICE"); - - receiver.ChangeState("END"); + if (t.joinable()) + { + t.join(); + } return true; } diff --git a/test/plugin_services/Fixture.h b/test/plugin_services/Fixture.h index 31665389..cd0688ed 100644 --- a/test/plugin_services/Fixture.h +++ b/test/plugin_services/Fixture.h @@ -14,6 +14,7 @@ #include #include #include +#include namespace fair { @@ -36,21 +37,28 @@ inline auto control(std::shared_ptr device) -> void struct PluginServices : ::testing::Test { PluginServices() - : mConfig() - , mDevice{std::make_shared()} - , mServices{&mConfig, mDevice} + : mConfig() + , mDevice{std::make_shared()} + , mServices{&mConfig, mDevice} + , fRunStateMachineThread() { + fRunStateMachineThread = std::thread(&FairMQDevice::RunStateMachine, mDevice.get()); mDevice->SetTransport("zeromq"); + } ~PluginServices() { - if(mDevice->GetCurrentState() == FairMQDevice::IDLE) control(mDevice); + if (mDevice->GetCurrentState() == FairMQDevice::IDLE) control(mDevice); + if (fRunStateMachineThread.joinable()) { + fRunStateMachineThread.join(); + } } FairMQProgOptions mConfig; std::shared_ptr mDevice; fair::mq::PluginServices mServices; + std::thread fRunStateMachineThread; }; } /* namespace test */ diff --git a/test/plugins/_plugin.cxx b/test/plugins/_plugin.cxx index 8c1caacc..38ef88b7 100644 --- a/test/plugins/_plugin.cxx +++ b/test/plugins/_plugin.cxx @@ -16,6 +16,7 @@ #include #include #include +#include namespace { @@ -38,7 +39,7 @@ auto control(shared_ptr device) -> void TEST(Plugin, Operators) { - FairMQProgOptions config{}; + FairMQProgOptions config; auto device = make_shared(); PluginServices services{&config, device}; Plugin p1{"dds", {1, 0, 0}, "Foo Bar ", "https://git.test.net/dds.git", &services}; @@ -46,19 +47,27 @@ TEST(Plugin, Operators) Plugin p3{"file", {1, 0, 0}, "Foo Bar ", "https://git.test.net/file.git", &services}; EXPECT_EQ(p1, p2); EXPECT_NE(p1, p3); - control(device); + thread t(control, device); + device->RunStateMachine(); + if (t.joinable()) { + t.join(); + } } TEST(Plugin, OstreamOperators) { - FairMQProgOptions config{}; + FairMQProgOptions config; auto device = make_shared(); PluginServices services{&config, device}; Plugin p1{"dds", {1, 0, 0}, "Foo Bar ", "https://git.test.net/dds.git", &services}; stringstream ss; ss << p1; EXPECT_EQ(ss.str(), string{"'dds', version '1.0.0', maintainer 'Foo Bar ', homepage 'https://git.test.net/dds.git'"}); - control(device); + thread t(control, device); + device->RunStateMachine(); + if (t.joinable()) { + t.join(); + } } TEST(PluginVersion, Operators) diff --git a/test/plugins/_plugin_manager.cxx b/test/plugins/_plugin_manager.cxx index 8fa2f94c..893c3f83 100644 --- a/test/plugins/_plugin_manager.cxx +++ b/test/plugins/_plugin_manager.cxx @@ -15,6 +15,7 @@ #include #include #include +#include namespace { @@ -39,8 +40,8 @@ auto control(shared_ptr device) -> void TEST(PluginManager, LoadPluginDynamic) { - FairMQProgOptions config{}; - auto mgr = PluginManager{}; + FairMQProgOptions config; + PluginManager mgr; auto device = make_shared(); mgr.EmplacePluginServices(&config, device); @@ -53,7 +54,7 @@ TEST(PluginManager, LoadPluginDynamic) // check order const auto expected = vector{"test_dummy", "test_dummy2"}; - auto actual = vector{}; + auto actual = vector(); mgr.ForEachPlugin([&](Plugin& plugin){ actual.push_back(plugin.GetName()); }); ASSERT_TRUE(actual == expected); @@ -62,18 +63,22 @@ TEST(PluginManager, LoadPluginDynamic) mgr.ForEachPluginProgOptions([&count](const options_description& /*d*/){ ++count; }); ASSERT_EQ(count, 1); - control(device); + thread t(control, device); + device->RunStateMachine(); + if (t.joinable()) { + t.join(); + } } TEST(PluginManager, LoadPluginStatic) { - auto mgr = PluginManager{}; + PluginManager mgr; auto device = make_shared(); device->SetTransport("zeromq"); ASSERT_NO_THROW(mgr.LoadPlugin("s:control")); - FairMQProgOptions config{}; + FairMQProgOptions config; config.SetValue("control", "static"); config.SetValue("catch-signals", 0); mgr.EmplacePluginServices(&config, device); @@ -82,7 +87,7 @@ TEST(PluginManager, LoadPluginStatic) // check order const auto expected = vector{"control"}; - auto actual = vector{}; + auto actual = vector(); mgr.ForEachPlugin([&](Plugin& plugin){ actual.push_back(plugin.GetName()); }); ASSERT_TRUE(actual == expected); @@ -91,6 +96,8 @@ TEST(PluginManager, LoadPluginStatic) mgr.ForEachPluginProgOptions([&count](const options_description&){ ++count; }); ASSERT_EQ(count, 1); + device->RunStateMachine(); + mgr.WaitForPluginsToReleaseDeviceControl(); } @@ -112,7 +119,7 @@ TEST(PluginManager, SearchPathValidation) const auto path1 = path{"/tmp/test1"}; const auto path2 = path{"/tmp/test2"}; const auto path3 = path{"/tmp/test3"}; - auto mgr = PluginManager{}; + PluginManager mgr; mgr.SetSearchPaths({path1, path2}); auto expected = vector{path1, path2}; @@ -140,7 +147,7 @@ TEST(PluginManager, SearchPaths) fs.close(); const auto empty_path = path{""}; - auto mgr = PluginManager{}; + PluginManager mgr; ASSERT_NO_THROW(mgr.AppendSearchPath(non_existing_dir)); ASSERT_NO_THROW(mgr.AppendSearchPath(existing_dir)); ASSERT_THROW(mgr.AppendSearchPath(existing_file), PluginManager::BadSearchPath); diff --git a/test/plugins/_plugin_manager_prelink.cxx b/test/plugins/_plugin_manager_prelink.cxx index a53085b3..5921c9e0 100644 --- a/test/plugins/_plugin_manager_prelink.cxx +++ b/test/plugins/_plugin_manager_prelink.cxx @@ -17,7 +17,7 @@ using namespace std; TEST(PluginManager, LoadPluginPrelinkedDynamic) { - auto mgr = PluginManager{}; + PluginManager mgr; ASSERT_NO_THROW(mgr.LoadPlugin("p:test_dummy")); ASSERT_NO_THROW(mgr.LoadPlugin("p:test_dummy2"));