DDS plugin: Prefix channel properties

This commit is contained in:
Alexey Rybalchenko 2019-09-17 14:29:12 +02:00 committed by Dennis Klein
parent 47d9e282d4
commit 9544d9665b
4 changed files with 34 additions and 27 deletions

View File

@ -1,7 +1,7 @@
<topology name="ExampleDDS"> <topology name="ExampleDDS">
<property name="data1" /> <property name="fmqchan_data1" />
<property name="data2" /> <property name="fmqchan_data2" />
<declrequirement name="SamplerWorker" type="wnname" value="sampler"/> <declrequirement name="SamplerWorker" type="wnname" value="sampler"/>
<declrequirement name="ProcessorWorker" type="wnname" value="processor"/> <declrequirement name="ProcessorWorker" type="wnname" value="processor"/>
@ -14,7 +14,7 @@
<name>SamplerWorker</name> <name>SamplerWorker</name>
</requirements> </requirements>
<properties> <properties>
<name access="write">data1</name> <name access="write">fmqchan_data1</name>
</properties> </properties>
</decltask> </decltask>
@ -25,8 +25,8 @@
<name>ProcessorWorker</name> <name>ProcessorWorker</name>
</requirements> </requirements>
<properties> <properties>
<name access="read">data1</name> <name access="read">fmqchan_data1</name>
<name access="read">data2</name> <name access="read">fmqchan_data2</name>
</properties> </properties>
</decltask> </decltask>
@ -37,7 +37,7 @@
<name>SinkWorker</name> <name>SinkWorker</name>
</requirements> </requirements>
<properties> <properties>
<name access="write">data2</name> <name access="write">fmqchan_data2</name>
</properties> </properties>
</decltask> </decltask>

View File

@ -1,7 +1,7 @@
<topology name="ExampleDDS"> <topology name="ExampleDDS">
<property name="data1" /> <property name="fmqchan_data1" />
<property name="data2" /> <property name="fmqchan_data2" />
<declrequirement name="SamplerWorker" type="wnname" value="sampler"/> <declrequirement name="SamplerWorker" type="wnname" value="sampler"/>
<declrequirement name="ProcessorWorker" type="wnname" value="processor"/> <declrequirement name="ProcessorWorker" type="wnname" value="processor"/>
@ -14,7 +14,7 @@
<name>SamplerWorker</name> <name>SamplerWorker</name>
</requirements> </requirements>
<properties> <properties>
<name access="write">data1</name> <name access="write">fmqchan_data1</name>
</properties> </properties>
</decltask> </decltask>
@ -25,8 +25,8 @@
<name>ProcessorWorker</name> <name>ProcessorWorker</name>
</requirements> </requirements>
<properties> <properties>
<name access="read">data1</name> <name access="read">fmqchan_data1</name>
<name access="read">data2</name> <name access="read">fmqchan_data2</name>
</properties> </properties>
</decltask> </decltask>
@ -37,7 +37,7 @@
<name>SinkWorker</name> <name>SinkWorker</name>
</requirements> </requirements>
<properties> <properties>
<name access="write">data2</name> <name access="write">fmqchan_data2</name>
</properties> </properties>
</decltask> </decltask>

View File

@ -263,8 +263,15 @@ auto DDS::SubscribeForConnectingChannels() -> void
{ {
LOG(debug) << "Subscribing for DDS properties."; LOG(debug) << "Subscribing for DDS properties.";
fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) { fDDS.SubscribeKeyValue([&] (const string& key, const string& value, uint64_t senderTaskID) {
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID; LOG(debug) << "Received property: key=" << key << ", value=" << value << ", senderTaskID=" << senderTaskID;
if (key.compare(0, 8, "fmqchan_") != 0) {
LOG(debug) << "property update is not a channel info update: " << key;
return;
}
string channelName = key.substr(8);
LOG(info) << "Update for channel name: " << channelName;
boost::asio::post(fWorkerQueue, [=]() { boost::asio::post(fWorkerQueue, [=]() {
try { try {
@ -274,14 +281,14 @@ auto DDS::SubscribeForConnectingChannels() -> void
} }
string val = value; string val = value;
// check if it is to handle as one out of multiple values // check if it is to handle as one out of multiple values
auto it = fIofN.find(propertyId); auto it = fIofN.find(channelName);
if (it != fIofN.end()) { if (it != fIofN.end()) {
it->second.fEntries.push_back(value); it->second.fEntries.push_back(value);
if (it->second.fEntries.size() == it->second.fN) { if (it->second.fEntries.size() == it->second.fN) {
sort(it->second.fEntries.begin(), it->second.fEntries.end()); sort(it->second.fEntries.begin(), it->second.fEntries.end());
val = it->second.fEntries.at(it->second.fI); val = it->second.fEntries.at(it->second.fI);
} else { } else {
LOG(debug) << "received " << it->second.fEntries.size() << " values for " << propertyId << ", expecting total of " << it->second.fN; LOG(debug) << "received " << it->second.fEntries.size() << " values for " << channelName << ", expecting total of " << it->second.fN;
return; return;
} }
} }
@ -289,16 +296,16 @@ auto DDS::SubscribeForConnectingChannels() -> void
vector<string> connectionStrings; vector<string> connectionStrings;
boost::algorithm::split(connectionStrings, val, boost::algorithm::is_any_of(",")); boost::algorithm::split(connectionStrings, val, boost::algorithm::is_any_of(","));
if (connectionStrings.size() > 1) { // multiple bound channels received if (connectionStrings.size() > 1) { // multiple bound channels received
auto it2 = fI.find(propertyId); auto it2 = fI.find(channelName);
if (it2 != fI.end()) { if (it2 != fI.end()) {
LOG(debug) << "adding connecting channel " << propertyId << " : " << connectionStrings.at(it2->second); LOG(debug) << "adding connecting channel " << channelName << " : " << connectionStrings.at(it2->second);
fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(it2->second).c_str()}); fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, connectionStrings.at(it2->second).c_str()});
} else { } else {
LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first"; LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first";
fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, connectionStrings.at(0).c_str()}); fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, connectionStrings.at(0).c_str()});
} }
} else { // only one bound channel received } else { // only one bound channel received
fConnectingChans.at(propertyId).fDDSValues.insert({senderTaskID, val.c_str()}); fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, val.c_str()});
} }
// update channels and remove them from unfinished container // update channels and remove them from unfinished container
@ -317,7 +324,7 @@ auto DDS::SubscribeForConnectingChannels() -> void
} }
} }
} catch (const exception& e) { } catch (const exception& e) {
LOG(error) << "Error on handling DDS property update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID << ": " << e.what(); LOG(error) << "Error handling DDS property: key=" << key << ", value=" << value << ", senderTaskID=" << senderTaskID << ": " << e.what();
} }
}); });
}); });
@ -327,8 +334,8 @@ auto DDS::PublishBoundChannels() -> void
{ {
for (const auto& chan : fBindingChans) { for (const auto& chan : fBindingChans) {
string joined = boost::algorithm::join(chan.second, ","); string joined = boost::algorithm::join(chan.second, ",");
LOG(debug) << "Publishing " << chan.first << " bound addresses (" << chan.second.size() << ") to DDS under '" << chan.first << "' property name."; LOG(debug) << "Publishing bound addresses (" << chan.second.size() << ") of channel '" << chan.first << "' to DDS under '" << "fmqchan_" + chan.first << "' property name.";
fDDS.PutValue(chan.first, joined); fDDS.PutValue("fmqchan_" + chan.first, joined);
} }
} }

View File

@ -1,6 +1,6 @@
<topology name="ExampleDDS"> <topology name="ExampleDDS">
<property name="data" /> <property name="fmqchan_data" />
<declrequirement name="SamplerWorker" type="wnname" value="sampler"/> <declrequirement name="SamplerWorker" type="wnname" value="sampler"/>
<declrequirement name="SinkWorker" type="wnname" value="sink"/> <declrequirement name="SinkWorker" type="wnname" value="sink"/>
@ -11,7 +11,7 @@
<name>SamplerWorker</name> <name>SamplerWorker</name>
</requirements> </requirements>
<properties> <properties>
<name access="write">data</name> <name access="write">fmqchan_data</name>
</properties> </properties>
</decltask> </decltask>
@ -21,7 +21,7 @@
<name>SinkWorker</name> <name>SinkWorker</name>
</requirements> </requirements>
<properties> <properties>
<name access="read">data</name> <name access="read">fmqchan_data</name>
</properties> </properties>
</decltask> </decltask>