diff --git a/fairmq/plugins/DDS/runDDSCommandUI.cxx b/fairmq/plugins/DDS/runDDSCommandUI.cxx index c0fe4a7b..141ad5f8 100644 --- a/fairmq/plugins/DDS/runDDSCommandUI.cxx +++ b/fairmq/plugins/DDS/runDDSCommandUI.cxx @@ -6,27 +6,33 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include +#include +#include + +#include + #include #include #include #include #include + +#include +#include #include #include -#include #include #include -#include #include +#include #include // raw mode console input #include -#include +#include #include using namespace std; using namespace dds::intercom_api; +using namespace fair::mq::sdk::cmd; namespace bpo = boost::program_options; struct TerminalConfig @@ -50,7 +56,8 @@ struct TerminalConfig } }; -struct StateSubscription { +struct StateSubscription +{ const string& fTopologyPath; CCustomCmd& fDdsCustomCmd; @@ -58,11 +65,11 @@ struct StateSubscription { : fTopologyPath(topologyPath) , fDdsCustomCmd(ddsCustomCmd) { - fDdsCustomCmd.send("subscribe-to-state-changes", fTopologyPath); + fDdsCustomCmd.send(Cmds(make()).Serialize(), fTopologyPath); } ~StateSubscription() { - fDdsCustomCmd.send("unsubscribe-from-state-changes", fTopologyPath); + fDdsCustomCmd.send(Cmds(make()).Serialize(), fTopologyPath); this_thread::sleep_for(chrono::milliseconds(100)); // give dds a chance to complete request } }; @@ -74,7 +81,7 @@ void printControlsHelp() cout << "To quit press Ctrl+C" << endl; } -void commandMode(const string& commandIn, const string& topologyPath, CCustomCmd& ddsCustomCmd) { +void sendCommand(const string& commandIn, const string& topologyPath, CCustomCmd& ddsCustomCmd) { char c; string command(commandIn); TerminalConfig tconfig; @@ -88,52 +95,43 @@ void commandMode(const string& commandIn, const string& topologyPath, CCustomCmd while (true) { if (command == "c") { cout << "> checking state of the devices" << endl; - ddsCustomCmd.send("check-state", topologyPath); + ddsCustomCmd.send(Cmds(make()).Serialize(), topologyPath); } else if (command == "o") { cout << "> dumping config of the devices" << endl; - ddsCustomCmd.send("dump-config", topologyPath); + ddsCustomCmd.send(Cmds(make()).Serialize(), topologyPath); } else if (command == "i") { cout << "> init devices" << endl; - ddsCustomCmd.send("INIT DEVICE", topologyPath); + ddsCustomCmd.send(Cmds(make(fair::mq::Transition::InitDevice)).Serialize(), topologyPath); } else if (command == "k") { cout << "> complete init" << endl; - ddsCustomCmd.send("COMPLETE INIT", topologyPath); + ddsCustomCmd.send(Cmds(make(fair::mq::Transition::CompleteInit)).Serialize(), topologyPath); } else if (command == "b") { cout << "> bind devices" << endl; - ddsCustomCmd.send("BIND", topologyPath); + ddsCustomCmd.send(Cmds(make(fair::mq::Transition::Bind)).Serialize(), topologyPath); } else if (command == "x") { cout << "> connect devices" << endl; - ddsCustomCmd.send("CONNECT", topologyPath); + ddsCustomCmd.send(Cmds(make(fair::mq::Transition::Connect)).Serialize(), topologyPath); } else if (command == "j") { cout << "> init tasks" << endl; - ddsCustomCmd.send("INIT TASK", topologyPath); - } else if (command == "p") { - cout << "> pause devices" << endl; - ddsCustomCmd.send("PAUSE", topologyPath); + ddsCustomCmd.send(Cmds(make(fair::mq::Transition::InitTask)).Serialize(), topologyPath); } else if (command == "r") { cout << "> run tasks" << endl; - ddsCustomCmd.send("RUN", topologyPath); + ddsCustomCmd.send(Cmds(make(fair::mq::Transition::Run)).Serialize(), topologyPath); } else if (command == "s") { cout << "> stop devices" << endl; - ddsCustomCmd.send("STOP", topologyPath); + ddsCustomCmd.send(Cmds(make(fair::mq::Transition::Stop)).Serialize(), topologyPath); } else if (command == "t") { cout << "> reset tasks" << endl; - ddsCustomCmd.send("RESET TASK", topologyPath); + ddsCustomCmd.send(Cmds(make(fair::mq::Transition::ResetTask)).Serialize(), topologyPath); } else if (command == "d") { cout << "> reset devices" << endl; - ddsCustomCmd.send("RESET DEVICE", topologyPath); + ddsCustomCmd.send(Cmds(make(fair::mq::Transition::ResetDevice)).Serialize(), topologyPath); } else if (command == "h") { cout << "> help" << endl; printControlsHelp(); } else if (command == "q") { cout << "> end" << endl; - ddsCustomCmd.send("END", topologyPath); - } else if (command == "q!") { - cout << "> shutdown" << endl; - ddsCustomCmd.send("SHUTDOWN", topologyPath); - } else if (command == "r!") { - cout << "> startup" << endl; - ddsCustomCmd.send("STARTUP", topologyPath); + ddsCustomCmd.send(Cmds(make(fair::mq::Transition::End)).Serialize(), topologyPath); } else { cout << "\033[01;32mInvalid input: [" << c << "]\033[0m" << endl; printControlsHelp(); @@ -152,33 +150,36 @@ void commandMode(const string& commandIn, const string& topologyPath, CCustomCmd struct WaitMode { explicit WaitMode(const string& targetState) - : fTargetState(targetState) - {} + : fTransitionedCount(0) + { + if (targetState != "") { + size_t n = targetState.find("->"); + if (n == string::npos) { + fTargetStatePair.first = fair::mq::State::Ok; + fTargetStatePair.second = fair::mq::GetState(targetState); + } else { + fTargetStatePair.first = fair::mq::GetState(targetState.substr(0, n)); + fTargetStatePair.second = fair::mq::GetState(targetState.substr(n + 2)); + } + } + } - void Run(const chrono::milliseconds& timeout, - const string& topologyPath, - CCustomCmd& ddsCustomCmd, - unsigned int numberDevices, - const string& command = "") + void Run(const chrono::milliseconds& timeout, const string& topologyPath, CCustomCmd& ddsCustomCmd, unsigned int numDevices, const string& command = "") { StateSubscription stateSubscription(topologyPath, ddsCustomCmd); if (command != "") { - commandMode(command, topologyPath, ddsCustomCmd); + sendCommand(command, topologyPath, ddsCustomCmd); } // TODO once DDS provides an API to retrieve actual number of tasks, use it here auto condition = [&] { - bool res(!fTargetStates.empty() - && all_of(fTargetStates.cbegin(), - fTargetStates.cend(), - [&](unordered_map::value_type i) { - return boost::algorithm::ends_with(i.second, fTargetState); - })); - if (numberDevices > 0) { - res = res && (fTargetStates.size() == numberDevices); + bool res = fTransitionedCount == numDevices; + if (fTargetStatePair.first == fair::mq::State::Ok) { + cout << "Waiting for " << numDevices << " devices to reach " << fTargetStatePair.second << ", condition check: " << res << endl; + } else { + cout << "Waiting for " << numDevices << " devices to reach " << fTargetStatePair.first << "->" << fTargetStatePair.second << ", condition check: " << res << endl; } - cout << "waiting for " << numberDevices << " devices to reach " << fTargetState << ", condition check: " << res << endl; return res; }; @@ -191,21 +192,33 @@ struct WaitMode } else { fCV.wait(lock, condition); } + + // cout << "WaitMode.Run() finished" << endl; } - void AddNewStateEntry(uint64_t senderId, const string& state) + void CountStates(fair::mq::State lastState, fair::mq::State currentState) { { unique_lock lock(fMtx); - fTargetStates[senderId] = state; + if (fTargetStatePair.first == fair::mq::State::Ok) { + if (fTargetStatePair.second == currentState) { + fTransitionedCount++; + // cout << "fTransitionedCount = " << fTransitionedCount << " for single value" << endl; + } + } else { + if (fTargetStatePair.first == lastState && fTargetStatePair.second == currentState) { + fTransitionedCount++; + // cout << "fTransitionedCount = " << fTransitionedCount << " for double value" << endl; + } + } } fCV.notify_one(); } mutex fMtx; condition_variable fCV; - unordered_map fTargetStates; - string fTargetState; + pair fTargetStatePair; + unsigned int fTransitionedCount; }; int main(int argc, char* argv[]) @@ -216,7 +229,7 @@ int main(int argc, char* argv[]) string topologyPath; string targetState; unsigned int timeout; - unsigned int numberDevices(0); + unsigned int numDevices(0); bpo::options_description options("Common options"); @@ -232,7 +245,7 @@ int main(int argc, char* argv[]) ("path,p", bpo::value (&topologyPath)->default_value(""), "DDS Topology path to send command to (empty - send to all tasks)") ("wait-for-state,w", bpo::value (&targetState)->default_value(""), "Wait until targeted FairMQ devices reach the given state") ("timeout,t", bpo::value (&timeout)->default_value(0), "Timeout in milliseconds when waiting for a device state (0 - wait infinitely)") - ("number-devices,n", bpo::value (&numberDevices)->default_value(0), "Number of devices (will be removed in the future)") + ("number-devices,n", bpo::value (&numDevices)->default_value(0), "Number of devices (will be removed in the future)") ("help,h", "Produce help message"); bpo::variables_map vm; @@ -257,40 +270,58 @@ int main(int argc, char* argv[]) // subscribe to receive messages from DDS ddsCustomCmd.subscribe([&](const string& msg, const string& /*condition*/, uint64_t senderId) { - // cerr << "Received: " << msg << endl; - vector parts; - boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); - if (parts[0] == "state-change") { - // cerr << "Received: " << msg << endl; - boost::trim(parts[2]); - waitMode.AddNewStateEntry(senderId, parts[3]); - if(parts[3] == "IDLE->EXITING") { - ddsCustomCmd.send("state-change-exiting-received", std::to_string(senderId)); + Cmds cmds; + cmds.Deserialize(msg); + // cout << "Received " << cmds.Size() << " command(s) with total size of " << msg.length() << " bytes: " << endl; + for (const auto& cmd : cmds) { + // cout << " > " << cmd->GetType() << endl; + switch (cmd->GetType()) { + case Type::state_change: { + cout << "Received state_change from " << static_cast(*cmd).GetDeviceId() << ": " << static_cast(*cmd).GetLastState() << "->" << static_cast(*cmd).GetCurrentState() << endl; + if (static_cast(*cmd).GetCurrentState() == fair::mq::State::Exiting) { + ddsCustomCmd.send(Cmds(make()).Serialize(), to_string(senderId)); + } + waitMode.CountStates(static_cast(*cmd).GetLastState(), static_cast(*cmd).GetCurrentState()); + } + break; + case Type::state_change_subscription: + if (static_cast(*cmd).GetResult() != Result::Ok) { + cout << "State change subscription failed for " << static_cast(*cmd).GetDeviceId() << endl; + } + break; + case Type::state_change_unsubscription: + if (static_cast(*cmd).GetResult() != Result::Ok) { + cout << "State change unsubscription failed for " << static_cast(*cmd).GetDeviceId() << endl; + } + break; + case Type::transition_status: { + // if (static_cast(*cmd).GetResult() == Result::Ok) { + // cout << "Device " << static_cast(*cmd).GetDeviceId() << " started to transition with " << static_cast(*cmd).GetTransition() << endl; + // } else { + // cout << "Device " << static_cast(*cmd).GetDeviceId() << " cannot transition with " << static_cast(*cmd).GetTransition() << endl; + // } + } + break; + default: + cout << "Unexpected/unknown command received: " << cmd->GetType() << endl; + cout << "Origin: " << senderId << endl; + break; } - } else if (parts[0] == "state-changes-subscription") { - if (parts[2] != "OK") { - cerr << "state-changes-subscription failed with return code: " << parts[2]; - } - } else if (parts[0] == "state-changes-unsubscription") { - if (parts[2] != "OK") { - cerr << "state-changes-unsubscription failed with return code: " << parts[2]; - } - } else { - cout << "Received: " << msg << endl; } }); service.start(sessionID); if (targetState == "") { - commandMode(command, topologyPath, ddsCustomCmd); + sendCommand(command, topologyPath, ddsCustomCmd); } else { - waitMode.Run(chrono::milliseconds(timeout), topologyPath, ddsCustomCmd, numberDevices, command); + waitMode.Run(chrono::milliseconds(timeout), topologyPath, ddsCustomCmd, numDevices, command); } + + ddsCustomCmd.unsubscribe(); } catch (exception& e) { cerr << "Error: " << e.what() << endl; return EXIT_FAILURE; } - return EXIT_SUCCESS; }