Add config plugin class.

This commit is contained in:
Alexey Rybalchenko
2019-05-29 17:08:20 +02:00
committed by Dennis Klein
parent bf8ec968e7
commit cba6d19781
109 changed files with 1393 additions and 1433 deletions

View File

@@ -1,226 +0,0 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/*
* File: FairMQParser.cxx
* Author: winckler
*
* Created on May 14, 2015, 5:01 PM
*/
#include "FairMQParser.h"
#include "FairMQLogger.h"
#include <fairmq/Tools.h>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/any.hpp>
using namespace std;
using namespace fair::mq::tools;
using namespace boost::property_tree;
namespace fair
{
namespace mq
{
namespace parser
{
fair::mq::Properties ptreeToProperties(const ptree& pt, const string& id)
{
if (id == "") {
throw ParserError("no device ID provided. Provide with `--id` cmd option");
}
return Helper::DeviceParser(pt.get_child("fairMQOptions"), id);
}
fair::mq::Properties JSON::UserParser(const string& filename, const string& deviceId)
{
ptree input;
LOG(debug) << "Parsing JSON from " << filename << " ...";
read_json(filename, input);
return ptreeToProperties(input, deviceId);
}
namespace Helper
{
fair::mq::Properties DeviceParser(const ptree& fairMQOptions, const string& deviceId)
{
fair::mq::Properties properties;
for (const auto& node : fairMQOptions) {
if (node.first == "devices") {
for (const auto& device : node.second) {
string deviceIdKey;
// check if key is provided, otherwise use id
string key = device.second.get<string>("key", "");
if (key != "") {
deviceIdKey = key;
} else {
deviceIdKey = device.second.get<string>("id");
}
// if not correct device id, do not fill MQMap
if (deviceId != deviceIdKey) {
continue;
}
LOG(trace) << "Found following channels for device ID '" << deviceId << "' :";
ChannelParser(device.second, properties);
}
}
}
return properties;
}
void ChannelParser(const ptree& tree, fair::mq::Properties& properties)
{
for (const auto& node : tree) {
if (node.first == "channels") {
for (const auto& cn : node.second) {
fair::mq::Properties commonProperties;
commonProperties.emplace("type", cn.second.get<string>("type", FairMQChannel::DefaultType));
commonProperties.emplace("method", cn.second.get<string>("method", FairMQChannel::DefaultMethod));
commonProperties.emplace("address", cn.second.get<string>("address", FairMQChannel::DefaultAddress));
commonProperties.emplace("transport", cn.second.get<string>("transport", FairMQChannel::DefaultTransportName));
commonProperties.emplace("sndBufSize", cn.second.get<int>("sndBufSize", FairMQChannel::DefaultSndBufSize));
commonProperties.emplace("rcvBufSize", cn.second.get<int>("rcvBufSize", FairMQChannel::DefaultRcvBufSize));
commonProperties.emplace("sndKernelSize", cn.second.get<int>("sndKernelSize", FairMQChannel::DefaultSndKernelSize));
commonProperties.emplace("rcvKernelSize", cn.second.get<int>("rcvKernelSize", FairMQChannel::DefaultRcvKernelSize));
commonProperties.emplace("linger", cn.second.get<int>("linger", FairMQChannel::DefaultLinger));
commonProperties.emplace("rateLogging", cn.second.get<int>("rateLogging", FairMQChannel::DefaultRateLogging));
commonProperties.emplace("portRangeMin", cn.second.get<int>("portRangeMin", FairMQChannel::DefaultPortRangeMin));
commonProperties.emplace("portRangeMax", cn.second.get<int>("portRangeMax", FairMQChannel::DefaultPortRangeMax));
commonProperties.emplace("autoBind", cn.second.get<bool>("autoBind", FairMQChannel::DefaultAutoBind));
string name = cn.second.get<string>("name");
int numSockets = cn.second.get<int>("numSockets", 0);
if (numSockets > 0) {
LOG(trace) << name << ":";
LOG(trace) << "\tnumSockets of " << numSockets << " specified, applying common settings to each:";
// TODO: make a loop out of this
LOG(trace) << "\ttype = " << boost::any_cast<string>(commonProperties.at("type"));
LOG(trace) << "\tmethod = " << boost::any_cast<string>(commonProperties.at("method"));
LOG(trace) << "\taddress = " << boost::any_cast<string>(commonProperties.at("address"));
LOG(trace) << "\ttransport = " << boost::any_cast<string>(commonProperties.at("transport"));
LOG(trace) << "\tsndBufSize = " << boost::any_cast<int>(commonProperties.at("sndBufSize"));
LOG(trace) << "\trcvBufSize = " << boost::any_cast<int>(commonProperties.at("rcvBufSize"));
LOG(trace) << "\tsndKernelSize = " << boost::any_cast<int>(commonProperties.at("sndKernelSize"));
LOG(trace) << "\trcvKernelSize = " << boost::any_cast<int>(commonProperties.at("rcvKernelSize"));
LOG(trace) << "\tlinger = " << boost::any_cast<int>(commonProperties.at("linger"));
LOG(trace) << "\trateLogging = " << boost::any_cast<int>(commonProperties.at("rateLogging"));
LOG(trace) << "\tportRangeMin = " << boost::any_cast<int>(commonProperties.at("portRangeMin"));
LOG(trace) << "\tportRangeMax = " << boost::any_cast<int>(commonProperties.at("portRangeMax"));
LOG(trace) << "\tautoBind = " << boost::any_cast<bool>(commonProperties.at("autoBind"));
for (int i = 0; i < numSockets; ++i) {
for (const auto& p : commonProperties) {
properties.emplace(ToString("chans.", name, ".", i, ".", p.first), p.second);
}
}
} else {
SubChannelParser(cn.second.get_child(""), properties, name, commonProperties);
}
}
}
}
}
void SubChannelParser(const ptree& channelTree, fair::mq::Properties& properties, const string& channelName, const fair::mq::Properties& commonProperties)
{
// for each socket in channel
int i = 0;
for (const auto& node : channelTree) {
if (node.first == "sockets") {
for (const auto& sn : node.second) {
// a sub-channel inherits relevant properties from the common channel ...
fair::mq::Properties newProperties(commonProperties);
// ... and adds/overwrites its own properties
newProperties["type"] = sn.second.get<string>("type", boost::any_cast<string>(commonProperties.at("type")));
newProperties["method"] = sn.second.get<string>("method", boost::any_cast<string>(commonProperties.at("method")));
newProperties["address"] = sn.second.get<string>("address", boost::any_cast<string>(commonProperties.at("address")));
newProperties["transport"] = sn.second.get<string>("transport", boost::any_cast<string>(commonProperties.at("transport")));
newProperties["sndBufSize"] = sn.second.get<int>("sndBufSize", boost::any_cast<int>(commonProperties.at("sndBufSize")));
newProperties["rcvBufSize"] = sn.second.get<int>("rcvBufSize", boost::any_cast<int>(commonProperties.at("rcvBufSize")));
newProperties["sndKernelSize"] = sn.second.get<int>("sndKernelSize", boost::any_cast<int>(commonProperties.at("sndKernelSize")));
newProperties["rcvKernelSize"] = sn.second.get<int>("rcvKernelSize", boost::any_cast<int>(commonProperties.at("rcvKernelSize")));
newProperties["linger"] = sn.second.get<int>("linger", boost::any_cast<int>(commonProperties.at("linger")));
newProperties["rateLogging"] = sn.second.get<int>("rateLogging", boost::any_cast<int>(commonProperties.at("rateLogging")));
newProperties["portRangeMin"] = sn.second.get<int>("portRangeMin", boost::any_cast<int>(commonProperties.at("portRangeMin")));
newProperties["portRangeMax"] = sn.second.get<int>("portRangeMax", boost::any_cast<int>(commonProperties.at("portRangeMax")));
newProperties["autoBind"] = sn.second.get<bool>("autoBind", boost::any_cast<bool>(commonProperties.at("autoBind")));
LOG(trace) << "" << channelName << "[" << i << "]:";
// TODO: make a loop out of this
LOG(trace) << "\ttype = " << boost::any_cast<string>(newProperties.at("type"));
LOG(trace) << "\tmethod = " << boost::any_cast<string>(newProperties.at("method"));
LOG(trace) << "\taddress = " << boost::any_cast<string>(newProperties.at("address"));
LOG(trace) << "\ttransport = " << boost::any_cast<string>(newProperties.at("transport"));
LOG(trace) << "\tsndBufSize = " << boost::any_cast<int>(newProperties.at("sndBufSize"));
LOG(trace) << "\trcvBufSize = " << boost::any_cast<int>(newProperties.at("rcvBufSize"));
LOG(trace) << "\tsndKernelSize = " << boost::any_cast<int>(newProperties.at("sndKernelSize"));
LOG(trace) << "\trcvKernelSize = " << boost::any_cast<int>(newProperties.at("rcvKernelSize"));
LOG(trace) << "\tlinger = " << boost::any_cast<int>(newProperties.at("linger"));
LOG(trace) << "\trateLogging = " << boost::any_cast<int>(newProperties.at("rateLogging"));
LOG(trace) << "\tportRangeMin = " << boost::any_cast<int>(newProperties.at("portRangeMin"));
LOG(trace) << "\tportRangeMax = " << boost::any_cast<int>(newProperties.at("portRangeMax"));
LOG(trace) << "\tautoBind = " << boost::any_cast<bool>(newProperties.at("autoBind"));
for (const auto& p : newProperties) {
properties.emplace(ToString("chans.", channelName, ".", i, ".", p.first), p.second);
}
++i;
}
}
} // end socket loop
if (i > 0) {
LOG(trace) << "Found " << i << " socket(s) in channel.";
} else {
LOG(trace) << "" << channelName << ":";
LOG(trace) << "\tNo sockets specified,";
LOG(trace) << "\tapplying common settings to the channel:";
fair::mq::Properties newProperties(commonProperties);
// TODO: make a loop out of this
LOG(trace) << "\ttype = " << boost::any_cast<string>(newProperties.at("type"));
LOG(trace) << "\tmethod = " << boost::any_cast<string>(newProperties.at("method"));
LOG(trace) << "\taddress = " << boost::any_cast<string>(newProperties.at("address"));
LOG(trace) << "\ttransport = " << boost::any_cast<string>(newProperties.at("transport"));
LOG(trace) << "\tsndBufSize = " << boost::any_cast<int>(newProperties.at("sndBufSize"));
LOG(trace) << "\trcvBufSize = " << boost::any_cast<int>(newProperties.at("rcvBufSize"));
LOG(trace) << "\tsndKernelSize = " << boost::any_cast<int>(newProperties.at("sndKernelSize"));
LOG(trace) << "\trcvKernelSize = " << boost::any_cast<int>(newProperties.at("rcvKernelSize"));
LOG(trace) << "\tlinger = " << boost::any_cast<int>(newProperties.at("linger"));
LOG(trace) << "\trateLogging = " << boost::any_cast<int>(newProperties.at("rateLogging"));
LOG(trace) << "\tportRangeMin = " << boost::any_cast<int>(newProperties.at("portRangeMin"));
LOG(trace) << "\tportRangeMax = " << boost::any_cast<int>(newProperties.at("portRangeMax"));
LOG(trace) << "\tautoBind = " << boost::any_cast<bool>(newProperties.at("autoBind"));
for (const auto& p : newProperties) {
properties.emplace(ToString("chans.", channelName, ".0.", p.first), p.second);
}
}
}
} // Helper namespace
} // namespace parser
} // namespace mq
} // namespace fair

View File

@@ -1,57 +0,0 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/*
* File: FairMQParser.h
* Author: winckler
*
* Created on May 14, 2015, 5:01 PM
*/
#ifndef FAIRMQPARSER_H
#define FAIRMQPARSER_H
#include <string>
#include <vector>
#include <unordered_map>
#include <exception>
#include <boost/property_tree/ptree_fwd.hpp>
#include "FairMQChannel.h"
#include "Properties.h"
namespace fair
{
namespace mq
{
namespace parser
{
struct ParserError : std::runtime_error { using std::runtime_error::runtime_error; };
fair::mq::Properties ptreeToProperties(const boost::property_tree::ptree& pt, const std::string& deviceId);
struct JSON
{
fair::mq::Properties UserParser(const std::string& filename, const std::string& deviceId);
};
namespace Helper
{
fair::mq::Properties DeviceParser(const boost::property_tree::ptree& tree, const std::string& deviceId);
void ChannelParser(const boost::property_tree::ptree& tree, fair::mq::Properties& properties);
void SubChannelParser(const boost::property_tree::ptree& tree, fair::mq::Properties& properties, const std::string& channelName, const fair::mq::Properties& commonProperties);
} // Helper namespace
} // namespace parser
} // namespace mq
} // namespace fair
#endif /* FAIRMQPARSER_H */

View File

@@ -1,596 +0,0 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/*
* File: FairMQProgOptions.cxx
* Author: winckler
*
* Created on March 11, 2015, 10:20 PM
*/
#include "FairMQLogger.h"
#include "FairMQProgOptions.h"
#include "FairMQParser.h"
#include "FairMQSuboptParser.h"
#include "tools/Unique.h"
#include <boost/filesystem.hpp>
#include <boost/any.hpp>
#include <boost/algorithm/string.hpp> // join/split
#include <boost/regex.hpp>
#include <algorithm>
#include <iomanip>
#include <iostream>
#include <exception>
using namespace std;
using namespace fair::mq;
using boost::any_cast;
namespace po = boost::program_options;
struct ValInfo
{
string value;
string type;
string origin;
};
template<class T>
ostream& operator<<(ostream& os, const vector<T>& v)
{
for (unsigned int i = 0; i < v.size(); ++i) {
os << v[i];
if (i != v.size() - 1) {
os << ", ";
}
}
return os;
}
template<typename T>
pair<string, string> getString(const boost::any& v, const string& label)
{
return { to_string(any_cast<T>(v)), label };
}
template<typename T>
pair<string, string> getStringPair(const boost::any& v, const string& label)
{
stringstream ss;
ss << any_cast<T>(v);
return { ss.str(), label };
}
unordered_map<type_index, function<pair<string, string>(const Property&)>> FairMQProgOptions::fTypeInfos = {
{ type_index(typeid(char)), [](const Property& p) { return pair<string, string>{ string(1, any_cast<char>(p)), "char" }; } },
{ type_index(typeid(unsigned char)), [](const Property& p) { return pair<string, string>{ string(1, any_cast<unsigned char>(p)), "unsigned char" }; } },
{ type_index(typeid(string)), [](const Property& p) { return pair<string, string>{ any_cast<string>(p), "string" }; } },
{ type_index(typeid(int)), [](const Property& p) { return getString<int>(p, "int"); } },
{ type_index(typeid(size_t)), [](const Property& p) { return getString<size_t>(p, "size_t"); } },
{ type_index(typeid(uint32_t)), [](const Property& p) { return getString<uint32_t>(p, "uint32_t"); } },
{ type_index(typeid(uint64_t)), [](const Property& p) { return getString<uint64_t>(p, "uint64_t"); } },
{ type_index(typeid(long)), [](const Property& p) { return getString<long>(p, "long"); } },
{ type_index(typeid(long long)), [](const Property& p) { return getString<long long>(p, "long long"); } },
{ type_index(typeid(unsigned)), [](const Property& p) { return getString<unsigned>(p, "unsigned"); } },
{ type_index(typeid(unsigned long)), [](const Property& p) { return getString<unsigned long>(p, "unsigned long"); } },
{ type_index(typeid(unsigned long long)), [](const Property& p) { return getString<unsigned long long>(p, "unsigned long long"); } },
{ type_index(typeid(float)), [](const Property& p) { return getString<float>(p, "float"); } },
{ type_index(typeid(double)), [](const Property& p) { return getString<double>(p, "double"); } },
{ type_index(typeid(long double)), [](const Property& p) { return getString<long double>(p, "long double"); } },
{ type_index(typeid(bool)), [](const Property& p) { stringstream ss; ss << boolalpha << any_cast<bool>(p); return pair<string, string>{ ss.str(), "bool" }; } },
{ type_index(typeid(vector<bool>)), [](const Property& p) { stringstream ss; ss << boolalpha << any_cast<vector<bool>>(p); return pair<string, string>{ ss.str(), "vector<bool>>" }; } },
{ type_index(typeid(boost::filesystem::path)), [](const Property& p) { return getStringPair<boost::filesystem::path>(p, "boost::filesystem::path"); } },
{ type_index(typeid(vector<char>)), [](const Property& p) { return getStringPair<vector<char>>(p, "vector<char>"); } },
{ type_index(typeid(vector<unsigned char>)), [](const Property& p) { return getStringPair<vector<unsigned char>>(p, "vector<unsigned char>"); } },
{ type_index(typeid(vector<string>)), [](const Property& p) { return getStringPair<vector<string>>(p, "vector<string>"); } },
{ type_index(typeid(vector<int>)), [](const Property& p) { return getStringPair<vector<int>>(p, "vector<int>"); } },
{ type_index(typeid(vector<size_t>)), [](const Property& p) { return getStringPair<vector<size_t>>(p, "vector<size_t>"); } },
{ type_index(typeid(vector<uint32_t>)), [](const Property& p) { return getStringPair<vector<uint32_t>>(p, "vector<uint32_t>"); } },
{ type_index(typeid(vector<uint64_t>)), [](const Property& p) { return getStringPair<vector<uint64_t>>(p, "vector<uint64_t>"); } },
{ type_index(typeid(vector<long>)), [](const Property& p) { return getStringPair<vector<long>>(p, "vector<long>"); } },
{ type_index(typeid(vector<long long>)), [](const Property& p) { return getStringPair<vector<long long>>(p, "vector<long long>"); } },
{ type_index(typeid(vector<unsigned>)), [](const Property& p) { return getStringPair<vector<unsigned>>(p, "vector<unsigned>"); } },
{ type_index(typeid(vector<unsigned long>)), [](const Property& p) { return getStringPair<vector<unsigned long>>(p, "vector<unsigned long>"); } },
{ type_index(typeid(vector<unsigned long long>)), [](const Property& p) { return getStringPair<vector<unsigned long long>>(p, "vector<unsigned long long>"); } },
{ type_index(typeid(vector<float>)), [](const Property& p) { return getStringPair<vector<float>>(p, "vector<float>"); } },
{ type_index(typeid(vector<double>)), [](const Property& p) { return getStringPair<vector<double>>(p, "vector<double>"); } },
{ type_index(typeid(vector<long double>)), [](const Property& p) { return getStringPair<vector<long double>>(p, "vector<long double>"); } },
{ type_index(typeid(vector<boost::filesystem::path>)), [](const Property& p) { return getStringPair<vector<boost::filesystem::path>>(p, "vector<boost::filesystem::path>"); } },
};
unordered_map<type_index, void(*)(const EventManager&, const string&, const Property&)> FairMQProgOptions::fEventEmitters = {
{ type_index(typeid(char)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, char>(k, any_cast<char>(p)); } },
{ type_index(typeid(unsigned char)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, unsigned char>(k, any_cast<unsigned char>(p)); } },
{ type_index(typeid(string)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, string>(k, any_cast<string>(p)); } },
{ type_index(typeid(int)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, int>(k, any_cast<int>(p)); } },
{ type_index(typeid(size_t)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, size_t>(k, any_cast<size_t>(p)); } },
{ type_index(typeid(uint32_t)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, uint32_t>(k, any_cast<uint32_t>(p)); } },
{ type_index(typeid(uint64_t)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, uint64_t>(k, any_cast<uint64_t>(p)); } },
{ type_index(typeid(long)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, long>(k, any_cast<long>(p)); } },
{ type_index(typeid(long long)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, long long>(k, any_cast<long long>(p)); } },
{ type_index(typeid(unsigned)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, unsigned>(k, any_cast<unsigned>(p)); } },
{ type_index(typeid(unsigned long)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, unsigned long>(k, any_cast<unsigned long>(p)); } },
{ type_index(typeid(unsigned long long)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, unsigned long long>(k, any_cast<unsigned long long>(p)); } },
{ type_index(typeid(float)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, float>(k, any_cast<float>(p)); } },
{ type_index(typeid(double)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, double>(k, any_cast<double>(p)); } },
{ type_index(typeid(long double)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, long double>(k, any_cast<long double>(p)); } },
{ type_index(typeid(bool)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, bool>(k, any_cast<bool>(p)); } },
{ type_index(typeid(vector<bool>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<bool>>(k, any_cast<vector<bool>>(p)); } },
{ type_index(typeid(boost::filesystem::path)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, boost::filesystem::path>(k, any_cast<boost::filesystem::path>(p)); } },
{ type_index(typeid(vector<char>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<char>>(k, any_cast<vector<char>>(p)); } },
{ type_index(typeid(vector<unsigned char>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<unsigned char>>(k, any_cast<vector<unsigned char>>(p)); } },
{ type_index(typeid(vector<string>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<string>>(k, any_cast<vector<string>>(p)); } },
{ type_index(typeid(vector<int>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<int>>(k, any_cast<vector<int>>(p)); } },
{ type_index(typeid(vector<size_t>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<size_t>>(k, any_cast<vector<size_t>>(p)); } },
{ type_index(typeid(vector<uint32_t>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<uint32_t>>(k, any_cast<vector<uint32_t>>(p)); } },
{ type_index(typeid(vector<uint64_t>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<uint64_t>>(k, any_cast<vector<uint64_t>>(p)); } },
{ type_index(typeid(vector<long>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<long>>(k, any_cast<vector<long>>(p)); } },
{ type_index(typeid(vector<long long>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<long long>>(k, any_cast<vector<long long>>(p)); } },
{ type_index(typeid(vector<unsigned>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<unsigned>>(k, any_cast<vector<unsigned>>(p)); } },
{ type_index(typeid(vector<unsigned long>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<unsigned long>>(k, any_cast<vector<unsigned long>>(p)); } },
{ type_index(typeid(vector<unsigned long long>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<unsigned long long>>(k, any_cast<vector<unsigned long long>>(p)); } },
{ type_index(typeid(vector<float>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<float>>(k, any_cast<vector<float>>(p)); } },
{ type_index(typeid(vector<double>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<double>>(k, any_cast<vector<double>>(p)); } },
{ type_index(typeid(vector<long double>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<long double>>(k, any_cast<vector<long double>>(p)); } },
{ type_index(typeid(vector<boost::filesystem::path>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<boost::filesystem::path>>(k, any_cast<vector<boost::filesystem::path>>(p)); } },
};
namespace fair
{
namespace mq
{
string ConvertPropertyToString(const Property& p)
{
pair<string, string> info = FairMQProgOptions::fTypeInfos.at(p.type())(p);
return info.first;
}
ValInfo ConvertVarValToValInfo(const po::variable_value& v)
{
string origin;
if (v.defaulted()) {
origin = "[default]";
} else if (v.empty()) {
origin = "[empty]";
} else {
origin = "[provided]";
}
try {
pair<string, string> info = FairMQProgOptions::fTypeInfos.at(v.value().type())(v.value());
return {info.first, info.second, origin};
} catch (out_of_range& oor) {
return {string("[unidentified_type]"), string("[unidentified_type]"), origin};
}
};
string ConvertVarValToString(const po::variable_value& v)
{
return ConvertVarValToValInfo(v).value;
}
} // namespace mq
} // namespace fair
FairMQProgOptions::FairMQProgOptions()
: fVarMap()
, fAllOptions("FairMQ Command Line Options")
, fGeneralOptions("General options")
, fMQOptions("FairMQ device options")
, fParserOptions("FairMQ channel config parser options")
, fMtx()
, fUnregisteredOptions()
, fEvents()
{
fGeneralOptions.add_options()
("help,h", "Print help")
("version,v", "Print version")
("severity", po::value<string>()->default_value("debug"), "Log severity level: trace, debug, info, state, warn, error, fatal, nolog")
("verbosity", po::value<string>()->default_value("medium"), "Log verbosity level: veryhigh, high, medium, low")
("color", po::value<bool >()->default_value(true), "Log color (true/false)")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("print-options", po::value<bool >()->implicit_value(true), "Print options in machine-readable format (<option>:<computed-value>:<type>:<description>)");
fMQOptions.add_options()
("id", po::value<string >(), "Device ID (required argument).")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string >()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg'/'shmem').")
("network-interface", po::value<string >()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string >(), "Use provided value instead of device id for fetching the configuration from the config file.")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("max-run-time", po::value<uint64_t>()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t >()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string >()->default_value("default"), "Session name.");
fParserOptions.add_options()
("mq-config", po::value<string>(), "JSON input as file.")
("channel-config", po::value<vector<string>>()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list");
fAllOptions.add(fGeneralOptions);
fAllOptions.add(fMQOptions);
fAllOptions.add(fParserOptions);
ParseDefaults();
}
unordered_map<string, int> FairMQProgOptions::GetChannelInfo() const
{
lock_guard<mutex> lock(fMtx);
return GetChannelInfoImpl();
}
unordered_map<string, int> FairMQProgOptions::GetChannelInfoImpl() const
{
unordered_map<string, int> info;
boost::regex re("chans\\..*\\.type");
for (const auto& m : fVarMap) {
if (boost::regex_match(m.first, re)) {
string chan = m.first.substr(6);
string::size_type n = chan.find(".");
string chanName = chan.substr(0, n);
if (info.find(chanName) == info.end()) {
info.emplace(chanName, 1);
} else {
info[chanName] = info[chanName] + 1;
}
}
}
return info;
}
Properties FairMQProgOptions::GetProperties(const string& q) const
{
boost::regex re(q);
Properties result;
lock_guard<mutex> lock(fMtx);
for (const auto& m : fVarMap) {
if (boost::regex_match(m.first, re)) {
result.emplace(m.first, m.second.value());
}
}
if (result.size() == 0) {
LOG(warn) << "could not find anything with \"" << q << "\"";
}
return result;
}
map<string, string> FairMQProgOptions::GetPropertiesAsString(const string& q) const
{
boost::regex re(q);
map<string, string> result;
lock_guard<mutex> lock(fMtx);
for (const auto& m : fVarMap) {
if (boost::regex_match(m.first, re)) {
result.emplace(m.first, ConvertPropertyToString(m.second.value()));
}
}
if (result.size() == 0) {
LOG(warn) << "could not find anything with \"" << q << "\"";
}
return result;
}
Properties FairMQProgOptions::GetPropertiesStartingWith(const string& q) const
{
Properties result;
lock_guard<mutex> lock(fMtx);
for (const auto& m : fVarMap) {
if (m.first.compare(0, q.length(), q) == 0) {
result.emplace(m.first, m.second.value());
}
}
return result;
}
void FairMQProgOptions::SetProperties(const Properties& input)
{
unique_lock<mutex> lock(fMtx);
map<string, boost::program_options::variable_value>& vm = fVarMap;
for (const auto& m : input) {
vm[m.first].value() = m.second;
}
lock.unlock();
for (const auto& m : input) {
fEventEmitters.at(m.second.type())(fEvents, m.first, m.second);
fEvents.Emit<PropertyChangeAsString, string>(m.first, ConvertPropertyToString(m.second));
}
}
void FairMQProgOptions::AddChannel(const std::string& name, const FairMQChannel& channel)
{
lock_guard<mutex> lock(fMtx);
unordered_map<string, int> existingChannels = GetChannelInfoImpl();
int index = 0;
if (existingChannels.count(name) > 0) {
index = existingChannels.at(name);
}
string prefix = fair::mq::tools::ToString("chans.", name, ".", index, ".");
SetVarMapValue<string>(string(prefix + "type"), channel.GetType());
SetVarMapValue<string>(string(prefix + "method"), channel.GetMethod());
SetVarMapValue<string>(string(prefix + "address"), channel.GetAddress());
SetVarMapValue<string>(string(prefix + "transport"), channel.GetTransportName());
SetVarMapValue<int>(string(prefix + "sndBufSize"), channel.GetSndBufSize());
SetVarMapValue<int>(string(prefix + "rcvBufSize"), channel.GetRcvBufSize());
SetVarMapValue<int>(string(prefix + "sndKernelSize"), channel.GetSndKernelSize());
SetVarMapValue<int>(string(prefix + "rcvKernelSize"), channel.GetRcvKernelSize());
SetVarMapValue<int>(string(prefix + "linger"), channel.GetLinger());
SetVarMapValue<int>(string(prefix + "rateLogging"), channel.GetRateLogging());
SetVarMapValue<int>(string(prefix + "portRangeMin"), channel.GetPortRangeMin());
SetVarMapValue<int>(string(prefix + "portRangeMax"), channel.GetPortRangeMax());
SetVarMapValue<bool>(string(prefix + "autoBind"), channel.GetAutoBind());
}
void FairMQProgOptions::DeleteProperty(const string& key)
{
lock_guard<mutex> lock(fMtx);
map<string, boost::program_options::variable_value>& vm = fVarMap;
vm.erase(key);
}
int FairMQProgOptions::ParseAll(const vector<string>& cmdArgs, bool allowUnregistered)
{
vector<const char*> argv(cmdArgs.size());
transform(cmdArgs.begin(), cmdArgs.end(), argv.begin(), [](const string& str) {
return str.c_str();
});
return ParseAll(argv.size(), const_cast<char**>(argv.data()), allowUnregistered);
}
int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool allowUnregistered)
{
ParseCmdLine(argc, argv, allowUnregistered);
// if this option is provided, handle them and return stop value
if (fVarMap.count("help")) {
cout << fAllOptions << endl;
return 1;
}
// if this option is provided, handle them and return stop value
if (fVarMap.count("print-options")) {
PrintOptionsRaw();
return 1;
}
// if these options are provided, do no further checks and let the device handle them
if (fVarMap.count("print-channels") || fVarMap.count("version")) {
fair::Logger::SetConsoleSeverity("nolog");
return 0;
}
string severity = GetValue<string>("severity");
string logFile = GetValue<string>("log-to-file");
bool color = GetValue<bool>("color");
string verbosity = GetValue<string>("verbosity");
fair::Logger::SetVerbosity(verbosity);
if (logFile != "") {
fair::Logger::InitFileSink(severity, logFile);
fair::Logger::SetConsoleSeverity("nolog");
} else {
fair::Logger::SetConsoleColor(color);
fair::Logger::SetConsoleSeverity(severity);
}
string idForParser;
// check if config-key for config parser is provided
if (fVarMap.count("config-key")) {
idForParser = fVarMap["config-key"].as<string>();
} else if (fVarMap.count("id")) {
idForParser = fVarMap["id"].as<string>();
}
// check if any config parser is selected
try {
if (fVarMap.count("mq-config")) {
LOG(debug) << "mq-config: Using default JSON parser";
SetProperties(parser::JSON().UserParser(fVarMap.at("mq-config").as<string>(), idForParser));
} else if (fVarMap.count("channel-config")) {
LOG(debug) << "channel-config: Parsing channel configuration";
ParseChannelsFromCmdLine();
} else {
LOG(warn) << "FairMQProgOptions: no channels configuration provided via neither of:";
for (const auto& p : fParserOptions.options()) {
LOG(warn) << "--" << p->canonical_display_name();
}
}
} catch (exception& e) {
LOG(error) << e.what();
return 1;
}
PrintOptions();
return 0;
}
void FairMQProgOptions::ParseChannelsFromCmdLine()
{
string idForParser;
// check if config-key for config parser is provided
if (fVarMap.count("config-key")) {
idForParser = fVarMap["config-key"].as<string>();
} else if (fVarMap.count("id")) {
idForParser = fVarMap["id"].as<string>();
}
SetProperties(parser::SUBOPT().UserParser(fVarMap.at("channel-config").as<vector<string>>(), idForParser));
}
void FairMQProgOptions::ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered)
{
// clear the container because it was filled with default values and subsequent calls to store() do not overwrite the existing values
fVarMap.clear();
if (allowUnregistered) {
po::command_line_parser parser{argc, argv};
parser.options(fAllOptions).allow_unregistered();
po::parsed_options parsed = parser.run();
fUnregisteredOptions = po::collect_unrecognized(parsed.options, po::include_positional);
po::store(parsed, fVarMap);
} else {
po::store(po::parse_command_line(argc, argv, fAllOptions), fVarMap);
}
po::notify(fVarMap);
}
void FairMQProgOptions::ParseDefaults()
{
vector<string> emptyArgs = {"dummy", "--id", tools::Uuid()};
vector<const char*> argv(emptyArgs.size());
transform(emptyArgs.begin(), emptyArgs.end(), argv.begin(), [](const string& str) {
return str.c_str();
});
po::store(po::parse_command_line(argv.size(), const_cast<char**>(argv.data()), fAllOptions), fVarMap);
}
vector<string> FairMQProgOptions::GetPropertyKeys() const
{
lock_guard<mutex> lock(fMtx);
vector<string> result;
for (const auto& it : fVarMap) {
result.push_back(it.first.c_str());
}
return result;
}
/// Add option descriptions
int FairMQProgOptions::AddToCmdLineOptions(const po::options_description optDesc, bool /* visible */)
{
fAllOptions.add(optDesc);
return 0;
}
po::options_description& FairMQProgOptions::GetCmdLineOptions()
{
return fAllOptions;
}
int FairMQProgOptions::PrintOptions()
{
map<string, ValInfo> mapinfo;
// get string length for formatting and convert varmap values into string
int maxLenKey = 0;
int maxLenValue = 0;
int maxLenType = 0;
int maxLenDefault = 0;
for (const auto& m : fVarMap) {
maxLenKey = max(maxLenKey, static_cast<int>(m.first.length()));
ValInfo valinfo = ConvertVarValToValInfo(m.second);
mapinfo[m.first] = valinfo;
maxLenValue = max(maxLenValue, static_cast<int>(valinfo.value.length()));
maxLenType = max(maxLenType, static_cast<int>(valinfo.type.length()));
maxLenDefault = max(maxLenDefault, static_cast<int>(valinfo.origin.length()));
}
if (maxLenValue > 100) {
maxLenValue = 100;
}
for (const auto& o : fUnregisteredOptions) {
LOG(debug) << "detected unregistered option: " << o;
}
stringstream ss;
ss << "Configuration: \n";
for (const auto& p : mapinfo) {
string type("<" + p.second.type + ">");
ss << setfill(' ') << left
<< setw(maxLenKey) << p.first << " = "
<< setw(maxLenValue) << p.second.value << " "
<< setw(maxLenType + 2) << type << " "
<< setw(maxLenDefault) << p.second.origin
<< "\n";
}
LOG(debug) << ss.str();
return 0;
}
int FairMQProgOptions::PrintOptionsRaw()
{
const vector<boost::shared_ptr<po::option_description>>& options = fAllOptions.options();
for (const auto& o : options) {
ValInfo value;
if (fVarMap.count(o->canonical_display_name())) {
value = ConvertVarValToValInfo(fVarMap[o->canonical_display_name()]);
}
string description = o->description();
replace(description.begin(), description.end(), '\n', ' ');
cout << o->long_name() << ":" << value.value << ":" << (value.type == "" ? "<unknown>" : value.type) << ":" << description << endl;
}
return 0;
}
string FairMQProgOptions::GetPropertyAsString(const string& key) const
{
lock_guard<mutex> lock(fMtx);
if (fVarMap.count(key)) {
return ConvertVarValToString(fVarMap.at(key));
}
throw PropertyNotFoundException(fair::mq::tools::ToString("Config has no key: ", key));
}
int FairMQProgOptions::Count(const string& key) const
{
lock_guard<mutex> lock(fMtx);
return fVarMap.count(key);
}

View File

@@ -9,203 +9,6 @@
#ifndef FAIRMQPROGOPTIONS_H
#define FAIRMQPROGOPTIONS_H
#include <fairmq/EventManager.h>
#include "FairMQLogger.h"
#include "FairMQChannel.h"
#include "Properties.h"
#include <fairmq/Tools.h>
#include <boost/program_options.hpp>
#include <boost/core/demangle.hpp>
#include <unordered_map>
#include <map>
#include <functional>
#include <string>
#include <vector>
#include <mutex>
#include <sstream>
#include <typeindex>
#include <typeinfo>
#include <utility> // pair
#include <stdexcept>
namespace fair
{
namespace mq
{
struct PropertyChange : Event<std::string> {};
struct PropertyChangeAsString : Event<std::string> {};
} /* namespace mq */
} /* namespace fair */
class FairMQProgOptions
{
public:
FairMQProgOptions();
virtual ~FairMQProgOptions() {}
struct PropertyNotFoundException : std::runtime_error { using std::runtime_error::runtime_error; };
int ParseAll(const std::vector<std::string>& cmdArgs, bool allowUnregistered);
int ParseAll(const int argc, char const* const* argv, bool allowUnregistered = true);
std::unordered_map<std::string, int> GetChannelInfo() const;
std::vector<std::string> GetPropertyKeys() const;
template<typename T>
T GetProperty(const std::string& key) const
{
std::lock_guard<std::mutex> lock(fMtx);
if (fVarMap.count(key)) {
return fVarMap[key].as<T>();
}
throw PropertyNotFoundException(fair::mq::tools::ToString("Config has no key: ", key));
}
template<typename T>
T GetProperty(const std::string& key, const T& ifNotFound) const
{
std::lock_guard<std::mutex> lock(fMtx);
if (fVarMap.count(key)) {
return fVarMap[key].as<T>();
}
return ifNotFound;
}
fair::mq::Properties GetProperties(const std::string& q) const;
std::map<std::string, std::string> GetPropertiesAsString(const std::string& q) const;
fair::mq::Properties GetPropertiesStartingWith(const std::string& q) const;
template<typename T>
T GetValue(const std::string& key) const // TODO: deprecate this
{
return GetProperty<T>(key);
}
std::string GetPropertyAsString(const std::string& key) const;
std::string GetStringValue(const std::string& key) const // TODO: deprecate this
{
return GetPropertyAsString(key);
}
template<typename T>
void SetProperty(const std::string& key, T val)
{
std::unique_lock<std::mutex> lock(fMtx);
SetVarMapValue<typename std::decay<T>::type>(key, val);
if (key == "channel-config") {
ParseChannelsFromCmdLine();
}
lock.unlock();
fEvents.Emit<fair::mq::PropertyChange, typename std::decay<T>::type>(key, val);
fEvents.Emit<fair::mq::PropertyChangeAsString, std::string>(key, GetStringValue(key));
}
template<typename T>
int SetValue(const std::string& key, T val) // TODO: deprecate this
{
SetProperty(key, val);
return 0;
}
void SetProperties(const fair::mq::Properties& input);
void DeleteProperty(const std::string& key);
template<typename T>
void Subscribe(const std::string& subscriber, std::function<void(typename fair::mq::PropertyChange::KeyType, T)> func) const
{
std::lock_guard<std::mutex> lock(fMtx);
static_assert(!std::is_same<T,const char*>::value || !std::is_same<T, char*>::value,
"In template member FairMQProgOptions::Subscribe<T>(key,Lambda) the types const char* or char* for the calback signatures are not supported.");
fEvents.Subscribe<fair::mq::PropertyChange, T>(subscriber, func);
}
template<typename T>
void Unsubscribe(const std::string& subscriber) const
{
std::lock_guard<std::mutex> lock(fMtx);
fEvents.Unsubscribe<fair::mq::PropertyChange, T>(subscriber);
}
void SubscribeAsString(const std::string& subscriber, std::function<void(typename fair::mq::PropertyChange::KeyType, std::string)> func) const
{
std::lock_guard<std::mutex> lock(fMtx);
fEvents.Subscribe<fair::mq::PropertyChangeAsString, std::string>(subscriber, func);
}
void UnsubscribeAsString(const std::string& subscriber) const
{
std::lock_guard<std::mutex> lock(fMtx);
fEvents.Unsubscribe<fair::mq::PropertyChangeAsString, std::string>(subscriber);
}
int Count(const std::string& key) const;
// add options_description
int AddToCmdLineOptions(const boost::program_options::options_description optDesc, bool visible = true);
boost::program_options::options_description& GetCmdLineOptions();
int PrintOptions();
int PrintOptionsRaw();
void AddChannel(const std::string& name, const FairMQChannel& channel);
template<typename T>
static void AddType(std::string label = "")
{
if (label == "") {
label = boost::core::demangle(typeid(T).name());
}
fTypeInfos[std::type_index(typeid(T))] = [label](const fair::mq::Property& p) {
std::stringstream ss;
ss << boost::any_cast<T>(p);
return std::pair<std::string, std::string>{ss.str(), label};
};
}
static std::unordered_map<std::type_index, std::function<std::pair<std::string, std::string>(const fair::mq::Property&)>> fTypeInfos;
static std::unordered_map<std::type_index, void(*)(const fair::mq::EventManager&, const std::string&, const fair::mq::Property&)> fEventEmitters;
private:
boost::program_options::variables_map fVarMap; ///< options container
boost::program_options::options_description fAllOptions; ///< all options descriptions
boost::program_options::options_description fGeneralOptions; ///< general options descriptions
boost::program_options::options_description fMQOptions; ///< MQ options descriptions
boost::program_options::options_description fParserOptions; ///< MQ Parser options descriptions
mutable std::mutex fMtx;
std::vector<std::string> fUnregisteredOptions; ///< container with unregistered options
mutable fair::mq::EventManager fEvents;
void ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered = true);
void ParseDefaults();
std::unordered_map<std::string, int> GetChannelInfoImpl() const;
// modify the value of variable map after calling boost::program_options::store
template<typename T>
void SetVarMapValue(const std::string& key, const T& val)
{
std::map<std::string, boost::program_options::variable_value>& vm = fVarMap;
vm[key].value() = boost::any(val);
}
void ParseChannelsFromCmdLine();
};
#include <fairmq/ProgOptions.h>
#endif /* FAIRMQPROGOPTIONS_H */

View File

@@ -1,86 +0,0 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public License (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/// @file FairMQSuboptParser.cxx
/// @author Matthias.Richter@scieq.net
/// @since 2017-03-30
/// @brief Parser implementation for key-value subopt format
#include "FairMQSuboptParser.h"
#include <boost/property_tree/ptree.hpp>
#include <cstring>
#include <utility> // make_pair
using boost::property_tree::ptree;
using namespace std;
namespace fair
{
namespace mq
{
namespace parser
{
constexpr const char* SUBOPT::channelOptionKeys[];
fair::mq::Properties SUBOPT::UserParser(const vector<string>& channelConfig, const string& deviceId)
{
ptree pt;
ptree devicesArray;
ptree deviceProperties;
ptree channelsArray;
for (auto token : channelConfig) {
string channelName;
ptree channelProperties;
ptree socketsArray;
string argString(token);
char* subopts = &argString[0];
char* value = nullptr;
while (subopts && *subopts != 0 && *subopts != ' ') {
int subopt = getsubopt(&subopts, (char**)channelOptionKeys, &value);
if (subopt == NAME) {
channelName = value;
channelProperties.put("name", channelName);
} else if (subopt == ADDRESS) {
ptree socketProperties;
socketProperties.put(channelOptionKeys[subopt], value);
socketsArray.push_back(make_pair("", socketProperties));
} else if (subopt >= 0 && value != nullptr) {
channelProperties.put(channelOptionKeys[subopt], value);
}
}
if (channelName != "") {
channelProperties.add_child("sockets", socketsArray);
} else {
// TODO: what is the error policy here, should we abort?
LOG(error) << "missing channel name in argument of option --channel-config";
}
channelsArray.push_back(make_pair("", channelProperties));
}
deviceProperties.put("id", deviceId);
deviceProperties.add_child("channels", channelsArray);
devicesArray.push_back(make_pair("", deviceProperties));
pt.add_child("fairMQOptions.devices", devicesArray);
return ptreeToProperties(pt, deviceId);
}
}
}
}

View File

@@ -1,94 +0,0 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public License (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/// @file FairMQSuboptParser.h
/// @author Matthias.Richter@scieq.net
/// @since 2017-03-30
/// @brief Parser implementation for key-value subopt format
#ifndef FAIRMQPARSER_SUBOPT_H
#define FAIRMQPARSER_SUBOPT_H
#include "FairMQParser.h" // for FairMQProperties
#include <boost/program_options.hpp>
#include <cstring>
#include <vector>
#include <string>
namespace fair
{
namespace mq
{
namespace parser
{
/**
* A parser implementation for FairMQ channel properties.
* The parser handles a comma separated key=value list format by using the
* getsubopt function of the standard library.
*
* The option key '--channel-config' can be used with the list of key/value
* pairs like e.g.
* <pre>
* --channel-config name=output,type=push,method=bind
* </pre>
*
* The FairMQ option parser defines a 'UserParser' function for different
* formats. Currently it is strictly parsing channel options, but in general
* the concept is extensible by renaming UserParser to ChannelPropertyParser
* and introducing additional parser functions.
*/
struct SUBOPT
{
enum channelOptionKeyIds
{
NAME = 0, // name of the channel
TYPE, // push, pull, publish, subscribe, etc
METHOD, // bind or connect
ADDRESS, // host, protocol and port address
TRANSPORT, //
SNDBUFSIZE, // size of the send queue
RCVBUFSIZE, // size of the receive queue
SNDKERNELSIZE,
RCVKERNELSIZE,
LINGER,
RATELOGGING, // logging rate
PORTRANGEMIN,
PORTRANGEMAX,
AUTOBIND,
NUMSOCKETS,
lastsocketkey
};
constexpr static const char* channelOptionKeys[] = {
/*[NAME] = */ "name",
/*[TYPE] = */ "type",
/*[METHOD] = */ "method",
/*[ADDRESS] = */ "address",
/*[TRANSPORT] = */ "transport",
/*[SNDBUFSIZE] = */ "sndBufSize",
/*[RCVBUFSIZE] = */ "rcvBufSize",
/*[SNDKERNELSIZE] = */ "sndKernelSize",
/*[RCVKERNELSIZE] = */ "rcvKernelSize",
/*[LINGER] = */ "linger",
/*[RATELOGGING] = */ "rateLogging",
/*[PORTRANGEMIN] = */ "portRangeMin",
/*[PORTRANGEMAX] = */ "portRangeMax",
/*[AUTOBIND] = */ "autoBind",
/*[NUMSOCKETS] = */ "numSockets",
nullptr
};
fair::mq::Properties UserParser(const std::vector<std::string>& channelConfig, const std::string& deviceId);
};
}
}
}
#endif /* FAIRMQPARSER_SUBOPT_H */

View File

@@ -1,28 +0,0 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_PROPERTIES_H
#define FAIR_MQ_PROPERTIES_H
#include <boost/any.hpp>
#include <map>
#include <string>
namespace fair
{
namespace mq
{
using Property = boost::any;
using Properties = std::map<std::string, Property>;
}
}
#endif /* FAIR_MQ_PROPERTIES_H */

View File

@@ -32,12 +32,20 @@ The basic structure looks like this:
The top level key is `fairMQOptions`, followed by one or more devices (with their IDs), each containing one or more channels (with their names), each containing one or more sockets.
The socket parameters accept following values:
- `type` (default = ""): "push"/"pull", "pub"/"sub", "req"/"rep", "xsub"/"xpub", "dealer/router", "pair".
- `type` (default = ""): "push"/"pull", "pub"/"sub", "req"/"rep", "pair".
- `method` (default = ""): "bind"/"connect".
- `address` (default = ""): address to bind/connect.
- `sndBufSize` (default = 1000): socket send queue size in number of messages.
- `rcvBufSize` (default = 1000): socket receive queue size in number of messages.
- `sndKernelSize"` (default = ):
- `rcvKernelSize"` (default = ):
- `rateLogging` (default = 1): log socket transfer rates in seconds. 0 for no logging of this socket.
- `transport"` (default = ): override the default device transport for this channel.
- `linger"` (default = ):
- `portRangeMin"` (default = ):
- `portRangeMax"` (default = ):
- `autoBind"` (default = ):
- `numSockets"` (default = ):
If a parameter is not specified, its default value will be set.

View File

@@ -1,150 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/*
* File: runSamplerRoot.cxx
* Author: winckler
*/
// FairRoot - FairMQ
#include "FairMQLogger.h"
#include "FairMQProgOptions.h"
#include "FairMQDevice.h"
#include <exception>
#include <string>
#include <vector>
#include <unordered_map>
using namespace std;
using FairMQMap = unordered_map<string, vector<FairMQChannel>>;
class MyDevice : public FairMQDevice
{
public:
MyDevice()
: fRate(0.5)
{}
virtual ~MyDevice()
{}
void SetRate(double r)
{
fRate = r;
}
double GetRate()
{
return fRate;
}
void Print()
{
LOG(info) << "[MyDevice] rate = " << fRate;
}
private:
double fRate;
};
void MyCallBack(MyDevice& d, double val)
{
d.SetRate(val);
d.Print();
}
int main(int argc, char** argv)
{
try
{
FairMQProgOptions config;
config.GetCmdLineOptions().add_options()
("data-rate", boost::program_options::value<double>()->default_value(0.5), "Data rate");
config.ParseAll(argc, argv, true);
// // get FairMQMap
// auto map1 = config.GetFairMQMap();
// // update value in variable map, and propagate the update to the FairMQMap
// config.SetValue<string>("chans.data.0.address","tcp://localhost:1234");
// // get the updated FairMQMap
// auto map2 = config.GetFairMQMap();
// // modify one channel value
// map2.at("data").at(0).UpdateSndBufSize(500);
// // update the FairMQMap and propagate the change in variable map
// config.UpdateChannelMap(map2);
MyDevice device;
device.SetConfig(config);
LOG(info) << "Subscribing: <string>(chans.data.0.address)";
config.Subscribe<string>("test", [&device](const string& key, string value)
{
if (key == "chans.data.0.address")
{
LOG(info) << "[callback] Updating device parameter " << key << " = " << value;
device.fChannels.at("data").at(0).UpdateAddress(value);
}
});
LOG(info) << "Subscribing: <int>(chans.data.0.rcvBufSize)";
config.Subscribe<int>("test", [&device](const string& key, int value)
{
if(key == "chans.data.0.rcvBufSize")
{
LOG(info) << "[callback] Updating device parameter " << key << " = " << value;
device.fChannels.at("data").at(0).UpdateRcvBufSize(value);
}
});
LOG(info) << "Subscribing: <double>(data-rate)";
config.Subscribe<double>("test", [&device](const string& key, double value)
{
if (key == "data-rate")
{
LOG(info) << "[callback] Updating device parameter " << key << " = " << value;
device.SetRate(value);
}
});
LOG(info) << "Starting value updates...\n";
config.SetValue<string>("chans.data.0.address", "tcp://localhost:4321");
LOG(info) << "config: " << config.GetValue<string>("chans.data.0.address");
LOG(info) << "device: " << device.fChannels.at("data").at(0).GetAddress() << endl;
config.SetValue<int>("chans.data.0.rcvBufSize", 100);
LOG(info) << "config: " << config.GetValue<int>("chans.data.0.rcvBufSize");
LOG(info) << "device: " << device.fChannels.at("data").at(0).GetRcvBufSize() << endl;
config.SetValue<double>("data-rate", 0.9);
LOG(info) << "config: " << config.GetValue<double>("data-rate");
LOG(info) << "device: " << device.GetRate() << endl;
device.Print();
LOG(info) << "nase: " << config.GetValue<double>("nase");
config.Unsubscribe<string>("test");
config.Unsubscribe<int>("test");
config.Unsubscribe<double>("test");
device.ChangeState("END");
}
catch (exception& e)
{
LOG(error) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit";
return 1;
}
return 0;
}

View File

@@ -1,5 +0,0 @@
#!/bin/bash
DEVICE="runConfigExample"
DEVICE+=" --id sampler1 --channel-config name=data,type=push,method=bind,address=tcp://*:5555,rateLogging=0"
@CMAKE_CURRENT_BINARY_DIR@/$DEVICE