From f7cdf5ee23520c0634fe32fc07134599bc0d8d30 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 24 Jul 2019 16:44:06 +0200 Subject: [PATCH] DDS plugin: Implement --control external --- fairmq/plugins/Control.cxx | 4 +- fairmq/plugins/DDS/DDS.cxx | 159 +++++++++++++++---------------------- fairmq/plugins/DDS/DDS.h | 100 +++++++++++++++++------ test/sdk/Fixtures.h | 2 +- test/sdk/test_topo.xml | 2 +- 5 files changed, 142 insertions(+), 125 deletions(-) diff --git a/fairmq/plugins/Control.cxx b/fairmq/plugins/Control.cxx index 3580ccfd..f52f87e4 100644 --- a/fairmq/plugins/Control.cxx +++ b/fairmq/plugins/Control.cxx @@ -70,7 +70,7 @@ Control::Control(const string& name, const Plugin::Version version, const string if (control == "static") { LOG(debug) << "Running builtin controller: static"; fControllerThread = thread(&Control::StaticMode, this); - } else if (control == "interactive") { + } else if (control == "dynamic" || control == "external" || control == "interactive") { LOG(debug) << "Running builtin controller: interactive"; fControllerThread = thread(&Control::InteractiveMode, this); } else { @@ -113,7 +113,7 @@ auto ControlPluginProgramOptions() -> Plugin::ProgOptions namespace po = boost::program_options; auto pluginOptions = po::options_description{"Control (builtin) Plugin"}; pluginOptions.add_options() - ("control", po::value()->default_value("interactive"), "Control mode, 'static' or 'interactive'") + ("control", po::value()->default_value("dynamic"), "Control mode, 'static' or 'dynamic' (aliases for dynamic are external and interactive)") ("catch-signals", po::value()->default_value(1), "Enable signal handling (1/0)."); return pluginOptions; } diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 12a40367..6a9d5f08 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -33,20 +33,10 @@ namespace plugins DDS::DDS(const string& name, const Plugin::Version version, const string& maintainer, const string& homepage, PluginServices* pluginServices) : Plugin(name, version, maintainer, homepage, pluginServices) - , fService() - , fDDSCustomCmd(fService) - , fDDSKeyValue(fService) - , fDDSTaskId(dds::env_prop()) - , fBindingChans() - , fConnectingChans() - , fStopMutex() - , fStopCondition() , fTransitions({ "BIND", "CONNECT", "INIT TASK", "RUN", "STOP", "RESET TASK", "RESET DEVICE" }) - , fControllerThread() , fCurrentState(DeviceState::Idle) , fLastState(DeviceState::Idle) , fDeviceTerminationRequested(false) - , fServiceStarted(false) , fHeartbeatInterval(100) { try { @@ -63,95 +53,69 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta auto DDS::HandleControl() -> void { try { - LOG(debug) << "$DDS_TASK_PATH: " << getenv("DDS_TASK_PATH"); - LOG(debug) << "$DDS_GROUP_NAME: " << getenv("DDS_GROUP_NAME"); - LOG(debug) << "$DDS_COLLECTION_NAME: " << getenv("DDS_COLLECTION_NAME"); - LOG(debug) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME"); - LOG(debug) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX"); - LOG(debug) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX"); - LOG(debug) << "$DDS_TASK_ID: " << getenv("DDS_TASK_ID"); - LOG(debug) << "$DDS_LOCATION: " << getenv("DDS_LOCATION"); - string dds_session_id(getenv("DDS_SESSION_ID")); - LOG(debug) << "$DDS_SESSION_ID: " << dds_session_id; + auto control = GetProperty("control"); + bool staticMode(false); + if (control == "static") { + LOG(debug) << "Running DDS controller: static"; + staticMode = true; + } else if (control == "dynamic" || control == "external" || control == "interactive") { + LOG(debug) << "Running DDS controller: external"; + } else { + LOG(error) << "Unrecognized control mode '" << control << "' requested. " << "Ignoring and falling back to static control mode."; + staticMode = true; + } - LOG(info) << "DDS Task Id (from API): " << fDDSTaskId; - - // subscribe for state changes from DDS (subscriptions start firing after fService.start() is called) SubscribeForCustomCommands(); - - // subscribe for DDS service errors. - fService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const string& errorMsg) { - LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg << endl; - }); - SubscribeForConnectingChannels(); // subscribe to device state changes, pushing new state changes into the event queue SubscribeToDeviceStateChange([&](DeviceState newState) { fStateQueue.Push(newState); - if (newState == DeviceState::Exiting) { + switch(newState) { + case DeviceState::Bound: + // Receive addresses of connecting channels from DDS + // and propagate addresses of bound channels to DDS. + FillChannelContainers(); + + // publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i] + PublishBoundChannels(); + break; + case DeviceState::Exiting: fDeviceTerminationRequested = true; + UnsubscribeFromDeviceStateChange(); + ReleaseDeviceControl(); + break; + default: + break; } - if (fServiceStarted) { - lock_guard lock{fStateChangeSubscriberMutex}; - string id = GetProperty("id"); - fLastState = fCurrentState; - fCurrentState = newState; - for (auto subscriberId : fStateChangeSubscribers) { - LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId; - fDDSCustomCmd.send("state-change: " + id + "," + ToString(fDDSTaskId) + "," + ToStr(fLastState) + "->" + ToStr(newState), to_string(subscriberId)); - } + lock_guard lock{fStateChangeSubscriberMutex}; + string id = GetProperty("id"); + fLastState = fCurrentState; + fCurrentState = newState; + for (auto subscriberId : fStateChangeSubscribers) { + LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId; + fDDS.Send("state-change: " + id + "," + ToString(dds::env_prop()) + "," + ToStr(fLastState) + "->" + ToStr(newState), to_string(subscriberId)); } }); - ChangeDeviceState(DeviceStateTransition::InitDevice); - while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {} - ChangeDeviceState(DeviceStateTransition::CompleteInit); - while (fStateQueue.WaitForNext() != DeviceState::Initialized) {} - ChangeDeviceState(DeviceStateTransition::Bind); - while (fStateQueue.WaitForNext() != DeviceState::Bound) {} + if (staticMode) { + TransitionDeviceStateTo(DeviceState::Running); - // in the Initializing state subscribe to receive addresses of connecting channels from DDS - // and propagate addresses of bound channels to DDS. - FillChannelContainers(); - - // start DDS service - subscriptions will only start firing after this step - fService.start(dds_session_id); - fServiceStarted = true; - - // publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i] - PublishBoundChannels(); - - ChangeDeviceState(DeviceStateTransition::Connect); - while (fStateQueue.WaitForNext() != DeviceState::DeviceReady) {} - - ChangeDeviceState(DeviceStateTransition::InitTask); - while (fStateQueue.WaitForNext() != DeviceState::Ready) {} - ChangeDeviceState(DeviceStateTransition::Run); - - // wait until stop signal - unique_lock lock(fStopMutex); - while (!fDeviceTerminationRequested) { - fStopCondition.wait_for(lock, chrono::seconds(1)); + // wait until stop signal + unique_lock lock(fStopMutex); + while (!fDeviceTerminationRequested) { + fStopCondition.wait_for(lock, chrono::seconds(1)); + } + LOG(debug) << "Stopping DDS control plugin"; } - LOG(debug) << "Stopping DDS control plugin"; } catch (DeviceErrorState&) { ReleaseDeviceControl(); } catch (exception& e) { + ReleaseDeviceControl(); LOG(error) << "Error: " << e.what() << endl; return; } - - fDDSKeyValue.unsubscribe(); - fDDSCustomCmd.unsubscribe(); - - try { - UnsubscribeFromDeviceStateChange(); - ReleaseDeviceControl(); - } catch (fair::mq::PluginServices::DeviceControlError& e) { - LOG(error) << e.what(); - } } auto DDS::FillChannelContainers() -> void @@ -228,7 +192,7 @@ auto DDS::SubscribeForConnectingChannels() -> void { LOG(debug) << "Subscribing for DDS properties."; - fDDSKeyValue.subscribe([&] (const string& propertyId, const string& value, uint64_t senderTaskID) { + fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) { try { LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID; string val = value; @@ -286,7 +250,7 @@ auto DDS::PublishBoundChannels() -> void for (const auto& chan : fBindingChans) { string joined = boost::algorithm::join(chan.second, ","); LOG(debug) << "Publishing " << chan.first << " bound addresses (" << chan.second.size() << ") to DDS under '" << chan.first << "' property name."; - fDDSKeyValue.putValue(chan.first, joined); + fDDS.PutValue(chan.first, joined); } } @@ -299,7 +263,7 @@ auto DDS::HeartbeatSender() -> void lock_guard lock{fHeartbeatSubscriberMutex}; for (const auto subscriberId : fHeartbeatSubscribers) { - fDDSCustomCmd.send("heartbeat: " + id , to_string(subscriberId)); + fDDS.Send("heartbeat: " + id , to_string(subscriberId)); } } @@ -313,30 +277,30 @@ auto DDS::SubscribeForCustomCommands() -> void string id = GetProperty("id"); - fDDSCustomCmd.subscribe([id, this](const string& cmd, const string& cond, uint64_t senderId) { + fDDS.SubscribeCustomCmd([id, this](const string& cmd, const string& cond, uint64_t senderId) { LOG(info) << "Received command: '" << cmd << "' from " << senderId; if (cmd == "check-state") { - fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId)); + fDDS.Send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId)); } else if (cmd == "INIT DEVICE") { if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { - fDDSCustomCmd.send(id + ": queued, " + cmd, to_string(senderId)); + fDDS.Send(id + ": queued, " + cmd, to_string(senderId)); while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {} ChangeDeviceState(DeviceStateTransition::CompleteInit); } else { - fDDSCustomCmd.send(id + ": could not queue, " + cmd , to_string(senderId)); + fDDS.Send(id + ": could not queue, " + cmd, to_string(senderId)); } } else if (fTransitions.find(cmd) != fTransitions.end()) { if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { - fDDSCustomCmd.send(id + ": queued, " + cmd, to_string(senderId)); + fDDS.Send(id + ": queued, " + cmd, to_string(senderId)); } else { - fDDSCustomCmd.send(id + ": could not queue, " + cmd , to_string(senderId)); + fDDS.Send(id + ": could not queue, " + cmd, to_string(senderId)); } } else if (cmd == "END") { if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { - fDDSCustomCmd.send(id + ": queued, " + cmd, to_string(senderId)); + fDDS.Send(id + ": queued, " + cmd, to_string(senderId)); } else { - fDDSCustomCmd.send(id + ": could not queue, " + cmd , to_string(senderId)); + fDDS.Send(id + ": could not queue, " + cmd, to_string(senderId)); } if (ToStr(GetCurrentDeviceState()) == "EXITING") { unique_lock lock(fStopMutex); @@ -347,43 +311,43 @@ auto DDS::SubscribeForCustomCommands() -> void for (const auto pKey: GetPropertyKeys()) { ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << endl; } - fDDSCustomCmd.send(ss.str(), to_string(senderId)); + fDDS.Send(ss.str(), to_string(senderId)); } else if (cmd == "subscribe-to-heartbeats") { { // auto size = fHeartbeatSubscribers.size(); lock_guard lock{fHeartbeatSubscriberMutex}; fHeartbeatSubscribers.insert(senderId); } - fDDSCustomCmd.send("heartbeat-subscription: " + id + ",OK", to_string(senderId)); + fDDS.Send("heartbeat-subscription: " + id + ",OK", to_string(senderId)); } else if (cmd == "unsubscribe-from-heartbeats") { { lock_guard lock{fHeartbeatSubscriberMutex}; fHeartbeatSubscribers.erase(senderId); } - fDDSCustomCmd.send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId)); + fDDS.Send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId)); } else if (cmd == "subscribe-to-state-changes") { { // auto size = fStateChangeSubscribers.size(); lock_guard lock{fStateChangeSubscriberMutex}; fStateChangeSubscribers.insert(senderId); } - fDDSCustomCmd.send("state-changes-subscription: " + id + ",OK", to_string(senderId)); + fDDS.Send("state-changes-subscription: " + id + ",OK", to_string(senderId)); { lock_guard lock{fStateChangeSubscriberMutex}; LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId; // fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId)); - fDDSCustomCmd.send("state-change: " + id + "," + ToString(fDDSTaskId) + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId)); + fDDS.Send("state-change: " + id + "," + ToString(dds::env_prop()) + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId)); } } else if (cmd == "unsubscribe-from-state-changes") { { lock_guard lock{fStateChangeSubscriberMutex}; fStateChangeSubscribers.erase(senderId); } - fDDSCustomCmd.send("state-changes-unsubscription: " + id + ",OK", to_string(senderId)); + fDDS.Send("state-changes-unsubscription: " + id + ",OK", to_string(senderId)); } else if (cmd == "SHUTDOWN") { TransitionDeviceStateTo(DeviceState::Exiting); } else if (cmd == "STARTUP") { - TransitionDeviceStateTo(DeviceState::Ready); + TransitionDeviceStateTo(DeviceState::Running); } else { LOG(warn) << "Unknown command: " << cmd; LOG(warn) << "Origin: " << senderId; @@ -394,6 +358,9 @@ auto DDS::SubscribeForCustomCommands() -> void DDS::~DDS() { + UnsubscribeFromDeviceStateChange(); + ReleaseDeviceControl(); + if (fControllerThread.joinable()) { fControllerThread.join(); } diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 4cf3851f..e9ca4dfe 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -9,23 +9,22 @@ #ifndef FAIR_MQ_PLUGINS_DDS #define FAIR_MQ_PLUGINS_DDS -#include -#include -#include - -#include #include - -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include #include +#include +#include +#include +#include #include +#include +#include +#include +#include +#include +#include +#include namespace fair { @@ -36,23 +35,78 @@ namespace plugins struct DDSConfig { - DDSConfig() - : fSubChannelAddresses() - , fDDSValues() - {} - // container of sub channel addresses std::vector fSubChannelAddresses; // dds values for the channel std::unordered_map fDDSValues; }; +struct DDSSubscription +{ + DDSSubscription() + : fDDSCustomCmd(fService) + , fDDSKeyValue(fService) + { + LOG(debug) << "$DDS_TASK_PATH: " << dds::env_prop(); + LOG(debug) << "$DDS_GROUP_NAME: " << dds::env_prop(); + LOG(debug) << "$DDS_COLLECTION_NAME: " << dds::env_prop(); + LOG(debug) << "$DDS_TASK_NAME: " << dds::env_prop(); + LOG(debug) << "$DDS_TASK_INDEX: " << dds::env_prop(); + LOG(debug) << "$DDS_COLLECTION_INDEX: " << dds::env_prop(); + LOG(debug) << "$DDS_TASK_ID: " << dds::env_prop(); + LOG(debug) << "$DDS_LOCATION: " << dds::env_prop(); + std::string dds_session_id(dds::env_prop()); + LOG(debug) << "$DDS_SESSION_ID: " << dds_session_id; + + // subscribe for DDS service errors. + fService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& errorMsg) { + LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg; + }); + + assert(!dds_session_id.empty()); + fService.start(dds_session_id); + } + + ~DDSSubscription() { + fDDSKeyValue.unsubscribe(); + fDDSCustomCmd.unsubscribe(); + } + + template + auto SubscribeCustomCmd(Args&&... args) -> void + { + fDDSCustomCmd.subscribe(std::forward(args)...); + } + + template + auto SubscribeKeyValue(Args&&... args) -> void + { + fDDSKeyValue.subscribe(std::forward(args)...); + } + + template + auto Send(Args&&... args) -> void + { + fDDSCustomCmd.send(std::forward(args)...); + } + + template + auto PutValue(Args&&... args) -> void + { + fDDSKeyValue.putValue(std::forward(args)...); + } + + private: + dds::intercom_api::CIntercomService fService; + dds::intercom_api::CCustomCmd fDDSCustomCmd; + dds::intercom_api::CKeyValue fDDSKeyValue; +}; + struct IofN { IofN(int i, int n) : fI(i) , fN(n) - , fEntries() {} unsigned int fI; @@ -77,10 +131,7 @@ class DDS : public Plugin auto HeartbeatSender() -> void; - dds::intercom_api::CIntercomService fService; - dds::intercom_api::CCustomCmd fDDSCustomCmd; - dds::intercom_api::CKeyValue fDDSKeyValue; - uint64_t fDDSTaskId; + DDSSubscription fDDS; std::unordered_map> fBindingChans; std::unordered_map fConnectingChans; @@ -98,7 +149,6 @@ class DDS : public Plugin fair::mq::StateQueue fStateQueue; std::atomic fDeviceTerminationRequested; - std::atomic fServiceStarted; std::set fHeartbeatSubscribers; std::mutex fHeartbeatSubscriberMutex; diff --git a/test/sdk/Fixtures.h b/test/sdk/Fixtures.h index a9443b52..5b0e4388 100644 --- a/test/sdk/Fixtures.h +++ b/test/sdk/Fixtures.h @@ -57,7 +57,7 @@ struct TopologyFixture : ::testing::Test LOG(info) << mDDSTopo; auto n(mDDSTopo.GetNumRequiredAgents()); mDDSSession.SubmitAgents(n); - mDDSSession.ActivateTopology(mDDSTopoFile); + mDDSSession.ActivateTopology(mDDSTopo); } auto TearDown() -> void override { diff --git a/test/sdk/test_topo.xml b/test/sdk/test_topo.xml index c8ae0bcc..89ed0ac2 100644 --- a/test/sdk/test_topo.xml +++ b/test/sdk/test_topo.xml @@ -27,7 +27,7 @@
Sampler - + Sink