diff --git a/examples/dds/ex-dds-topology-infinite.xml b/examples/dds/ex-dds-topology-infinite.xml index e85d7e39..a8d37a1d 100644 --- a/examples/dds/ex-dds-topology-infinite.xml +++ b/examples/dds/ex-dds-topology-infinite.xml @@ -1,7 +1,7 @@ - - + + @@ -14,7 +14,7 @@ SamplerWorker - data1 + fmqchan_data1 @@ -25,8 +25,8 @@ ProcessorWorker - data1 - data2 + fmqchan_data1 + fmqchan_data2 @@ -37,7 +37,7 @@ SinkWorker - data2 + fmqchan_data2 diff --git a/examples/dds/ex-dds-topology.xml b/examples/dds/ex-dds-topology.xml index a2396911..a1c13946 100644 --- a/examples/dds/ex-dds-topology.xml +++ b/examples/dds/ex-dds-topology.xml @@ -1,7 +1,7 @@ - - + + @@ -14,7 +14,7 @@ SamplerWorker - data1 + fmqchan_data1 @@ -25,8 +25,8 @@ ProcessorWorker - data1 - data2 + fmqchan_data1 + fmqchan_data2 @@ -37,7 +37,7 @@ SinkWorker - data2 + fmqchan_data2 diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 88b6697f..74f3e605 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -263,8 +263,15 @@ auto DDS::SubscribeForConnectingChannels() -> void { LOG(debug) << "Subscribing for DDS properties."; - fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) { - LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID; + fDDS.SubscribeKeyValue([&] (const string& key, const string& value, uint64_t senderTaskID) { + LOG(debug) << "Received property: key=" << key << ", value=" << value << ", senderTaskID=" << senderTaskID; + + if (key.compare(0, 8, "fmqchan_") != 0) { + LOG(debug) << "property update is not a channel info update: " << key; + return; + } + string channelName = key.substr(8); + LOG(info) << "Update for channel name: " << channelName; boost::asio::post(fWorkerQueue, [=]() { try { @@ -274,14 +281,14 @@ auto DDS::SubscribeForConnectingChannels() -> void } string val = value; // check if it is to handle as one out of multiple values - auto it = fIofN.find(propertyId); + auto it = fIofN.find(channelName); if (it != fIofN.end()) { it->second.fEntries.push_back(value); if (it->second.fEntries.size() == it->second.fN) { sort(it->second.fEntries.begin(), it->second.fEntries.end()); val = it->second.fEntries.at(it->second.fI); } else { - LOG(debug) << "received " << it->second.fEntries.size() << " values for " << propertyId << ", expecting total of " << it->second.fN; + LOG(debug) << "received " << it->second.fEntries.size() << " values for " << channelName << ", expecting total of " << it->second.fN; return; } } @@ -289,16 +296,16 @@ auto DDS::SubscribeForConnectingChannels() -> void vector connectionStrings; boost::algorithm::split(connectionStrings, val, boost::algorithm::is_any_of(",")); if (connectionStrings.size() > 1) { // multiple bound channels received - auto it2 = fI.find(propertyId); + auto it2 = fI.find(channelName); if (it2 != fI.end()) { - LOG(debug) << "adding connecting channel " << propertyId << " : " << connectionStrings.at(it2->second); - fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(it2->second).c_str()}); + LOG(debug) << "adding connecting channel " << channelName << " : " << connectionStrings.at(it2->second); + fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, connectionStrings.at(it2->second).c_str()}); } else { LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first"; - fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(0).c_str()}); + fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, connectionStrings.at(0).c_str()}); } } else { // only one bound channel received - fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, val.c_str()}); + fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, val.c_str()}); } // update channels and remove them from unfinished container @@ -317,7 +324,7 @@ auto DDS::SubscribeForConnectingChannels() -> void } } } catch (const exception& e) { - LOG(error) << "Error on handling DDS property update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID << ": " << e.what(); + LOG(error) << "Error handling DDS property: key=" << key << ", value=" << value << ", senderTaskID=" << senderTaskID << ": " << e.what(); } }); }); @@ -327,8 +334,8 @@ auto DDS::PublishBoundChannels() -> void { for (const auto& chan : fBindingChans) { string joined = boost::algorithm::join(chan.second, ","); - LOG(debug) << "Publishing " << chan.first << " bound addresses (" << chan.second.size() << ") to DDS under '" << chan.first << "' property name."; - fDDS.PutValue(chan.first, joined); + LOG(debug) << "Publishing bound addresses (" << chan.second.size() << ") of channel '" << chan.first << "' to DDS under '" << "fmqchan_" + chan.first << "' property name."; + fDDS.PutValue("fmqchan_" + chan.first, joined); } } diff --git a/test/sdk/test_topo.xml b/test/sdk/test_topo.xml index 09a70896..4d164ee1 100644 --- a/test/sdk/test_topo.xml +++ b/test/sdk/test_topo.xml @@ -1,6 +1,6 @@ - + @@ -11,7 +11,7 @@ SamplerWorker - data + fmqchan_data @@ -21,7 +21,7 @@ SinkWorker - data + fmqchan_data