From ffbe90b638a51902d57da50524418983e58c649b Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 27 Nov 2018 17:22:26 +0100 Subject: [PATCH] Update to new DDS 2.2 API - require DDS 2.2 - fix regressions in automatic port binding - fix regression in channel API - update DDS example readme --- CMakeLists.txt | 2 +- examples/dds/README.md | 14 ++-- fairmq/FairMQChannel.cxx | 4 +- fairmq/FairMQChannel.h | 2 +- fairmq/plugins/DDS/DDS.cxx | 131 ++++++++++++------------------------- fairmq/plugins/DDS/DDS.h | 2 +- 6 files changed, 53 insertions(+), 102 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 28ce56f5..5eb82ae0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -67,7 +67,7 @@ if(BUILD_OFI_TRANSPORT) endif() if(BUILD_DDS_PLUGIN) - find_package2(PRIVATE DDS VERSION 2.0 REQUIRED) + find_package2(PRIVATE DDS VERSION 2.2 REQUIRED) endif() if(BUILD_TESTING) diff --git a/examples/dds/README.md b/examples/dds/README.md index 2f1f1dbf..286c73e6 100644 --- a/examples/dds/README.md +++ b/examples/dds/README.md @@ -47,7 +47,7 @@ The configuration of the channel connection addresses is done by the DDS plugin Note that the attributes `value` contain a different value. -##### 4. Start DDS server. +##### 4. Start DDS session. First you need to initialize DDS environment: @@ -55,10 +55,10 @@ First you need to initialize DDS environment: source DDS_env.sh # this script is located in the DDS installation directory ``` -The DDS server is started with: +The DDS session is started with: ```bash -dds-server start -s +dds-session start ``` ##### 5. Submit DDS Agents (configured in the hosts file). @@ -91,7 +91,7 @@ A simple utility (fairmq-dds-command-ui) is included with FairMQ to send command To see it in action, start the fairmq-dds-command-ui while the topology is running. Run the utility with `-h` to see everything that it can do. -The utility requires a session parameter to connect to appropriate DDS session. The session value is given when starting dds-server. +The utility requires a session parameter to connect to appropriate DDS session. The session value is given when starting dds-session. By default the command UI sends commands to all tasks. This can be further refined by giving a specific topology path via `-p` argument. Given our topology file, here are some examples of valid paths: @@ -108,15 +108,15 @@ Given our topology file, here are some examples of valid paths: ./fairmq/plugins/DDS/fairmq-dds-command-ui -s 937ffbca-b524-44d8-9898-1d69aedc3751 -c c -p main/ProcessorGroup/Processor_9 ``` -##### 9. Stop DDS server/topology. +##### 9. Stop DDS session/topology. The execution of tasks can be stopped with: ```bash dds-topology --stop ``` -Or by stopping the DDS server: +Or by stopping the DDS session: ```bash -dds-server stop +dds-session stop ``` For general DDS documentation please refer to [DDS Website](http://dds.gsi.de/). diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index a2666609..7e82d0fa 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -614,7 +614,7 @@ bool FairMQChannel::BindEndpoint(string& endpoint) uniform_int_distribution randomPort(fPortRangeMin, fPortRangeMax); do { - LOG(debug) << "Could not bind to configured (TCP) port, trying random port in range " << fPortRangeMin << "-" << fPortRangeMax; + LOG(debug) << "Could not bind to configured (TCP) port (" << endpoint << "), trying random port in range " << fPortRangeMin << "-" << fPortRangeMax; ++numAttempts; if (numAttempts > maxAttempts) { @@ -624,7 +624,7 @@ bool FairMQChannel::BindEndpoint(string& endpoint) size_t pos = endpoint.rfind(':'); endpoint = endpoint.substr(0, pos + 1) + fair::mq::tools::ToString(static_cast(randomPort(generator))); - } while (fSocket->Bind(endpoint)); + } while (!fSocket->Bind(endpoint)); return true; } else { diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 804dc5fa..f0f79df0 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -91,7 +91,7 @@ class FairMQChannel /// Get channel index /// @return Returns channel index (e.g. 0 in "data[0]") - std::string GetChannelIndex() const { return GetPrefix(); } // TODO: deprecate this in favor of following + std::string GetChannelIndex() const { return GetIndex(); } // TODO: deprecate this in favor of following std::string GetIndex() const; /// Get socket type diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 1df9842d..533ea987 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -48,26 +48,20 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta , fDeviceTerminationRequested(false) , fHeartbeatInterval{100} { - try - { + try { TakeDeviceControl(); fControllerThread = thread(&DDS::HandleControl, this); fHeartbeatThread = thread(&DDS::HeartbeatSender, this); - } - catch (PluginServices::DeviceControlError& e) - { + } catch (PluginServices::DeviceControlError& e) { LOG(debug) << e.what(); - } - catch (exception& e) - { + } catch (exception& e) { LOG(error) << "Error in plugin initialization: " << e.what(); } } auto DDS::HandleControl() -> void { - try - { + try { // subscribe for state changes from DDS (subscriptions start firing after fService.start() is called) SubscribeForCustomCommands(); @@ -80,23 +74,20 @@ auto DDS::HandleControl() -> void SubscribeForConnectingChannels(); // subscribe to device state changes, pushing new state chenges into the event queue - SubscribeToDeviceStateChange([&](DeviceState newState) - { + SubscribeToDeviceStateChange([&](DeviceState newState) { { lock_guard lock{fEventsMutex}; fEvents.push(newState); } fNewEvent.notify_one(); - if (newState == DeviceState::Exiting) - { + if (newState == DeviceState::Exiting) { fDeviceTerminationRequested = true; } { lock_guard lock{fStateChangeSubscriberMutex}; string id = GetProperty("id"); - for (auto subscriberId : fStateChangeSubscribers) - { + for (auto subscriberId : fStateChangeSubscribers) { LOG(debug) << "Publishing state-change: " << newState << " to " << subscriberId; fDDSCustomCmd.send("state-change: " + id + "," + ToStr(newState), to_string(subscriberId)); } @@ -131,14 +122,11 @@ auto DDS::HandleControl() -> void // wait until stop signal unique_lock lock(fStopMutex); - while (!fDeviceTerminationRequested) - { + while (!fDeviceTerminationRequested) { fStopCondition.wait_for(lock, chrono::seconds(1)); } LOG(debug) << "Stopping DDS control plugin"; - } - catch (exception& e) - { + } catch (exception& e) { LOG(error) << "Error: " << e.what() << endl; return; } @@ -146,13 +134,10 @@ auto DDS::HandleControl() -> void fDDSKeyValue.unsubscribe(); fDDSCustomCmd.unsubscribe(); - try - { + try { UnsubscribeFromDeviceStateChange(); ReleaseDeviceControl(); - } - catch (fair::mq::PluginServices::DeviceControlError& e) - { + } catch (fair::mq::PluginServices::DeviceControlError& e) { LOG(error) << e.what(); } } @@ -194,7 +179,7 @@ auto DDS::FillChannelContainers() -> void for (const auto& vi : iValues) { size_t pos = vi.find(":"); - string chanName = vi.substr(0, pos ); + string chanName = vi.substr(0, pos); // check if provided name is a valid channel name if (fConnectingChans.find(chanName) == fConnectingChans.end()) { @@ -229,11 +214,9 @@ auto DDS::FillChannelContainers() -> void auto DDS::SubscribeForConnectingChannels() -> void { - fDDSKeyValue.subscribe([&] (const string& propertyId, const string& key, const string& value) - { - try - { - LOG(debug) << "Received update for " << propertyId << ": key=" << key << " value=" << value; + fDDSKeyValue.subscribe([&] (const string& propertyId, const string& value, uint64_t senderTaskID) { + try { + LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID; string val = value; // check if it is to handle as one out of multiple values auto it = fIofN.find(propertyId); @@ -254,20 +237,18 @@ auto DDS::SubscribeForConnectingChannels() -> void auto it2 = fI.find(propertyId); if (it2 != fI.end()) { LOG(debug) << "adding connecting channel " << propertyId << " : " << connectionStrings.at(it2->second); - fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), connectionStrings.at(it2->second).c_str())); + fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(it2->second).c_str()}); } else { LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first"; - fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), connectionStrings.at(0).c_str())); + fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(0).c_str()}); } } else { // only one bound channel received - fConnectingChans.at(propertyId).fDDSValues.insert(make_pair(key.c_str(), val.c_str())); + fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, val.c_str()}); } // update channels and remove them from unfinished container - for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */) - { - if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size()) - { + for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */) { + if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size()) { // when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS. sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end()); auto it3 = mi->second.fDDSValues.begin(); @@ -277,24 +258,19 @@ auto DDS::SubscribeForConnectingChannels() -> void ++it3; } fConnectingChans.erase(mi++); - } - else - { + } else { ++mi; } } - } - catch (const exception& e) - { - LOG(error) << "Error on handling DDS property update for " << propertyId << ": key=" << key << " value=" << value << ": " << e.what(); + } catch (const exception& e) { + LOG(error) << "Error on handling DDS property update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID << ": " << e.what(); } }); } auto DDS::PublishBoundChannels() -> void { - for (const auto& chan : fBindingChans) - { + for (const auto& chan : fBindingChans) { string joined = boost::algorithm::join(chan.second, ","); LOG(debug) << "Publishing " << chan.first << " bound addresses (" << chan.second.size() << ") to DDS under '" << chan.first << "' property name."; fDDSKeyValue.putValue(chan.first, joined); @@ -306,13 +282,11 @@ auto DDS::HeartbeatSender() -> void string id = GetProperty("id"); string pid(to_string(getpid())); - while (!fDeviceTerminationRequested) - { + while (!fDeviceTerminationRequested) { { lock_guard lock{fHeartbeatSubscriberMutex}; - for (const auto subscriberId : fHeartbeatSubscribers) - { + for (const auto subscriberId : fHeartbeatSubscribers) { fDDSCustomCmd.send("heartbeat: " + id + "," + pid, to_string(subscriberId)); } } @@ -326,58 +300,42 @@ auto DDS::SubscribeForCustomCommands() -> void string id = GetProperty("id"); string pid(to_string(getpid())); - fDDSCustomCmd.subscribe([id, pid, this](const string& cmd, const string& cond, uint64_t senderId) - { + fDDSCustomCmd.subscribe([id, pid, this](const string& cmd, const string& cond, uint64_t senderId) { LOG(info) << "Received command: " << cmd; - if (cmd == "check-state") - { + if (cmd == "check-state") { fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()) + " (pid: " + pid + ")", to_string(senderId)); - } - else if (fCommands.find(cmd) != fCommands.end()) - { + } else if (fCommands.find(cmd) != fCommands.end()) { fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId)); ChangeDeviceState(ToDeviceStateTransition(cmd)); - } - else if (cmd == "END") - { + } else if (cmd == "END") { fDDSCustomCmd.send(id + ": attempting to " + cmd, to_string(senderId)); ChangeDeviceState(ToDeviceStateTransition(cmd)); fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId)); - if (ToStr(GetCurrentDeviceState()) == "EXITING") - { + if (ToStr(GetCurrentDeviceState()) == "EXITING") { unique_lock lock(fStopMutex); fStopCondition.notify_one(); } - } - else if (cmd == "dump-config") - { + } else if (cmd == "dump-config") { stringstream ss; - for (const auto pKey: GetPropertyKeys()) - { + for (const auto pKey: GetPropertyKeys()) { ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << endl; } fDDSCustomCmd.send(ss.str(), to_string(senderId)); - } - else if (cmd == "subscribe-to-heartbeats") - { + } else if (cmd == "subscribe-to-heartbeats") { { // auto size = fHeartbeatSubscribers.size(); lock_guard lock{fHeartbeatSubscriberMutex}; fHeartbeatSubscribers.insert(senderId); } fDDSCustomCmd.send("heartbeat-subscription: " + id + ",OK", to_string(senderId)); - } - else if (cmd == "unsubscribe-from-heartbeats") - { + } else if (cmd == "unsubscribe-from-heartbeats") { { lock_guard lock{fHeartbeatSubscriberMutex}; fHeartbeatSubscribers.erase(senderId); } fDDSCustomCmd.send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId)); - } - else if (cmd == "subscribe-to-state-changes") - { + } else if (cmd == "subscribe-to-state-changes") { { // auto size = fStateChangeSubscribers.size(); lock_guard lock{fStateChangeSubscriberMutex}; @@ -387,17 +345,13 @@ auto DDS::SubscribeForCustomCommands() -> void auto state = GetCurrentDeviceState(); LOG(debug) << "Publishing state-change: " << state << " to " << senderId; fDDSCustomCmd.send("state-change: " + id + "," + ToStr(state), to_string(senderId)); - } - else if (cmd == "unsubscribe-from-state-changes") - { + } else if (cmd == "unsubscribe-from-state-changes") { { lock_guard lock{fStateChangeSubscriberMutex}; fStateChangeSubscribers.erase(senderId); } fDDSCustomCmd.send("state-changes-unsubscription: " + id + ",OK", to_string(senderId)); - } - else - { + } else { LOG(warn) << "Unknown command: " << cmd; LOG(warn) << "Origin: " << senderId; LOG(warn) << "Destination: " << cond; @@ -408,8 +362,7 @@ auto DDS::SubscribeForCustomCommands() -> void auto DDS::WaitForNextState() -> DeviceState { unique_lock lock{fEventsMutex}; - while (fEvents.empty()) - { + while (fEvents.empty()) { fNewEvent.wait(lock); } @@ -420,13 +373,11 @@ auto DDS::WaitForNextState() -> DeviceState DDS::~DDS() { - if (fControllerThread.joinable()) - { + if (fControllerThread.joinable()) { fControllerThread.join(); } - if (fHeartbeatThread.joinable()) - { + if (fHeartbeatThread.joinable()) { fHeartbeatThread.join(); } } diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index ee3232bc..2ef265fd 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -41,7 +41,7 @@ struct DDSConfig // container of sub channel addresses std::vector fSubChannelAddresses; // dds values for the channel - std::unordered_map fDDSValues; + std::unordered_map fDDSValues; }; struct IofN