Extend Readout example

This commit is contained in:
Alexey Rybalchenko
2019-04-02 15:23:26 +02:00
committed by Dennis Klein
parent 35399ee039
commit 7e6eb382d5
17 changed files with 416 additions and 299 deletions

View File

@@ -26,33 +26,34 @@
#include <algorithm> // std::max
using namespace std;
using namespace fair::mq;
static map<fair::mq::Transition, fair::mq::State> backwardsCompatibilityWaitForEndOfStateHelper =
static map<Transition, State> backwardsCompatibilityWaitForEndOfStateHelper =
{
{ fair::mq::Transition::InitDevice, fair::mq::State::InitializingDevice },
{ fair::mq::Transition::CompleteInit, fair::mq::State::Initialized },
{ fair::mq::Transition::Bind, fair::mq::State::Bound },
{ fair::mq::Transition::Connect, fair::mq::State::DeviceReady },
{ fair::mq::Transition::InitTask, fair::mq::State::Ready },
{ fair::mq::Transition::Run, fair::mq::State::Ready },
{ fair::mq::Transition::Stop, fair::mq::State::Ready },
{ fair::mq::Transition::ResetTask, fair::mq::State::DeviceReady },
{ fair::mq::Transition::ResetDevice, fair::mq::State::Idle }
{ Transition::InitDevice, State::InitializingDevice },
{ Transition::CompleteInit, State::Initialized },
{ Transition::Bind, State::Bound },
{ Transition::Connect, State::DeviceReady },
{ Transition::InitTask, State::Ready },
{ Transition::Run, State::Ready },
{ Transition::Stop, State::Ready },
{ Transition::ResetTask, State::DeviceReady },
{ Transition::ResetDevice, State::Idle }
};
static map<int, fair::mq::Transition> backwardsCompatibilityChangeStateHelper =
static map<int, Transition> backwardsCompatibilityChangeStateHelper =
{
{ FairMQDevice::Event::INIT_DEVICE, fair::mq::Transition::InitDevice },
{ FairMQDevice::Event::internal_DEVICE_READY, fair::mq::Transition::Auto },
{ FairMQDevice::Event::INIT_TASK, fair::mq::Transition::InitTask },
{ FairMQDevice::Event::internal_READY, fair::mq::Transition::Auto },
{ FairMQDevice::Event::RUN, fair::mq::Transition::Run },
{ FairMQDevice::Event::STOP, fair::mq::Transition::Stop },
{ FairMQDevice::Event::RESET_TASK, fair::mq::Transition::ResetTask },
{ FairMQDevice::Event::RESET_DEVICE, fair::mq::Transition::ResetDevice },
{ FairMQDevice::Event::internal_IDLE, fair::mq::Transition::Auto },
{ FairMQDevice::Event::END, fair::mq::Transition::End },
{ FairMQDevice::Event::ERROR_FOUND, fair::mq::Transition::ErrorFound }
{ FairMQDevice::Event::INIT_DEVICE, Transition::InitDevice },
{ FairMQDevice::Event::internal_DEVICE_READY, Transition::Auto },
{ FairMQDevice::Event::INIT_TASK, Transition::InitTask },
{ FairMQDevice::Event::internal_READY, Transition::Auto },
{ FairMQDevice::Event::RUN, Transition::Run },
{ FairMQDevice::Event::STOP, Transition::Stop },
{ FairMQDevice::Event::RESET_TASK, Transition::ResetTask },
{ FairMQDevice::Event::RESET_DEVICE, Transition::ResetDevice },
{ FairMQDevice::Event::internal_IDLE, Transition::Auto },
{ FairMQDevice::Event::END, Transition::End },
{ FairMQDevice::Event::ERROR_FOUND, Transition::ErrorFound }
};
FairMQDevice::FairMQDevice()
@@ -65,21 +66,21 @@ FairMQDevice::FairMQDevice(FairMQProgOptions& config)
{
}
FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
FairMQDevice::FairMQDevice(const tools::Version version)
: FairMQDevice(nullptr, version)
{
}
FairMQDevice::FairMQDevice(FairMQProgOptions& config, const fair::mq::tools::Version version)
FairMQDevice::FairMQDevice(FairMQProgOptions& config, const tools::Version version)
: FairMQDevice(&config, version)
{
}
FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Version version)
FairMQDevice::FairMQDevice(FairMQProgOptions* config, const tools::Version version)
: fTransportFactory(nullptr)
, fTransports()
, fChannels()
, fInternalConfig(config ? nullptr : fair::mq::tools::make_unique<FairMQProgOptions>())
, fInternalConfig(config ? nullptr : tools::make_unique<FairMQProgOptions>())
, fConfig(config ? config : fInternalConfig.get())
, fId()
, fDefaultTransportType(fair::mq::Transport::ZMQ)
@@ -99,11 +100,11 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver
, fMaxRunRuntimeInS(0)
, fRawCmdLineArgs()
{
SubscribeToNewTransition("device", [&](fair::mq::Transition transition) {
SubscribeToNewTransition("device", [&](Transition transition) {
LOG(trace) << "device notified on new transition: " << transition;
switch (transition) {
case fair::mq::Transition::Stop:
case Transition::Stop:
UnblockTransports();
break;
default:
@@ -182,7 +183,7 @@ bool FairMQDevice::ChangeState(const int transition)
return ChangeState(backwardsCompatibilityChangeStateHelper.at(transition));
}
void FairMQDevice::WaitForEndOfState(fair::mq::Transition transition)
void FairMQDevice::WaitForEndOfState(Transition transition)
{
WaitForState(backwardsCompatibilityWaitForEndOfStateHelper.at(transition));
}
@@ -226,7 +227,7 @@ void FairMQDevice::InitWrapper()
int subChannelIndex = 0;
for (auto& vi : mi.second) {
// set channel name: name + vector index
vi.fName = fair::mq::tools::ToString(mi.first, "[", subChannelIndex, "]");
vi.fName = tools::ToString(mi.first, "[", subChannelIndex, "]");
// set channel transport
LOG(debug) << "Initializing transport for channel " << vi.fName << ": " << fair::mq::TransportNames.at(vi.fTransportType);
@@ -237,9 +238,9 @@ void FairMQDevice::InitWrapper()
if (vi.fAddress == "unspecified" || vi.fAddress == "") {
// if the configured network interface is default, get its name from the default route
if (networkInterface == "default") {
networkInterface = fair::mq::tools::getDefaultRouteNetworkInterface();
networkInterface = tools::getDefaultRouteNetworkInterface();
}
vi.fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(networkInterface) + ":1";
vi.fAddress = "tcp://" + tools::getInterfaceIP(networkInterface) + ":1";
}
// fill the uninitialized list
fUninitializedBindingChannels.push_back(&vi);
@@ -251,14 +252,14 @@ void FairMQDevice::InitWrapper()
fUninitializedConnectingChannels.push_back(&vi);
} else {
LOG(error) << "Cannot update configuration. Socket method (bind/connect) for channel '" << vi.fName << "' not specified.";
throw runtime_error(fair::mq::tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", vi.fName, " not specified."));
throw runtime_error(tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", vi.fName, " not specified."));
}
subChannelIndex++;
}
}
// ChangeState(fair::mq::Transition::Auto);
// ChangeState(Transition::Auto);
}
void FairMQDevice::BindWrapper()
@@ -269,12 +270,12 @@ void FairMQDevice::BindWrapper()
if (!fUninitializedBindingChannels.empty()) {
LOG(error) << fUninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete.";
throw runtime_error(fair::mq::tools::ToString(fUninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
throw runtime_error(tools::ToString(fUninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
}
Bind();
ChangeState(fair::mq::Transition::Auto);
ChangeState(Transition::Auto);
}
void FairMQDevice::ConnectWrapper()
@@ -301,7 +302,7 @@ void FairMQDevice::ConnectWrapper()
if (numAttempts++ > maxAttempts) {
LOG(error) << "could not connect all channels after " << initializationTimeoutInS << " attempts";
throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts"));
throw runtime_error(tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts"));
}
AttachChannels(fUninitializedConnectingChannels);
@@ -313,7 +314,7 @@ void FairMQDevice::ConnectWrapper()
Connect();
ChangeState(fair::mq::Transition::Auto);
ChangeState(Transition::Auto);
}
void FairMQDevice::AttachChannels(vector<FairMQChannel*>& chans)
@@ -366,7 +367,7 @@ bool FairMQDevice::AttachChannel(FairMQChannel& chan)
string hostPart = addressString.substr(0, pos);
if (!(bind && hostPart == "*")) {
string portPart = addressString.substr(pos + 1);
string resolvedHost = fair::mq::tools::getIpFromHostname(hostPart);
string resolvedHost = tools::getIpFromHostname(hostPart);
if (resolvedHost == "") {
return false;
}
@@ -414,7 +415,7 @@ void FairMQDevice::InitTaskWrapper()
{
InitTask();
ChangeState(fair::mq::Transition::Auto);
ChangeState(Transition::Auto);
}
bool FairMQDevice::SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs)
@@ -433,7 +434,7 @@ void FairMQDevice::SortChannel(const string& name, const bool reindex)
for (auto vi = fChannels.at(name).begin(); vi != fChannels.at(name).end(); ++vi)
{
// set channel name: name + vector index
vi->fName = fair::mq::tools::ToString(name, "[", vi - fChannels.at(name).begin(), "]");
vi->fName = tools::ToString(name, "[", vi - fChannels.at(name).begin(), "]");
}
}
}
@@ -467,7 +468,7 @@ void FairMQDevice::RunWrapper()
HandleMultipleChannelInput();
}
} else {
fair::mq::tools::RateLimiter rateLimiter(fRate);
tools::RateLimiter rateLimiter(fRate);
while (!NewStatePending() && ConditionalRun()) {
if (fRate > 0.001) {
@@ -481,7 +482,7 @@ void FairMQDevice::RunWrapper()
// if Run() exited and the state is still RUNNING, transition to READY.
if (!NewStatePending()) {
UnblockTransports();
ChangeState(fair::mq::Transition::Stop);
ChangeState(Transition::Stop);
}
PostRun();
@@ -489,10 +490,10 @@ void FairMQDevice::RunWrapper()
} catch (const out_of_range& oor) {
LOG(error) << "out of range: " << oor.what();
LOG(error) << "incorrect/incomplete channel configuration?";
ChangeState(fair::mq::Transition::ErrorFound);
ChangeState(Transition::ErrorFound);
throw;
} catch (...) {
ChangeState(fair::mq::Transition::ErrorFound);
ChangeState(Transition::ErrorFound);
throw;
}
@@ -664,7 +665,7 @@ void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const
catch (exception& e)
{
LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state.";
throw runtime_error(fair::mq::tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state."));
throw runtime_error(tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state."));
}
}
@@ -738,7 +739,7 @@ void FairMQDevice::LogSocketRates()
filteredSockets.push_back(vi->fSocket.get());
logIntervals.push_back(vi->fRateLogging);
intervalCounters.push_back(0);
filteredChannelNames.push_back(fair::mq::tools::ToString(mi.first, "[", vi - (mi.second).begin(), "]"));
filteredChannelNames.push_back(tools::ToString(mi.first, "[", vi - (mi.second).begin(), "]"));
chanNameLen = max(chanNameLen, filteredChannelNames.back().length());
}
}
@@ -814,7 +815,7 @@ void FairMQDevice::LogSocketRates()
t0 = t1;
if (fMaxRunRuntimeInS > 0 && ++secondsElapsed >= fMaxRunRuntimeInS) {
ChangeState(fair::mq::Transition::Stop);
ChangeState(Transition::Stop);
}
}
}
@@ -830,7 +831,7 @@ void FairMQDevice::ResetTaskWrapper()
{
ResetTask();
ChangeState(fair::mq::Transition::Auto);
ChangeState(Transition::Auto);
}
void FairMQDevice::ResetWrapper()
@@ -850,7 +851,7 @@ void FairMQDevice::ResetWrapper()
Reset();
ChangeState(fair::mq::Transition::Auto);
ChangeState(Transition::Auto);
}
FairMQDevice::~FairMQDevice()