diff --git a/examples/dds/fairmq-start-ex-dds.sh.in b/examples/dds/fairmq-start-ex-dds.sh.in index bab11977..32c2dac5 100755 --- a/examples/dds/fairmq-start-ex-dds.sh.in +++ b/examples/dds/fairmq-start-ex-dds.sh.in @@ -35,16 +35,12 @@ echo "TOPOLOGY FILE: ${topologyFile}" dds-topology --disable-validation --activate ${topologyFile} echo "------------------------" -echo "Waiting for Topology to finish ..." +echo "...waiting for Topology to finish..." sampler_and_sink="main/(Sampler|Sink)" -fairmq-dds-command-ui -p $sampler_and_sink --wait-for-state "RUNNING->READY" -echo "..." -fairmq-dds-command-ui -c s -w READY -fairmq-dds-command-ui -c t -w "DEVICE READY" -fairmq-dds-command-ui -c d -w IDLE -fairmq-dds-command-ui -c q -w EXITING -# fairmq-dds-command-ui -c q! -w EXITING -echo "..." +fairmq-dds-command-ui -p $sampler_and_sink -w "RUNNING->READY" +echo "...$sampler_and_sink are READY, sending shutdown..." +fairmq-dds-command-ui -c q! -w "EXITING" +echo "...waiting for ${requiredNofAgents} idle agents..." dds-info --wait-for-idle-agents ${requiredNofAgents} echo "------------------------" diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 72671230..a904d227 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -232,7 +232,14 @@ void FairMQDevice::TransitionTo(const fair::mq::State s) case State::Running: ChangeState(Transition::Stop); break; - default: // Binding, Connecting, InitializingTask, ResettingDevice, ResettingTask + case State::Binding: + case State::Connecting: + case State::InitializingTask: + case State::ResettingDevice: + case State::ResettingTask: + LOG(debug) << "TransitionTo ignoring state: " << currentState << " (expected, automatic transition)."; + break; + default: LOG(debug) << "TransitionTo ignoring state: " << currentState; break; } diff --git a/fairmq/Plugin.h b/fairmq/Plugin.h index 9affec37..c76e2be1 100644 --- a/fairmq/Plugin.h +++ b/fairmq/Plugin.h @@ -79,6 +79,7 @@ class Plugin auto StealDeviceControl() -> void { fPluginServices->StealDeviceControl(fkName); }; auto ReleaseDeviceControl() -> void { fPluginServices->ReleaseDeviceControl(fkName); }; auto ChangeDeviceState(const DeviceStateTransition next) -> bool { return fPluginServices->ChangeDeviceState(fkName, next); } + void TransitionDeviceStateTo(const DeviceState state) { return fPluginServices->TransitionDeviceStateTo(fkName, state); } auto SubscribeToDeviceStateChange(std::function callback) -> void { fPluginServices->SubscribeToDeviceStateChange(fkName, callback); } auto UnsubscribeFromDeviceStateChange() -> void { fPluginServices->UnsubscribeFromDeviceStateChange(fkName); } diff --git a/fairmq/PluginServices.cxx b/fairmq/PluginServices.cxx index c098d850..cc2aa559 100644 --- a/fairmq/PluginServices.cxx +++ b/fairmq/PluginServices.cxx @@ -90,6 +90,23 @@ const unordered_map> {State::ResettingDevice, DeviceState::ResettingDevice}, {State::Exiting, DeviceState::Exiting} }; +const unordered_map PluginServices::fkStateMap = { + {DeviceState::Ok, State::Ok}, + {DeviceState::Error, State::Error}, + {DeviceState::Idle, State::Idle}, + {DeviceState::InitializingDevice, State::InitializingDevice}, + {DeviceState::Initialized, State::Initialized}, + {DeviceState::Binding, State::Binding}, + {DeviceState::Bound, State::Bound}, + {DeviceState::Connecting, State::Connecting}, + {DeviceState::DeviceReady, State::DeviceReady}, + {DeviceState::InitializingTask, State::InitializingTask}, + {DeviceState::Ready, State::Ready}, + {DeviceState::Running, State::Running}, + {DeviceState::ResettingTask, State::ResettingTask}, + {DeviceState::ResettingDevice, State::ResettingDevice}, + {DeviceState::Exiting, State::Exiting} +}; const unordered_map> PluginServices::fkDeviceStateTransitionMap = { {DeviceStateTransition::Auto, Transition::Auto}, {DeviceStateTransition::InitDevice, Transition::InitDevice}, @@ -125,6 +142,22 @@ auto PluginServices::ChangeDeviceState(const string& controller, const DeviceSta return result; } +void PluginServices::TransitionDeviceStateTo(const std::string& controller, DeviceState state) +{ + lock_guard lock{fDeviceControllerMutex}; + + if (!fDeviceController) fDeviceController = controller; + + if (fDeviceController == controller) { + fDevice.TransitionTo(fkStateMap.at(state)); + } else { + throw DeviceControlError{tools::ToString( + "Plugin '", controller, "' is not allowed to change device states. ", + "Currently, plugin '", *fDeviceController, "' has taken control." + )}; + } +} + auto PluginServices::TakeDeviceControl(const string& controller) -> void { lock_guard lock{fDeviceControllerMutex}; diff --git a/fairmq/PluginServices.h b/fairmq/PluginServices.h index fda7541a..1d994000 100644 --- a/fairmq/PluginServices.h +++ b/fairmq/PluginServices.h @@ -160,6 +160,8 @@ class PluginServices /// If the device control role has not been taken yet, calling this function will take over control implicitely. auto ChangeDeviceState(const std::string& controller, const DeviceStateTransition next) -> bool; + void TransitionDeviceStateTo(const std::string& controller, DeviceState state); + /// @brief Subscribe with a callback to device state changes /// @param subscriber id /// @param callback @@ -313,6 +315,7 @@ class PluginServices static const std::unordered_map fkDeviceStateTransitionStrMap; static const std::unordered_map> fkStrDeviceStateTransitionMap; static const std::unordered_map> fkDeviceStateMap; + static const std::unordered_map fkStateMap; static const std::unordered_map> fkDeviceStateTransitionMap; private: diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 93cee8e3..8d11da6b 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -379,6 +379,10 @@ auto DDS::SubscribeForCustomCommands() -> void fStateChangeSubscribers.erase(senderId); } fDDSCustomCmd.send("state-changes-unsubscription: " + id + ",OK", to_string(senderId)); + } else if (cmd == "SHUTDOWN") { + TransitionDeviceStateTo(DeviceState::Exiting); + } else if (cmd == "STARTUP") { + TransitionDeviceStateTo(DeviceState::Ready); } else { LOG(warn) << "Unknown command: " << cmd; LOG(warn) << "Origin: " << senderId; diff --git a/fairmq/plugins/DDS/runDDSCommandUI.cxx b/fairmq/plugins/DDS/runDDSCommandUI.cxx index b9b69461..f5f935a9 100644 --- a/fairmq/plugins/DDS/runDDSCommandUI.cxx +++ b/fairmq/plugins/DDS/runDDSCommandUI.cxx @@ -87,47 +87,49 @@ void commandMode(const string& commandIn, const string& topologyPath, CCustomCmd while (true) { if (command == "c") { - cout << "\033[01;32m > checking state of the devices\033[0m" << endl; + cout << "> checking state of the devices" << endl; ddsCustomCmd.send("check-state", topologyPath); } else if (command == "o") { - cout << "\033[01;32m > dumping config of the devices\033[0m" << endl; + cout << "> dumping config of the devices" << endl; ddsCustomCmd.send("dump-config", topologyPath); } else if (command == "i") { - cout << "\033[01;32m > init devices\033[0m" << endl; + cout << "> init devices" << endl; ddsCustomCmd.send("INIT DEVICE", topologyPath); } else if (command == "b") { - cout << "\033[01;32m > bind devices\033[0m" << endl; + cout << "> bind devices" << endl; ddsCustomCmd.send("BIND", topologyPath); } else if (command == "x") { - cout << "\033[01;32m > connect devices\033[0m" << endl; + cout << "> connect devices" << endl; ddsCustomCmd.send("CONNECT", topologyPath); } else if (command == "j") { - cout << "\033[01;32m > init tasks\033[0m" << endl; + cout << "> init tasks" << endl; ddsCustomCmd.send("INIT TASK", topologyPath); } else if (command == "p") { - cout << "\033[01;32m > pause devices\033[0m" << endl; + cout << "> pause devices" << endl; ddsCustomCmd.send("PAUSE", topologyPath); } else if (command == "r") { - cout << "\033[01;32m > run tasks\033[0m" << endl; + cout << "> run tasks" << endl; ddsCustomCmd.send("RUN", topologyPath); } else if (command == "s") { - cout << "\033[01;32m > stop devices\033[0m" << endl; + cout << "> stop devices" << endl; ddsCustomCmd.send("STOP", topologyPath); } else if (command == "t") { - cout << "\033[01;32m > reset tasks\033[0m" << endl; + cout << "> reset tasks" << endl; ddsCustomCmd.send("RESET TASK", topologyPath); } else if (command == "d") { - cout << "\033[01;32m > reset devices\033[0m" << endl; + cout << "> reset devices" << endl; ddsCustomCmd.send("RESET DEVICE", topologyPath); } else if (command == "h") { - cout << "\033[01;32m > help\033[0m" << endl; + cout << "> help" << endl; printControlsHelp(); } else if (command == "q") { - cout << "\033[01;32m > end\033[0m" << endl; + cout << "> end" << endl; ddsCustomCmd.send("END", topologyPath); } else if (command == "q!") { + cout << "> shutdown" << endl; ddsCustomCmd.send("SHUTDOWN", topologyPath); } else if (command == "r!") { + cout << "> startup" << endl; ddsCustomCmd.send("STARTUP", topologyPath); } else { cout << "\033[01;32mInvalid input: [" << c << "]\033[0m" << endl; @@ -150,10 +152,14 @@ struct WaitMode : fTargetState(targetState) {} - void Run(const chrono::milliseconds& timeout, const string& topologyPath, CCustomCmd& ddsCustomCmd) + void Run(const chrono::milliseconds& timeout, const string& topologyPath, CCustomCmd& ddsCustomCmd, const string& command = "") { StateSubscription stateSubscription(topologyPath, ddsCustomCmd); + if (command != "") { + commandMode(command, topologyPath, ddsCustomCmd); + } + // TODO once DDS provides an API to retrieve actual number of tasks, use it here auto condition = [&] { return !fTargetStates.empty() && all_of(fTargetStates.cbegin(), fTargetStates.cend(), @@ -256,73 +262,10 @@ int main(int argc, char* argv[]) service.start(sessionID); - if (targetState == "") { commandMode(command, topologyPath, ddsCustomCmd); } else { - PrintControlsHelp(); - } - - while (cin >> c) { - switch (c) { - case 'c': - cout << " > checking state of the devices" << endl; - ddsCustomCmd.send("check-state", topologyPath); - break; - case 'o': - cout << " > dumping config of the devices" << endl; - ddsCustomCmd.send("dump-config", topologyPath); - break; - case 'i': - cout << " > init devices" << endl; - ddsCustomCmd.send("INIT DEVICE", topologyPath); - break; - case 'b': - cout << " > bind" << endl; - ddsCustomCmd.send("BIND", topologyPath); - break; - case 'x': - cout << " > connect" << endl; - ddsCustomCmd.send("CONNECT", topologyPath); - break; - case 'j': - cout << " > init tasks" << endl; - ddsCustomCmd.send("INIT TASK", topologyPath); - break; - case 'r': - cout << " > run tasks" << endl; - ddsCustomCmd.send("RUN", topologyPath); - break; - case 's': - cout << " > stop devices" << endl; - ddsCustomCmd.send("STOP", topologyPath); - break; - case 't': - cout << " > reset tasks" << endl; - ddsCustomCmd.send("RESET TASK", topologyPath); - break; - case 'd': - cout << " > reset devices" << endl; - ddsCustomCmd.send("RESET DEVICE", topologyPath); - break; - case 'h': - cout << " > help" << endl; - PrintControlsHelp(); - break; - case 'q': - cout << " > end" << endl; - ddsCustomCmd.send("END", topologyPath); - break; - default: - cout << "Invalid input: [" << c << "]" << endl; - PrintControlsHelp(); - break; - } - - if (command != "") { - commandMode(command, topologyPath, ddsCustomCmd); - } - waitMode.Run(chrono::milliseconds(timeout), topologyPath, ddsCustomCmd); + waitMode.Run(chrono::milliseconds(timeout), topologyPath, ddsCustomCmd, command); } } catch (exception& e) { cerr << "Error: " << e.what() << endl;