From 07f7142ae2df0a2242358e85dc0b4efbcbe30247 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 10 Jul 2019 17:00:36 +0200 Subject: [PATCH] Rebased, cleaned up --- examples/dds/Sampler.cxx | 3 +- examples/dds/ex-dds-hosts.cfg | 6 +- examples/dds/fairmq-start-ex-dds.sh.in | 14 +-- fairmq/plugins/DDS/DDS.cxx | 31 +++-- fairmq/plugins/DDS/DDS.h | 1 + fairmq/plugins/DDS/runDDSCommandUI.cxx | 154 ++++++++++++------------- 6 files changed, 102 insertions(+), 107 deletions(-) diff --git a/examples/dds/Sampler.cxx b/examples/dds/Sampler.cxx index f75d63a2..7e0a8a02 100644 --- a/examples/dds/Sampler.cxx +++ b/examples/dds/Sampler.cxx @@ -42,8 +42,7 @@ bool Sampler::ConditionalRun() // in case of error or transfer interruption, return false to go to IDLE state // successfull transfer will return number of bytes transfered (can be 0 if sending an empty message). - if (Send(msg, "data1") < 0) - { + if (Send(msg, "data1") < 0) { return false; } diff --git a/examples/dds/ex-dds-hosts.cfg b/examples/dds/ex-dds-hosts.cfg index 70789a5b..26070783 100644 --- a/examples/dds/ex-dds-hosts.cfg +++ b/examples/dds/ex-dds-hosts.cfg @@ -2,6 +2,6 @@ # source setup.sh @bash_end@ -sampler, username@localhost, , /path/to/dds-work/, 1 -processor, username@localhost, , /path/to/dds-work/, 10 -sink, username@localhost, , /path/to/dds-work/, 1 +sampler, username@localhost, , /home/username/dev/dds-work/, 1 +processor, username@localhost, , /home/username/dev/dds-work/, 10 +sink, username@localhost, , /home/username/dev/dds-work/, 1 diff --git a/examples/dds/fairmq-start-ex-dds.sh.in b/examples/dds/fairmq-start-ex-dds.sh.in index c345cc66..bab11977 100755 --- a/examples/dds/fairmq-start-ex-dds.sh.in +++ b/examples/dds/fairmq-start-ex-dds.sh.in @@ -26,18 +26,18 @@ echo "SESSION ID: ${DDS_SESSION_ID}" trap "cleanup ${DDS_SESSION_ID}" EXIT requiredNofAgents=12 -dds-submit -s ${DDS_SESSION_ID} -r localhost -n ${requiredNofAgents} -dds-info -s ${DDS_SESSION_ID} --wait-for-idle-agents ${requiredNofAgents} +dds-submit -r localhost -n ${requiredNofAgents} +dds-info --wait-for-idle-agents ${requiredNofAgents} topologyFile=@DATA_DIR@/ex-dds-topology.xml echo "TOPOLOGY FILE: ${topologyFile}" -dds-topology -s ${DDS_SESSION_ID} --disable-validation --activate ${topologyFile} +dds-topology --disable-validation --activate ${topologyFile} echo "------------------------" echo "Waiting for Topology to finish ..." sampler_and_sink="main/(Sampler|Sink)" -fairmq-dds-command-ui -p $sampler_and_sink --wait-for-state RUNNING->READY +fairmq-dds-command-ui -p $sampler_and_sink --wait-for-state "RUNNING->READY" echo "..." fairmq-dds-command-ui -c s -w READY fairmq-dds-command-ui -c t -w "DEVICE READY" @@ -45,12 +45,12 @@ fairmq-dds-command-ui -c d -w IDLE fairmq-dds-command-ui -c q -w EXITING # fairmq-dds-command-ui -c q! -w EXITING echo "..." -dds-info -s ${DDS_SESSION_ID} --wait-for-idle-agents ${requiredNofAgents} +dds-info --wait-for-idle-agents ${requiredNofAgents} echo "------------------------" -dds-topology -s ${DDS_SESSION_ID} --stop +dds-topology --stop -dds-agent-cmd getlog -a -s ${DDS_SESSION_ID} +dds-agent-cmd getlog -a logDir="${wrkDir}/logs" for file in $(find "${logDir}" -name "*.tar.gz"); do tar -xf ${file} -C "${logDir}" ; done echo "AGENT LOG FILES IN: ${logDir}" diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 380dc3ae..40c1e537 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -40,7 +40,7 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta , fConnectingChans() , fStopMutex() , fStopCondition() - , fCommands({ "BIND", "CONNECT", "INIT TASK", "RUN", "STOP", "RESET TASK", "RESET DEVICE" }) + , fCommands({ "BIND", "CONNECT", "INIT TASK", "RUN", "STOP", "RESET TASK", "RESET DEVICE", "SHUTDOWN", "STARTUP" }) , fControllerThread() , fEvents() , fEventsMutex() @@ -48,7 +48,8 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta , fCurrentState(DeviceState::Idle) , fLastState(DeviceState::Idle) , fDeviceTerminationRequested(false) - , fHeartbeatInterval{100} + , fServiceStarted(false) + , fHeartbeatInterval(100) { try { TakeDeviceControl(); @@ -83,9 +84,6 @@ auto DDS::HandleControl() -> void SubscribeForConnectingChannels(); - // start DDS service - subscriptions will only start firing after this step - fService.start(dds_session_id); - // subscribe to device state changes, pushing new state changes into the event queue SubscribeToDeviceStateChange([&](DeviceState newState) { { @@ -97,17 +95,14 @@ auto DDS::HandleControl() -> void fDeviceTerminationRequested = true; } - { + if (fServiceStarted) { lock_guard lock{fStateChangeSubscriberMutex}; string id = GetProperty("id"); fLastState = fCurrentState; fCurrentState = newState; for (auto subscriberId : fStateChangeSubscribers) { - LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState - << " to " << subscriberId; - fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" - + ToStr(newState), - to_string(subscriberId)); + LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId; + fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + ToStr(newState), to_string(subscriberId)); } } }); @@ -123,6 +118,10 @@ auto DDS::HandleControl() -> void // and propagate addresses of bound channels to DDS. FillChannelContainers(); + // start DDS service - subscriptions will only start firing after this step + fService.start(dds_session_id); + fServiceStarted = true; + // publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i] PublishBoundChannels(); @@ -267,8 +266,7 @@ auto DDS::SubscribeForConnectingChannels() -> void // when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS. sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end()); auto it3 = mi->second.fDDSValues.begin(); - for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i) - { + for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i) { SetProperty(string{"chans." + mi->first + "." + to_string(i) + ".address"}, it3->second); ++it3; } @@ -372,11 +370,8 @@ auto DDS::SubscribeForCustomCommands() -> void fDDSCustomCmd.send("state-changes-subscription: " + id + ",OK", to_string(senderId)); { lock_guard lock{fStateChangeSubscriberMutex}; - LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState - << " to " << senderId; - fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" - + ToStr(fCurrentState), - to_string(senderId)); + LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId; + fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId)); } } else if (cmd == "unsubscribe-from-state-changes") { { diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 9ae2bb50..ae1d7839 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -98,6 +98,7 @@ class DDS : public Plugin DeviceState fCurrentState, fLastState; std::atomic fDeviceTerminationRequested; + std::atomic fServiceStarted; std::set fHeartbeatSubscribers; std::mutex fHeartbeatSubscriberMutex; diff --git a/fairmq/plugins/DDS/runDDSCommandUI.cxx b/fairmq/plugins/DDS/runDDSCommandUI.cxx index 8b393e07..b9b69461 100644 --- a/fairmq/plugins/DDS/runDDSCommandUI.cxx +++ b/fairmq/plugins/DDS/runDDSCommandUI.cxx @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include // raw mode console input #include @@ -28,7 +28,6 @@ using namespace std; using namespace dds::intercom_api; namespace bpo = boost::program_options; -using WaitForStateMap = map; struct TerminalConfig { @@ -75,9 +74,9 @@ void printControlsHelp() cout << "To quit press Ctrl+C" << endl; } -void commandMode(const string& command_in, const string& topologyPath, CCustomCmd& ddsCustomCmd) { +void commandMode(const string& commandIn, const string& topologyPath, CCustomCmd& ddsCustomCmd) { char c; - string command(command_in); + string command(commandIn); TerminalConfig tconfig; if (command == "") { @@ -88,48 +87,54 @@ void commandMode(const string& command_in, const string& topologyPath, CCustomCm while (true) { if (command == "c") { - cout << " > checking state of the devices" << endl; + cout << "\033[01;32m > checking state of the devices\033[0m" << endl; ddsCustomCmd.send("check-state", topologyPath); } else if (command == "o") { - cout << " > dumping config of the devices" << endl; + cout << "\033[01;32m > dumping config of the devices\033[0m" << endl; ddsCustomCmd.send("dump-config", topologyPath); } else if (command == "i") { - cout << " > init devices" << endl; + cout << "\033[01;32m > init devices\033[0m" << endl; ddsCustomCmd.send("INIT DEVICE", topologyPath); + } else if (command == "b") { + cout << "\033[01;32m > bind devices\033[0m" << endl; + ddsCustomCmd.send("BIND", topologyPath); + } else if (command == "x") { + cout << "\033[01;32m > connect devices\033[0m" << endl; + ddsCustomCmd.send("CONNECT", topologyPath); } else if (command == "j") { - cout << " > init tasks" << endl; + cout << "\033[01;32m > init tasks\033[0m" << endl; ddsCustomCmd.send("INIT TASK", topologyPath); } else if (command == "p") { - cout << " > pause devices" << endl; + cout << "\033[01;32m > pause devices\033[0m" << endl; ddsCustomCmd.send("PAUSE", topologyPath); } else if (command == "r") { - cout << " > run tasks" << endl; + cout << "\033[01;32m > run tasks\033[0m" << endl; ddsCustomCmd.send("RUN", topologyPath); } else if (command == "s") { - cout << " > stop devices" << endl; + cout << "\033[01;32m > stop devices\033[0m" << endl; ddsCustomCmd.send("STOP", topologyPath); } else if (command == "t") { - cout << " > reset tasks" << endl; + cout << "\033[01;32m > reset tasks\033[0m" << endl; ddsCustomCmd.send("RESET TASK", topologyPath); } else if (command == "d") { - cout << " > reset devices" << endl; + cout << "\033[01;32m > reset devices\033[0m" << endl; ddsCustomCmd.send("RESET DEVICE", topologyPath); } else if (command == "h") { - cout << " > help" << endl; + cout << "\033[01;32m > help\033[0m" << endl; printControlsHelp(); } else if (command == "q") { - cout << " > end" << endl; + cout << "\033[01;32m > end\033[0m" << endl; ddsCustomCmd.send("END", topologyPath); } else if (command == "q!") { ddsCustomCmd.send("SHUTDOWN", topologyPath); } else if (command == "r!") { ddsCustomCmd.send("STARTUP", topologyPath); } else { - cout << "Invalid input: [" << c << "]" << endl; + cout << "\033[01;32mInvalid input: [" << c << "]\033[0m" << endl; printControlsHelp(); } - if (command_in != "") { + if (commandIn != "") { this_thread::sleep_for(chrono::milliseconds(100)); // give dds a chance to complete request break; } else { @@ -139,36 +144,49 @@ void commandMode(const string& command_in, const string& topologyPath, CCustomCm } } -void waitMode(const string& waitForState, - mutex& waitForStateMutex, - condition_variable& waitForStateCV, - const WaitForStateMap& waitForStateMap, - const string& topologyPath, - CCustomCmd& ddsCustomCmd, - chrono::milliseconds timeout) +struct WaitMode { - StateSubscription stateSubscription(topologyPath, ddsCustomCmd); + explicit WaitMode(const string& targetState) + : fTargetState(targetState) + {} - auto condition = [&] { - return !waitForStateMap.empty() // TODO once DDS provides an API to retrieve actual number - // of tasks, use it here - && all_of(waitForStateMap.cbegin(), - waitForStateMap.cend(), - [&](WaitForStateMap::value_type i) { - return boost::algorithm::ends_with(i.second, waitForState); - }); - }; + void Run(const chrono::milliseconds& timeout, const string& topologyPath, CCustomCmd& ddsCustomCmd) + { + StateSubscription stateSubscription(topologyPath, ddsCustomCmd); - unique_lock lock(waitForStateMutex); + // TODO once DDS provides an API to retrieve actual number of tasks, use it here + auto condition = [&] { return !fTargetStates.empty() && all_of(fTargetStates.cbegin(), + fTargetStates.cend(), + [&](unordered_map::value_type i) { + return boost::algorithm::ends_with(i.second, fTargetState); + }); + }; - if (timeout > std::chrono::milliseconds(0)) { - if (!waitForStateCV.wait_for(lock, timeout, condition)) { - throw runtime_error("timeout"); + unique_lock lock(fMtx); + + if (timeout > chrono::milliseconds(0)) { + if (!fCV.wait_for(lock, timeout, condition)) { + throw runtime_error("timeout"); + } + } else { + fCV.wait(lock, condition); } - } else { - waitForStateCV.wait(lock, condition); } -} + + void AddNewStateEntry(uint64_t senderId, const string& state) + { + { + unique_lock lock(fMtx); + fTargetStates[senderId] = state; + } + fCV.notify_one(); + } + + mutex fMtx; + condition_variable fCV; + unordered_map fTargetStates; + string fTargetState; +}; int main(int argc, char* argv[]) { @@ -176,34 +194,23 @@ int main(int argc, char* argv[]) string sessionID; string command; string topologyPath; - string waitForState; + string targetState; unsigned int timeout; - mutex waitForStateMutex; - condition_variable waitForStateCV; - WaitForStateMap waitForStateMap; bpo::options_description options("Common options"); - auto env_session_id = std::getenv("DDS_SESSION_ID"); - if (env_session_id) { - options.add_options()("session,s", - bpo::value(&sessionID)->default_value(env_session_id), - "DDS Session ID (overrides any value in env var $DDS_SESSION_ID)"); + auto envSessionId = getenv("DDS_SESSION_ID"); + if (envSessionId) { + options.add_options()("session,s", bpo::value(&sessionID)->default_value(envSessionId), "DDS Session ID (overrides any value in env var $DDS_SESSION_ID)"); } else { - options.add_options()("session,s", - bpo::value(&sessionID)->required(), - "DDS Session ID (overrides any value in env var $DDS_SESSION_ID)"); + options.add_options()("session,s", bpo::value(&sessionID)->required(), "DDS Session ID (overrides any value in env var $DDS_SESSION_ID)"); } options.add_options() - ("command,c", bpo::value (&command)->default_value(""), - "Command character") - ("path,p", bpo::value (&topologyPath)->default_value(""), - "DDS Topology path to send command to (empty - send to all tasks)") - ("wait-for-state,w", bpo::value (&waitForState)->default_value(""), - "Wait until targeted FairMQ devices reach the given state") - ("timeout,t", bpo::value (&timeout)->default_value(0), - "Timeout in milliseconds when waiting for a device state (0 - wait infinitely)") + ("command,c", bpo::value (&command)->default_value(""), "Command character") + ("path,p", bpo::value (&topologyPath)->default_value(""), "DDS Topology path to send command to (empty - send to all tasks)") + ("wait-for-state,w", bpo::value (&targetState)->default_value(""), "Wait until targeted FairMQ devices reach the given state") + ("timeout,t", bpo::value (&timeout)->default_value(0), "Timeout in milliseconds when waiting for a device state (0 - wait infinitely)") ("help,h", "Produce help message"); bpo::variables_map vm; @@ -217,6 +224,8 @@ int main(int argc, char* argv[]) bpo::notify(vm); + WaitMode waitMode(targetState); + CIntercomService service; CCustomCmd ddsCustomCmd(service); @@ -226,16 +235,12 @@ int main(int argc, char* argv[]) // subscribe to receive messages from DDS ddsCustomCmd.subscribe([&](const string& msg, const string& /*condition*/, uint64_t senderId) { - cout << "Received: " << endl << msg << endl; + cerr << "Received: " << msg << endl; vector parts; boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); if (parts[0] == "state-change") { - { - unique_lock lock(waitForStateMutex); - boost::trim(parts[2]); - waitForStateMap[senderId] = parts[2]; - } - waitForStateCV.notify_one(); + boost::trim(parts[2]); + waitMode.AddNewStateEntry(senderId, parts[2]); } else if (parts[0] == "state-changes-subscription") { if (parts[2] != "OK") { cerr << "state-changes-subscription failed with return code: " << parts[2]; @@ -245,13 +250,14 @@ int main(int argc, char* argv[]) cerr << "state-changes-unsubscription failed with return code: " << parts[2]; } } else { - cout << "Received: " << endl << msg << endl; + // cout << "Received: " << msg << endl; } }); service.start(sessionID); - if (waitForState == "") { + + if (targetState == "") { commandMode(command, topologyPath, ddsCustomCmd); } else { PrintControlsHelp(); @@ -316,13 +322,7 @@ int main(int argc, char* argv[]) if (command != "") { commandMode(command, topologyPath, ddsCustomCmd); } - waitMode(waitForState, - waitForStateMutex, - waitForStateCV, - waitForStateMap, - topologyPath, - ddsCustomCmd, - chrono::milliseconds(timeout)); + waitMode.Run(chrono::milliseconds(timeout), topologyPath, ddsCustomCmd); } } catch (exception& e) { cerr << "Error: " << e.what() << endl;