diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index fb9dc0d8..18c0531b 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -55,45 +55,83 @@ bool FairMQStateMachine::ChangeState(int event) switch (event) { case INIT_DEVICE: + { + std::lock_guard lock(fChangeStateMutex); process_event(FairMQFSM::INIT_DEVICE()); return true; + } case internal_DEVICE_READY: + { + std::lock_guard lock(fChangeStateMutex); process_event(FairMQFSM::internal_DEVICE_READY()); return true; + } case INIT_TASK: + { + std::lock_guard lock(fChangeStateMutex); process_event(FairMQFSM::INIT_TASK()); return true; + } case internal_READY: + { + // std::lock_guard lock(fChangeStateMutex); // InitTask is synchronous, until ROOT workaround is no longer needed. process_event(FairMQFSM::internal_READY()); return true; + } case RUN: + { + std::lock_guard lock(fChangeStateMutex); process_event(FairMQFSM::RUN()); return true; + } case PAUSE: + { + std::lock_guard lock(fChangeStateMutex); process_event(FairMQFSM::PAUSE()); return true; + } case STOP: + { + std::lock_guard lock(fChangeStateMutex); process_event(FairMQFSM::STOP()); return true; + } case RESET_DEVICE: + { + std::lock_guard lock(fChangeStateMutex); process_event(FairMQFSM::RESET_DEVICE()); return true; + } case RESET_TASK: + { + std::lock_guard lock(fChangeStateMutex); process_event(FairMQFSM::RESET_TASK()); return true; + } case internal_IDLE: + { + std::lock_guard lock(fChangeStateMutex); process_event(FairMQFSM::internal_IDLE()); return true; + } case END: + { + std::lock_guard lock(fChangeStateMutex); process_event(FairMQFSM::END()); return true; + } case ERROR_FOUND: + { + std::lock_guard lock(fChangeStateMutex); process_event(FairMQFSM::ERROR_FOUND()); return true; + } default: + { LOG(ERROR) << "Requested state transition with an unsupported event: " << event << std::endl << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND"; return false; + } } } catch (std::exception& e) @@ -123,7 +161,7 @@ void FairMQStateMachine::WaitForEndOfState(int event) std::unique_lock lock(fWorkMutex); while (fWorkActive || fWorkAvailable) { - fWorkDoneCondition.wait(lock); + fWorkDoneCondition.wait_for(lock, std::chrono::seconds(1)); } break; diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 85e4ed99..073f589c 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -82,6 +82,7 @@ struct FairMQFSM_ : public msmf::state_machine_def , fWorkActive(false) , fWorkAvailable(false) , fState() + , fChangeStateMutex() {} // Destructor @@ -100,9 +101,6 @@ struct FairMQFSM_ : public msmf::state_machine_def template void on_exit(Event const&, FSM& fsm) { - // join the worker thread (executing user states) - fsm.fWorkerThread.join(); - LOG(STATE) << "Exiting FairMQ state machine"; } @@ -320,31 +318,17 @@ struct FairMQFSM_ : public msmf::state_machine_def fsm.fState = EXITING; // terminate worker thread - std::lock_guard lock(fsm.fWorkMutex); - fsm.fWorkerTerminated = true; - fsm.fWorkAvailableCondition.notify_one(); + { + std::lock_guard lock(fsm.fWorkMutex); + fsm.fWorkerTerminated = true; + fsm.fWorkAvailableCondition.notify_one(); + } - fsm.fTerminateStateThread = std::thread(&FairMQFSM_::Terminate, &fsm); - fsm.Shutdown(); - fsm.fTerminateStateThread.join(); - } - }; - - struct ExitingRunFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - LOG(STATE) << "Entering EXITING state"; - - fsm.fState = EXITING; - - fsm.Unblock(); // Unblock potential blocking transfer calls - - // terminate worker thread - std::lock_guard lock(fsm.fWorkMutex); - fsm.fWorkerTerminated = true; - fsm.fWorkAvailableCondition.notify_one(); + // join the worker thread (executing user states) + if (fsm.fWorkerThread.joinable()) + { + fsm.fWorkerThread.join(); + } fsm.fTerminateStateThread = std::thread(&FairMQFSM_::Terminate, &fsm); fsm.Shutdown(); @@ -423,7 +407,6 @@ struct FairMQFSM_ : public msmf::state_machine_def msmf::Row, msmf::Row, msmf::Row, - msmf::Row, msmf::Row, msmf::Row, msmf::Row, @@ -556,6 +539,7 @@ struct FairMQFSM_ : public msmf::state_machine_def bool fWorkAvailable; protected: + std::mutex fChangeStateMutex; std::atomic fState; };