dds plugin: handling for same channel names from multiplied collections

This commit is contained in:
Alexey Rybalchenko 2018-03-15 01:41:27 +01:00 committed by Mohammad Al-Turany
parent 665ab104bb
commit 243352d717
3 changed files with 112 additions and 42 deletions

View File

@ -199,6 +199,12 @@ struct ConvertVariableValue : T
if (typeIs<std::vector<std::size_t>>(varValue)) if (typeIs<std::vector<std::size_t>>(varValue))
return T::template Value<std::vector<std::size_t>>(varValue, std::string("<vector<std::size_t>>"), defaulted, empty); return T::template Value<std::vector<std::size_t>>(varValue, std::string("<vector<std::size_t>>"), defaulted, empty);
if (typeIs<std::uint32_t>(varValue))
return T::template Value<std::uint32_t>(varValue, std::string("<std::uint32_t>"), defaulted, empty);
if (typeIs<std::vector<std::uint32_t>>(varValue))
return T::template Value<std::vector<std::uint32_t>>(varValue, std::string("<vector<std::uint32_t>>"), defaulted, empty);
if (typeIs<std::uint64_t>(varValue)) if (typeIs<std::uint64_t>(varValue))
return T::template Value<std::uint64_t>(varValue, std::string("<std::uint64_t>"), defaulted, empty); return T::template Value<std::uint64_t>(varValue, std::string("<std::uint64_t>"), defaulted, empty);

View File

@ -8,6 +8,8 @@
#include "DDS.h" #include "DDS.h"
#include <fairmq/Tools.h>
#include <boost/algorithm/string/join.hpp> #include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/split.hpp> #include <boost/algorithm/string/split.hpp>
@ -16,8 +18,10 @@
#include <sstream> #include <sstream>
#include <iostream> #include <iostream>
#include <cstdlib> #include <cstdlib>
#include <stdexcept>
using namespace std; using namespace std;
using fair::mq::tools::ToString;
namespace fair namespace fair
{ {
@ -154,33 +158,66 @@ auto DDS::HandleControl() -> void
auto DDS::FillChannelContainers() -> void auto DDS::FillChannelContainers() -> void
{ {
try {
unordered_map<string, int> channelInfo(GetChannelInfo()); unordered_map<string, int> channelInfo(GetChannelInfo());
for (const auto& c : channelInfo)
{ // fill binding and connecting chans
for (const auto& c : channelInfo) {
string methodKey{"chans." + c.first + "." + to_string(c.second - 1) + ".method"}; string methodKey{"chans." + c.first + "." + to_string(c.second - 1) + ".method"};
if (GetProperty<string>(methodKey) == "bind") if (GetProperty<string>(methodKey) == "bind") {
{
fBindingChans.insert(make_pair(c.first, vector<string>())); fBindingChans.insert(make_pair(c.first, vector<string>()));
for (int i = 0; i < c.second; ++i) for (int i = 0; i < c.second; ++i) {
{
fBindingChans.at(c.first).push_back(GetProperty<string>(string{"chans." + c.first + "." + to_string(i) + ".address"})); fBindingChans.at(c.first).push_back(GetProperty<string>(string{"chans." + c.first + "." + to_string(i) + ".address"}));
} }
} } else if (GetProperty<string>(methodKey) == "connect") {
else if (GetProperty<string>(methodKey) == "connect")
{
fConnectingChans.insert(make_pair(c.first, DDSConfig())); fConnectingChans.insert(make_pair(c.first, DDSConfig()));
LOG(debug) << "preparing to connect: " << c.first << " with " << c.second << " sub-channels."; LOG(debug) << "preparing to connect: " << c.first << " with " << c.second << " sub-channels.";
for (int i = 0; i < c.second; ++i) for (int i = 0; i < c.second; ++i) {
{
fConnectingChans.at(c.first).fSubChannelAddresses.push_back(string()); fConnectingChans.at(c.first).fSubChannelAddresses.push_back(string());
} }
} } else {
else
{
LOG(error) << "Cannot update address configuration. Channel method (bind/connect) not specified."; LOG(error) << "Cannot update address configuration. Channel method (bind/connect) not specified.";
return; return;
} }
} }
// save properties that will have multiple values arriving (with only some of them to be used)
vector<string> iValues = GetProperty<vector<string>>("dds-i");
vector<string> inValues = GetProperty<vector<string>>("dds-i-n");
for (const auto& vi : iValues) {
size_t pos = vi.find(":");
string chanName = vi.substr(0, pos );
// check if provided name is a valid channel name
if (fConnectingChans.find(chanName) == fConnectingChans.end()) {
throw invalid_argument(ToString("channel provided to dds-i is not an actual connecting channel of this device: ", chanName));
}
int i = stoi(vi.substr(pos + 1));
LOG(debug) << "dds-i: adding " << chanName << " -> i of " << i;
fI.insert(make_pair(chanName, i));
}
for (const auto& vi : inValues) {
size_t pos = vi.find(":");
string chanName = vi.substr(0, pos);
// check if provided name is a valid channel name
if (fConnectingChans.find(chanName) == fConnectingChans.end()) {
throw invalid_argument(ToString("channel provided to dds-i-n is not an actual connecting channel of this device: ", chanName));
}
string i_n = vi.substr(pos + 1);
pos = i_n.find("-");
int i = stoi(i_n.substr(0, pos));
int n = stoi(i_n.substr(pos + 1));
LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n;
fIofN.insert(make_pair(chanName, IofN(i, n)));
}
} catch (const exception& e) {
LOG(error) << "Error filling channel containers: " << e.what();
}
} }
auto DDS::SubscribeForConnectingChannels() -> void auto DDS::SubscribeForConnectingChannels() -> void
@ -190,25 +227,33 @@ auto DDS::SubscribeForConnectingChannels() -> void
try try
{ {
LOG(debug) << "Received update for " << propertyId << ": key=" << key << " value=" << value; LOG(debug) << "Received update for " << propertyId << ": key=" << key << " value=" << value;
vector<string> values; string val = value;
boost::algorithm::split(values, value, boost::algorithm::is_any_of(",")); // check if it is to handle as one out of multiple values
if (values.size() > 1) // multiple bound channels received auto it = fIofN.find(propertyId);
{ if (it != fIofN.end()) {
int taskIndex = GetProperty<int>("dds-i"); it->second.fEntries.push_back(value);
if (taskIndex != -1) if (it->second.fEntries.size() == it->second.fN) {
{ sort(it->second.fEntries.begin(), it->second.fEntries.end());
LOG(debug) << "adding connecting channel " << key << " : " << values.at(taskIndex); val = it->second.fEntries.at(it->second.fI);
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), values.at(taskIndex).c_str())); } else {
LOG(debug) << "received " << it->second.fEntries.size() << " values for " << propertyId << ", expecting total of " << it->second.fN;
return;
} }
else }
{
vector<string> connectionStrings;
boost::algorithm::split(connectionStrings, val, boost::algorithm::is_any_of(","));
if (connectionStrings.size() > 1) { // multiple bound channels received
auto it2 = fI.find(propertyId);
if (it2 != fI.end()) {
LOG(debug) << "adding connecting channel " << propertyId << " : " << connectionStrings.at(it2->second);
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), connectionStrings.at(it2->second).c_str()));
} else {
LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first"; LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first";
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), values.at(0).c_str())); fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), connectionStrings.at(0).c_str()));
} }
} } else { // only one bound channel received
else // only one bound channel received fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), val.c_str()));
{
fConnectingChans.at(propertyId).fDDSValues.insert(make_pair<string, string>(key.c_str(), value.c_str()));
} }
// update channels and remove them from unfinished container // update channels and remove them from unfinished container
@ -218,11 +263,11 @@ auto DDS::SubscribeForConnectingChannels() -> void
{ {
// when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS. // when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS.
sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end()); sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end());
auto it = mi->second.fDDSValues.begin(); auto it3 = mi->second.fDDSValues.begin();
for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i) for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i)
{ {
SetProperty<string>(string{"chans." + mi->first + "." + to_string(i) + ".address"}, it->second); SetProperty<string>(string{"chans." + mi->first + "." + to_string(i) + ".address"}, it3->second);
++it; ++it3;
} }
fConnectingChans.erase(mi++); fConnectingChans.erase(mi++);
} }

View File

@ -44,6 +44,20 @@ struct DDSConfig
std::unordered_map<std::string, std::string> fDDSValues; std::unordered_map<std::string, std::string> fDDSValues;
}; };
struct IofN
{
IofN(int i, int n)
: fI(i)
, fN(n)
, fEntries()
{}
int fI;
int fN;
std::vector<std::string> fEntries;
};
class DDS : public Plugin class DDS : public Plugin
{ {
public: public:
@ -69,6 +83,9 @@ class DDS : public Plugin
std::unordered_map<std::string, std::vector<std::string>> fBindingChans; std::unordered_map<std::string, std::vector<std::string>> fBindingChans;
std::unordered_map<std::string, DDSConfig> fConnectingChans; std::unordered_map<std::string, DDSConfig> fConnectingChans;
std::unordered_map<std::string, int> fI;
std::unordered_map<std::string, IofN> fIofN;
std::mutex fStopMutex; std::mutex fStopMutex;
std::condition_variable fStopCondition; std::condition_variable fStopCondition;
@ -94,7 +111,9 @@ Plugin::ProgOptions DDSProgramOptions()
{ {
boost::program_options::options_description options{"DDS Plugin"}; boost::program_options::options_description options{"DDS Plugin"};
options.add_options() options.add_options()
("dds-i", boost::program_options::value<int>()->default_value(-1), "Task index for chosing connection target (single channel n to m)."); ("dds-i", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(), "Task index for chosing connection target (single channel n to m). When all values come via same update.")
("dds-i-n", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(), "Task index for chosing connection target (one out of n values to take). When values come as independent updates.");
return options; return options;
} }