diff --git a/fairmq/Device.cxx b/fairmq/Device.cxx index bb876f2e..bfb7d87a 100644 --- a/fairmq/Device.cxx +++ b/fairmq/Device.cxx @@ -290,7 +290,9 @@ void Device::BindWrapper() Bind(); - ChangeState(Transition::Auto); + if (!NewStatePending()) { + ChangeState(Transition::Auto); + } } void Device::ConnectWrapper() @@ -327,7 +329,9 @@ void Device::ConnectWrapper() Connect(); - ChangeState(Transition::Auto); + if (!NewStatePending()) { + ChangeState(Transition::Auto); + } } void Device::AttachChannels(vector& chans) @@ -427,7 +431,9 @@ void Device::InitTaskWrapper() { InitTask(); - ChangeState(Transition::Auto); + if (!NewStatePending()) { + ChangeState(Transition::Auto); + } } void Device::RunWrapper() @@ -443,6 +449,10 @@ void Device::RunWrapper() if (rateLogging) { rateLogger = make_unique(&Device::LogSocketRates, this); } + tools::CallOnDestruction joinRateLogger([&](){ + if (rateLogging && rateLogger->joinable()) { rateLogger->join(); } + }); + // notify transports to resume transfers for (auto& t : fTransports) { @@ -485,10 +495,6 @@ void Device::RunWrapper() PostRun(); cod.disable(); - - if (rateLogging && rateLogger->joinable()) { - rateLogger->join(); - } } void Device::HandleSingleChannelInput() @@ -772,7 +778,9 @@ void Device::ResetTaskWrapper() { ResetTask(); - ChangeState(Transition::Auto); + if (!NewStatePending()) { + ChangeState(Transition::Auto); + } } void Device::ResetWrapper() @@ -786,7 +794,9 @@ void Device::ResetWrapper() fChannels.clear(); fTransports.clear(); fTransportFactory.reset(); - ChangeState(Transition::Auto); + if (!NewStatePending()) { + ChangeState(Transition::Auto); + } } Device::~Device() diff --git a/fairmq/StateMachine.cxx b/fairmq/StateMachine.cxx index 83baec90..140c29f8 100644 --- a/fairmq/StateMachine.cxx +++ b/fairmq/StateMachine.cxx @@ -187,9 +187,7 @@ struct Machine_ : public state_machine_def { unique_lock lock(fStateMtx); - while (!fNewStatePending) { - fNewStatePendingCV.wait_for(lock, chrono::milliseconds(100)); - } + fNewStatePendingCV.wait(lock, [this]{ return fNewStatePending.load(); }); LOG(state) << fState << " ---> " << fNewState; fState = static_cast(fNewState); diff --git a/fairmq/StateQueue.h b/fairmq/StateQueue.h index 58ce146d..1c1ad60f 100644 --- a/fairmq/StateQueue.h +++ b/fairmq/StateQueue.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2019-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,16 +9,14 @@ #ifndef FAIRMQSTATEQUEUE_H_ #define FAIRMQSTATEQUEUE_H_ -#include - -#include -#include #include -#include // pair #include +#include +#include +#include +#include // pair -namespace fair::mq -{ +namespace fair::mq { class StateQueue { @@ -33,41 +31,47 @@ class StateQueue fair::mq::State WaitForNext() { std::unique_lock lock(fMtx); - while (fStates.empty()) { - fCV.wait_for(lock, std::chrono::milliseconds(50)); - } - - fair::mq::State state = fStates.front(); - - if (state == fair::mq::State::Error) { - throw DeviceErrorState("Controlled device transitioned to error state."); - } - - fStates.pop(); - return state; + fCV.wait(lock, [this] { return Predicate(); }); + return PopFrontUnsafe(); } - template - std::pair WaitForNext(std::chrono::duration const& duration) + template + std::pair WaitForNext(Timeout&& duration) { std::unique_lock lock(fMtx); - fCV.wait_for(lock, duration); - - if (fStates.empty()) { - return { false, fair::mq::State::Ok }; - } - - fair::mq::State state = fStates.front(); - - if (state == fair::mq::State::Error) { - throw DeviceErrorState("Controlled device transitioned to error state."); - } - - fStates.pop(); - return { true, state }; + fCV.wait_for(lock, std::forward(duration), [this] { return Predicate(); }); + return ReturnPairUnsafe(); } - void WaitForState(fair::mq::State state) { while (WaitForNext() != state) {} } + template + std::pair WaitForNextOrCustom(CustomPredicate&& customPredicate) + { + std::unique_lock lock(fMtx); + fCV.wait(lock, [this, cp = std::move(customPredicate)] { return Predicate() || cp(); }); + return ReturnPairUnsafe(); + } + + template + std::pair WaitForCustom(CustomPredicate&& customPredicate) + { + std::unique_lock lock(fMtx); + fCV.wait(lock, [cp = std::move(customPredicate)] { return cp(); }); + return ReturnPairUnsafe(); + } + + void WaitForState(fair::mq::State state) + { + while (WaitForNext() != state) {} + } + + template + void WaitForStateOrCustom(fair::mq::State state, CustomPredicate customPredicate) + { + auto next = WaitForNextOrCustom(customPredicate); + while (!customPredicate() && (next.first && next.second != state)) { + next = WaitForNextOrCustom(customPredicate); + } + } void Push(fair::mq::State state) { @@ -75,7 +79,35 @@ class StateQueue std::lock_guard lock(fMtx); fStates.push(state); } - fCV.notify_all(); + fCV.notify_one(); + } + + template + void Push(fair::mq::State state, CustomSignal&& signal) + { + { + std::lock_guard lock(fMtx); + fStates.push(state); + signal(); + } + fCV.notify_one(); + } + + template + void Notify(CustomSignal&& signal) + { + { + std::lock_guard lock(fMtx); + signal(); + } + fCV.notify_one(); + } + + template + void Locked(CustomSignal&& signal) + { + std::lock_guard lock(fMtx); + signal(); } void Clear() @@ -88,8 +120,29 @@ class StateQueue std::queue fStates; std::mutex fMtx; std::condition_variable fCV; + + // must be called under locked fMtx + fair::mq::State PopFrontUnsafe() + { + fair::mq::State state = fStates.front(); + if (state == fair::mq::State::Error) { + throw DeviceErrorState("Controlled device transitioned to error state."); + } + fStates.pop(); + return state; + } + + // must be called under locked fMtx + std::pair ReturnPairUnsafe() + { + auto const pred = Predicate(); + return {pred, pred ? PopFrontUnsafe() : fair::mq::State::Ok}; + } + + // must be called under locked fMtx + bool Predicate() { return !fStates.empty(); } }; -} // namespace fair::mq +} // namespace fair::mq #endif /* FAIRMQSTATEQUEUE_H_ */ diff --git a/fairmq/plugins/control/Control.cxx b/fairmq/plugins/control/Control.cxx index 61090a25..6e90dcea 100644 --- a/fairmq/plugins/control/Control.cxx +++ b/fairmq/plugins/control/Control.cxx @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -56,11 +56,11 @@ Control::Control(const string& name, Plugin::Version version, const string& main SubscribeToDeviceStateChange([&](DeviceState newState) { LOG(trace) << "control plugin notified on new state: " << newState; - fStateQueue.Push(newState); - if (newState == DeviceState::Error) { fPluginShutdownRequested = true; - fDeviceShutdownRequested = true; + fStateQueue.Push(newState, [this]{ fDeviceShutdownRequested = true; }); + } else { + fStateQueue.Push(newState); } }); @@ -99,18 +99,42 @@ Control::Control(const string& name, Plugin::Version version, const string& main auto Control::RunStartupSequence() -> void { - ChangeDeviceState(DeviceStateTransition::InitDevice); - while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {} - ChangeDeviceState(DeviceStateTransition::CompleteInit); - while (fStateQueue.WaitForNext() != DeviceState::Initialized) {} - ChangeDeviceState(DeviceStateTransition::Bind); - while (fStateQueue.WaitForNext() != DeviceState::Bound) {} - ChangeDeviceState(DeviceStateTransition::Connect); - while (fStateQueue.WaitForNext() != DeviceState::DeviceReady) {} - ChangeDeviceState(DeviceStateTransition::InitTask); - while (fStateQueue.WaitForNext() != DeviceState::Ready) {} - ChangeDeviceState(DeviceStateTransition::Run); - while (fStateQueue.WaitForNext() != DeviceState::Running) {} + using Transition = DeviceStateTransition; + using State = DeviceState; + auto shutdownRequested = [this]{ return fDeviceShutdownRequested.load(); }; + + ChangeDeviceState(Transition::InitDevice); + fStateQueue.WaitForStateOrCustom(State::InitializingDevice, shutdownRequested); + if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ } + + ChangeDeviceState(Transition::CompleteInit); + fStateQueue.WaitForStateOrCustom(State::Initialized, shutdownRequested); + if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ } + + ChangeDeviceState(Transition::Bind); + fStateQueue.WaitForStateOrCustom(State::Binding, shutdownRequested); + if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ } + + fStateQueue.WaitForStateOrCustom(State::Bound, shutdownRequested); + if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ } + + ChangeDeviceState(Transition::Connect); + fStateQueue.WaitForStateOrCustom(State::Connecting, shutdownRequested); + if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ } + + fStateQueue.WaitForStateOrCustom(State::DeviceReady, shutdownRequested); + if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ } + + ChangeDeviceState(Transition::InitTask); + fStateQueue.WaitForStateOrCustom(State::InitializingTask, shutdownRequested); + if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ } + + fStateQueue.WaitForStateOrCustom(State::Ready, shutdownRequested); + if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ } + + ChangeDeviceState(Transition::Run); + fStateQueue.WaitForStateOrCustom(State::Running, shutdownRequested); + if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ } } auto ControlPluginProgramOptions() -> Plugin::ProgOptions @@ -123,10 +147,8 @@ auto ControlPluginProgramOptions() -> Plugin::ProgOptions return pluginOptions; } -auto Control::InteractiveMode() -> void -try { - RunStartupSequence(); - +auto Control::RunREPL() -> void +{ char input = 0; // hold the user console input pollfd cinfd[1]; cinfd[0].fd = fileno(stdin); @@ -161,7 +183,7 @@ try { case 'i': cout << "\n --> [i] init device\n\n" << flush; if (ChangeDeviceState(DeviceStateTransition::InitDevice)) { - while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {} + fStateQueue.WaitForState(DeviceState::InitializingDevice); ChangeDeviceState(DeviceStateTransition::CompleteInit); } break; @@ -243,7 +265,19 @@ try { } } - RunShutdownSequence(); +} + +auto Control::InteractiveMode() -> void +try { + RunStartupSequence(); + + if(!fDeviceShutdownRequested) { + RunREPL(); + } + + if(!fDeviceShutdownRequested) { + RunShutdownSequence(); + } } catch (PluginServices::DeviceControlError& e) { // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. LOG(debug) << e.what(); @@ -366,16 +400,13 @@ auto Control::StaticMode() -> void try { RunStartupSequence(); - { - // Wait for next state, which is DeviceState::Ready, - // or for device shutdown request (Ctrl-C) - pair result; - do { - result = fStateQueue.WaitForNext(chrono::milliseconds(50)); - } while (result.first == false && !fDeviceShutdownRequested); - } + // Wait for next state, which is DeviceState::Ready, + // or for device shutdown request (Ctrl-C) + fStateQueue.WaitForNextOrCustom([this]{ return fDeviceShutdownRequested.load(); }); - RunShutdownSequence(); + if(!fDeviceShutdownRequested) { + RunShutdownSequence(); + } } catch (PluginServices::DeviceControlError& e) { // If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else. LOG(debug) << e.what(); @@ -387,16 +418,12 @@ auto Control::GUIMode() -> void try { RunStartupSequence(); - { - // Wait for next state, which is DeviceState::Ready, - // or for device shutdown request (Ctrl-C) - pair result; - do { - result = fStateQueue.WaitForNext(chrono::milliseconds(50)); - } while (!fDeviceShutdownRequested); - } + // Wait for device shutdown request (Ctrl-C) + fStateQueue.WaitForCustom([this]{ return fDeviceShutdownRequested.load(); }); - RunShutdownSequence(); + if(!fDeviceShutdownRequested) { + RunShutdownSequence(); + } } catch (PluginServices::DeviceControlError& e) { // If we are here, it means another plugin has taken control. That's fine, just print the // exception message and do nothing else. @@ -416,10 +443,10 @@ auto Control::SignalHandler() -> void LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately."; // Signal and wait for controller thread, if we are controller - fDeviceShutdownRequested = true; + fStateQueue.Notify([this] { fDeviceShutdownRequested = true; }); { unique_lock lock(fControllerMutex); - if (fControllerThread.joinable()) fControllerThread.join(); + if (fControllerThread.joinable()) { fControllerThread.join(); } } if (!fDeviceHasShutdown) { @@ -462,6 +489,12 @@ auto Control::RunShutdownSequence() -> void case DeviceState::Running: ChangeDeviceState(DeviceStateTransition::Stop); break; + case DeviceState::Binding: + case DeviceState::Connecting: + case DeviceState::InitializingTask: + case DeviceState::ResettingTask: + case DeviceState::ResettingDevice: + ChangeDeviceState(DeviceStateTransition::Auto); default: // LOG(debug) << "Controller ignoring event: " << nextState; break; @@ -481,9 +514,9 @@ Control::~Control() { unique_lock lock(fControllerMutex); - if (fControllerThread.joinable()) fControllerThread.join(); + if (fControllerThread.joinable()) { fControllerThread.join(); } } - if (fSignalHandlerThread.joinable()) fSignalHandlerThread.join(); + if (fSignalHandlerThread.joinable()) { fSignalHandlerThread.join(); } UnsubscribeFromDeviceStateChange(); } diff --git a/fairmq/plugins/control/Control.h b/fairmq/plugins/control/Control.h index b1c7d24d..30e61ef2 100644 --- a/fairmq/plugins/control/Control.h +++ b/fairmq/plugins/control/Control.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -46,6 +46,7 @@ class Control : public Plugin auto GUIMode() -> void; auto SignalHandler() -> void; auto RunShutdownSequence() -> void; + auto RunREPL() -> void; auto RunStartupSequence() -> void; std::thread fControllerThread;