Refactor initialization

- add device constructor that accepts FairMQProgOptions object.
 - Initialize config values in INIT state (to allow their update).
 - Simplify FairMQProgOptions handling in FairMQDevice.
 - Simplify SetTransport/SetConfig - refactor duplicated code.
 - Add FairMQDevice methods to add channels.
This commit is contained in:
Alexey Rybalchenko
2018-08-16 16:40:24 +02:00
committed by Dennis Klein
parent 1c78b8ef0a
commit 1bb558a457
5 changed files with 123 additions and 163 deletions

View File

@@ -8,9 +8,6 @@
#include <FairMQDevice.h>
#include <fairmq/Tools.h>
#include <fairmq/Transports.h>
#include <boost/algorithm/string.hpp> // join/split
#include <boost/uuid/uuid.hpp>
@@ -31,53 +28,39 @@
using namespace std;
FairMQDevice::FairMQDevice()
: fTransportFactory(nullptr)
, fTransports()
, fChannels()
, fConfig(nullptr)
, fId()
, fNumIoThreads(1)
, fInitialValidationFinished(false)
, fInitialValidationCondition()
, fInitialValidationMutex()
, fPortRangeMin(22000)
, fPortRangeMax(32000)
, fNetworkInterface()
, fDefaultTransportType(fair::mq::Transport::DEFAULT)
, fInitializationTimeoutInS(120)
, fDataCallbacks(false)
, fMsgInputs()
, fMultipartInputs()
, fMultitransportInputs()
, fChannelRegistry()
, fInputChannelKeys()
, fMultitransportMutex()
, fMultitransportProceed(false)
, fExternalConfig(false)
, fVersion({0, 0, 0})
, fRate(0.)
, fLastTime(0)
, fRawCmdLineArgs()
: FairMQDevice(nullptr, {0, 0, 0})
{
}
FairMQDevice::FairMQDevice(FairMQProgOptions& config)
: FairMQDevice(&config, {0, 0, 0})
{
}
FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
: FairMQDevice(nullptr, version)
{
}
FairMQDevice::FairMQDevice(FairMQProgOptions& config, const fair::mq::tools::Version version)
: FairMQDevice(&config, version)
{
}
FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Version version)
: fTransportFactory(nullptr)
, fTransports()
, fChannels()
, fConfig(nullptr)
, fInternalConfig(config ? nullptr : fair::mq::tools::make_unique<FairMQProgOptions>())
, fConfig(config ? config : fInternalConfig.get())
, fId()
, fNumIoThreads(1)
, fInitialValidationFinished(false)
, fInitialValidationCondition()
, fInitialValidationMutex()
, fPortRangeMin(22000)
, fPortRangeMax(32000)
, fNetworkInterface()
, fDefaultTransportType(fair::mq::Transport::DEFAULT)
, fInitializationTimeoutInS(120)
, fDataCallbacks(false)
, fMsgInputs()
, fMultipartInputs()
@@ -86,7 +69,6 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
, fInputChannelKeys()
, fMultitransportMutex()
, fMultitransportProceed(false)
, fExternalConfig(false)
, fVersion(version)
, fRate(0.)
, fLastTime(0)
@@ -96,16 +78,43 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
void FairMQDevice::InitWrapper()
{
if (!fTransportFactory)
fId = fConfig->GetValue<string>("id");
fRate = fConfig->GetValue<float>("rate");
fPortRangeMin = fConfig->GetValue<int>("port-range-min");
fPortRangeMax = fConfig->GetValue<int>("port-range-max");
try
{
LOG(error) << "Transport not initialized. Did you call SetTransport()?";
exit(EXIT_FAILURE);
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
}
catch (const exception& e)
{
LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
}
for (auto& c : fConfig->GetFairMQMap())
{
if (fChannels.find(c.first) == fChannels.end())
{
LOG(debug) << "Inserting new device channel from config: " << c.first;
fChannels.insert(c);
}
else
{
LOG(debug) << "Updating existing device channel from config: " << c.first;
fChannels[c.first] = c.second;
}
}
LOG(debug) << "Requesting '" << fair::mq::TransportNames.at(fDefaultTransportType) << "' as default transport for the device";
fTransportFactory = AddTransport(fDefaultTransportType);
// Containers to store the uninitialized channels.
vector<FairMQChannel*> uninitializedBindingChannels;
vector<FairMQChannel*> uninitializedConnectingChannels;
string networkInterface = fConfig->GetValue<string>("network-interface");
// Fill the uninitialized channel containers
for (auto& mi : fChannels)
{
@@ -126,11 +135,11 @@ void FairMQDevice::InitWrapper()
if (vi->fAddress == "unspecified" || vi->fAddress == "")
{
// if the configured network interface is default, get its name from the default route
if (fNetworkInterface == "default")
if (networkInterface == "default")
{
fNetworkInterface = fair::mq::tools::getDefaultRouteNetworkInterface();
networkInterface = fair::mq::tools::getDefaultRouteNetworkInterface();
}
vi->fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(fNetworkInterface) + ":1";
vi->fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(networkInterface) + ":1";
}
// fill the uninitialized list
uninitializedBindingChannels.push_back(&(*vi));
@@ -175,10 +184,12 @@ void FairMQDevice::InitWrapper()
fInitialValidationCondition.notify_one();
}
int initializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
// go over the list of channels until all are initialized (and removed from the uninitialized list)
int numAttempts = 1;
auto sleepTimeInMS = 50;
auto maxAttempts = fInitializationTimeoutInS * 1000 / sleepTimeInMS;
auto maxAttempts = initializationTimeoutInS * 1000 / sleepTimeInMS;
// first attempt
AttachChannels(uninitializedConnectingChannels);
// if not all channels could be connected, update their address values from config and retry
@@ -201,9 +212,9 @@ void FairMQDevice::InitWrapper()
if (numAttempts++ > maxAttempts)
{
LOG(error) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts";
LOG(error) << "could not connect all channels after " << initializationTimeoutInS << " attempts";
ChangeState(ERROR_FOUND);
// throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts"));
// throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts"));
}
AttachChannels(uninitializedConnectingChannels);
@@ -252,19 +263,15 @@ void FairMQDevice::AttachChannels(vector<FairMQChannel*>& chans)
bool FairMQDevice::AttachChannel(FairMQChannel& ch)
{
if (!ch.fTransportFactory)
if (ch.fTransportType == fair::mq::Transport::DEFAULT || ch.fTransportType == fTransportFactory->GetType())
{
if (ch.fTransportType == fair::mq::Transport::DEFAULT || ch.fTransportType == fTransportFactory->GetType())
{
LOG(debug) << ch.fName << ": using default transport";
ch.InitTransport(fTransportFactory);
}
else
{
LOG(debug) << ch.fName << ": channel transport (" << fair::mq::TransportNames.at(fDefaultTransportType) << ") overriden to " << fair::mq::TransportNames.at(ch.fTransportType);
ch.InitTransport(AddTransport(ch.fTransportType));
}
ch.fTransportType = ch.fTransportFactory->GetType();
LOG(debug) << ch.fName << ": using default transport";
ch.InitTransport(fTransportFactory);
}
else
{
LOG(debug) << ch.fName << ": channel transport (" << fair::mq::TransportNames.at(fDefaultTransportType) << ") overriden to " << fair::mq::TransportNames.at(ch.fTransportType);
ch.InitTransport(AddTransport(ch.fTransportType));
}
vector<string> endpoints;
@@ -798,78 +805,10 @@ shared_ptr<FairMQTransportFactory> FairMQDevice::AddTransport(const fair::mq::Tr
}
}
void FairMQDevice::CreateOwnConfig()
{
// TODO: make fConfig a shared_ptr when no old user code has FairMQProgOptions ptr*
fConfig = new FairMQProgOptions();
string id{boost::uuids::to_string(boost::uuids::random_generator()())};
LOG(warn) << "No FairMQProgOptions provided, creating one internally and setting device ID to " << id;
// dummy argc+argv
char arg0[] = "undefined"; // executable name
char arg1[] = "--id";
char* arg2 = const_cast<char*>(id.c_str()); // device ID
const char* argv[] = { &arg0[0], &arg1[0], arg2, nullptr };
int argc = static_cast<int>((sizeof(argv) / sizeof(argv[0])) - 1);
fConfig->ParseAll(argc, &argv[0]);
fId = fConfig->GetValue<string>("id");
fNetworkInterface = fConfig->GetValue<string>("network-interface");
fNumIoThreads = fConfig->GetValue<int>("io-threads");
fInitializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
fRate = fConfig->GetValue<float>("rate");
try {
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
} catch(const exception& e) {
LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
}
}
void FairMQDevice::SetTransport(const string& transport)
{
// This method is the first to be called, if FairMQProgOptions are not used (either SetTransport() or SetConfig() make sense, not both).
// Make sure here that at least internal config is available.
if (!fExternalConfig && !fConfig)
{
CreateOwnConfig();
}
if (fTransports.empty())
{
LOG(debug) << "Requesting '" << transport << "' as default transport for the device";
fTransportFactory = AddTransport(fair::mq::TransportTypes.at(transport));
}
else
{
LOG(error) << "Transports container is not empty when setting transport. Setting default twice?";
ChangeState(ERROR_FOUND);
}
}
void FairMQDevice::SetConfig(FairMQProgOptions& config)
{
fExternalConfig = true;
fInternalConfig.reset();
fConfig = &config;
for (auto& c : fConfig->GetFairMQMap())
{
if (!fChannels.insert(c).second)
{
LOG(warn) << "did not insert channel '" << c.first << "', it is already in the device.";
}
}
fId = fConfig->GetValue<string>("id");
fNetworkInterface = fConfig->GetValue<string>("network-interface");
fNumIoThreads = fConfig->GetValue<int>("io-threads");
fInitializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
fRate = fConfig->GetValue<float>("rate");
try {
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
} catch(const exception& e) {
LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
}
SetTransport(fConfig->GetValue<string>("transport"));
}
void FairMQDevice::LogSocketRates()
@@ -1020,7 +959,7 @@ void FairMQDevice::Reset()
for (auto& vi : mi.second)
{
// vi.fReset = true;
vi.fSocket.reset();
vi.fSocket.reset(); // destroy FairMQSocket
}
}
}
@@ -1032,10 +971,6 @@ const FairMQChannel& FairMQDevice::GetChannel(const string& channelName, const i
void FairMQDevice::Exit()
{
if (!fExternalConfig && fConfig)
{
delete fConfig;
}
}
FairMQDevice::~FairMQDevice()