Update state machine

- Split the INITIALIZING state into Init+Bind+Connect
- Remove the PAUSE state
- Convert states/transitions to enum classes (CamelCase)
- Transition to a state only once the previous handler is complete
- Add a CompleteInit transition to notify the Initializing state
  that config updates are complete
- Deprecate WaitForEndOfState(transition) in favor of
  WaitForState(state)/WaitForNextState()
- Update tests/plugins to the new APIs
- Deprecate CheckCurrentState() in favor of NewStatePending()
This commit is contained in:
Alexey Rybalchenko
2019-02-07 13:38:11 +01:00
committed by Dennis Klein
parent 5e71d09e4d
commit fc94342db8
33 changed files with 1322 additions and 1515 deletions

View File

@@ -27,6 +27,19 @@
using namespace std;
// Lookup table used by WaitForEndOfState(transition): for every supported
// transition it names the state that WaitForState() is asked to wait for.
// Note that Run and Stop both resolve to State::Ready, and that InitTask
// resolves to State::Ready as well.
static map<fair::mq::Transition, fair::mq::State> backwardsCompatibilityWaitForEndOfStateHelper =
{
    { fair::mq::Transition::InitDevice,   fair::mq::State::InitializingDevice },
    { fair::mq::Transition::CompleteInit, fair::mq::State::Initialized        },
    { fair::mq::Transition::Bind,         fair::mq::State::Bound              },
    { fair::mq::Transition::Connect,      fair::mq::State::DeviceReady        },
    { fair::mq::Transition::InitTask,     fair::mq::State::Ready              },
    { fair::mq::Transition::Run,          fair::mq::State::Ready              },
    { fair::mq::Transition::Stop,         fair::mq::State::Ready              },
    { fair::mq::Transition::ResetTask,    fair::mq::State::DeviceReady        },
    { fair::mq::Transition::ResetDevice,  fair::mq::State::Idle               }
};
FairMQDevice::FairMQDevice()
: FairMQDevice(nullptr, {0, 0, 0})
{
@@ -54,7 +67,10 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver
, fInternalConfig(config ? nullptr : fair::mq::tools::make_unique<FairMQProgOptions>())
, fConfig(config ? config : fInternalConfig.get())
, fId()
, fDefaultTransportType(fair::mq::Transport::DEFAULT)
, fDefaultTransportType(fair::mq::Transport::ZMQ)
, fStateMachine()
, fUninitializedBindingChannels()
, fUninitializedConnectingChannels()
, fDataCallbacks(false)
, fMsgInputs()
, fMultipartInputs()
@@ -66,16 +82,98 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver
, fVersion(version)
, fRate(0.)
, fRawCmdLineArgs()
, fInterrupted(false)
, fInterruptedCV()
, fInterruptedMtx()
, fRateLogging(true)
{
SubscribeToNewTransition("device", [&](fair::mq::Transition transition) {
LOG(trace) << "device notified on new transition: " << transition;
switch (transition) {
case fair::mq::Transition::Stop:
UnblockTransports();
break;
default:
break;
}
});
fStateMachine.HandleStates([&](fair::mq::State state) {
LOG(trace) << "device notified on new state: " << state;
{
lock_guard<mutex> lock(fStatesMtx);
fStates.push(state);
}
fStatesCV.notify_one();
switch (state) {
case fair::mq::State::InitializingDevice:
InitWrapper();
break;
case fair::mq::State::Binding:
BindWrapper();
break;
case fair::mq::State::Connecting:
ConnectWrapper();
break;
case fair::mq::State::InitializingTask:
InitTaskWrapper();
break;
case fair::mq::State::Running:
RunWrapper();
break;
case fair::mq::State::ResettingTask:
ResetTaskWrapper();
break;
case fair::mq::State::ResettingDevice:
ResetWrapper();
break;
case fair::mq::State::Exiting:
Exit();
break;
default:
LOG(trace) << "device notified on new state without a matching handler: " << state;
break;
}
});
fStateMachine.Start();
}
/// Blocks until the state queue (fStates) holds at least one entry and hands
/// back the oldest one.
///
/// The queue is inspected while holding fStatesMtx. Each wait on fStatesCV is
/// bounded to 50 ms, after which the queue is checked again.
///
/// @return the state at the front of the queue; it is removed from the queue.
/// @throws DeviceStateError if the state at the front of the queue is
///         fair::mq::State::Error. The entry is NOT removed in that case.
fair::mq::State FairMQDevice::WaitForNextState()
{
    unique_lock<mutex> queueLock(fStatesMtx);

    // keep waiting in 50 ms slices until something has been queued
    while (!fStatesCV.wait_for(queueLock, chrono::milliseconds(50), [this] { return !fStates.empty(); })) {
    }

    const fair::mq::State next = fStates.front();

    if (next != fair::mq::State::Error) {
        fStates.pop();
        return next;
    }

    // the Error entry stays at the front of the queue
    throw DeviceStateError("Device transitioned to error state.");
}
/// Consumes queued states via WaitForNextState() until the requested one is
/// seen, discarding every other state encountered on the way.
///
/// @param state the state to wait for.
/// Exceptions thrown by WaitForNextState() propagate to the caller.
void FairMQDevice::WaitForState(fair::mq::State state)
{
    for (;;) {
        if (WaitForNextState() == state) {
            return;
        }
    }
}
/// Translates a transition into a state through
/// backwardsCompatibilityWaitForEndOfStateHelper and waits for that state
/// with WaitForState().
///
/// @param transition the transition to translate.
/// @throws std::out_of_range (from map::at) if the transition has no entry in
///         the helper table.
void FairMQDevice::WaitForEndOfState(fair::mq::Transition transition)
{
    const fair::mq::State awaited = backwardsCompatibilityWaitForEndOfStateHelper.at(transition);
    WaitForState(awaited);
}
void FairMQDevice::InitWrapper()
{
fStateMachine.WaitForPendingState();
fId = fConfig->GetValue<string>("id");
Init();
fRate = fConfig->GetValue<float>("rate");
try {
@@ -96,13 +194,9 @@ void FairMQDevice::InitWrapper()
}
}
LOG(debug) << "Requesting '" << fair::mq::TransportNames.at(fDefaultTransportType) << "' as default transport for the device";
LOG(debug) << "Setting '" << fair::mq::TransportNames.at(fDefaultTransportType) << "' as default transport for the device";
fTransportFactory = AddTransport(fDefaultTransportType);
// Containers to store the uninitialized channels.
vector<FairMQChannel*> uninitializedBindingChannels;
vector<FairMQChannel*> uninitializedConnectingChannels;
string networkInterface = fConfig->GetValue<string>("network-interface");
// Fill the uninitialized channel containers
@@ -113,13 +207,8 @@ void FairMQDevice::InitWrapper()
vi.fName = fair::mq::tools::ToString(mi.first, "[", subChannelIndex, "]");
// set channel transport
if (vi.fTransportType == fair::mq::Transport::DEFAULT || vi.fTransportType == fTransportFactory->GetType()) {
LOG(debug) << vi.fName << ": using default transport";
vi.InitTransport(fTransportFactory);
} else {
LOG(debug) << vi.fName << ": channel transport (" << fair::mq::TransportNames.at(fDefaultTransportType) << ") overriden to " << fair::mq::TransportNames.at(vi.fTransportType);
vi.InitTransport(AddTransport(vi.fTransportType));
}
LOG(debug) << "Initializing transport for channel " << vi.fName << ": " << fair::mq::TransportNames.at(vi.fTransportType);
vi.InitTransport(AddTransport(vi.fTransportType));
if (vi.fMethod == "bind") {
// if binding address is not specified, try getting it from the configured network interface
@@ -131,13 +220,13 @@ void FairMQDevice::InitWrapper()
vi.fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(networkInterface) + ":1";
}
// fill the uninitialized list
uninitializedBindingChannels.push_back(&vi);
fUninitializedBindingChannels.push_back(&vi);
} else if (vi.fMethod == "connect") {
// fill the uninitialized list
uninitializedConnectingChannels.push_back(&vi);
fUninitializedConnectingChannels.push_back(&vi);
} else if (vi.fAddress.find_first_of("@+>") != string::npos) {
// fill the uninitialized list
uninitializedConnectingChannels.push_back(&vi);
fUninitializedConnectingChannels.push_back(&vi);
} else {
LOG(error) << "Cannot update configuration. Socket method (bind/connect) for channel '" << vi.fName << "' not specified.";
throw runtime_error(fair::mq::tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", vi.fName, " not specified."));
@@ -147,17 +236,27 @@ void FairMQDevice::InitWrapper()
}
}
// ChangeState(fair::mq::Transition::Auto);
}
void FairMQDevice::BindWrapper()
{
// Bind channels. Here one run is enough, because bind settings should be available locally
// If necessary this could be handled in the same way as the connecting channels
AttachChannels(uninitializedBindingChannels);
AttachChannels(fUninitializedBindingChannels);
if (!uninitializedBindingChannels.empty()) {
LOG(error) << uninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete.";
throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
if (!fUninitializedBindingChannels.empty()) {
LOG(error) << fUninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete.";
throw runtime_error(fair::mq::tools::ToString(fUninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
}
CallStateChangeCallbacks(INITIALIZING_DEVICE);
Bind();
ChangeState(fair::mq::Transition::Auto);
}
void FairMQDevice::ConnectWrapper()
{
int initializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
// go over the list of channels until all are initialized (and removed from the uninitialized list)
@@ -165,12 +264,12 @@ void FairMQDevice::InitWrapper()
auto sleepTimeInMS = 50;
auto maxAttempts = initializationTimeoutInS * 1000 / sleepTimeInMS;
// first attempt
AttachChannels(uninitializedConnectingChannels);
AttachChannels(fUninitializedConnectingChannels);
// if not all channels could be connected, update their address values from config and retry
while (!uninitializedConnectingChannels.empty()) {
while (!fUninitializedConnectingChannels.empty()) {
this_thread::sleep_for(chrono::milliseconds(sleepTimeInMS));
for (auto& chan : uninitializedConnectingChannels) {
for (auto& chan : fUninitializedConnectingChannels) {
string key{"chans." + chan->GetChannelPrefix() + "." + chan->GetChannelIndex() + ".address"};
string newAddress = fConfig->GetValue<string>(key);
if (newAddress != chan->GetAddress()) {
@@ -183,20 +282,16 @@ void FairMQDevice::InitWrapper()
throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts"));
}
AttachChannels(uninitializedConnectingChannels);
AttachChannels(fUninitializedConnectingChannels);
}
Init();
if (fChannels.empty()) {
LOG(warn) << "No channels created after finishing initialization";
}
ChangeState(internal_DEVICE_READY);
}
Connect();
void FairMQDevice::Init()
{
ChangeState(fair::mq::Transition::Auto);
}
void FairMQDevice::AttachChannels(vector<FairMQChannel*>& chans)
@@ -295,15 +390,9 @@ bool FairMQDevice::AttachChannel(FairMQChannel& chan)
void FairMQDevice::InitTaskWrapper()
{
CallStateChangeCallbacks(INITIALIZING_TASK);
InitTask();
ChangeState(internal_READY);
}
void FairMQDevice::InitTask()
{
ChangeState(fair::mq::Transition::Auto);
}
bool FairMQDevice::SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs)
@@ -334,21 +423,13 @@ void FairMQDevice::SortChannel(const string& name, const bool reindex)
void FairMQDevice::RunWrapper()
{
CallStateChangeCallbacks(RUNNING);
LOG(info) << "DEVICE: Running...";
// start the rate logger thread
fRateLogging = true;
future<void> rateLogger = async(launch::async, &FairMQDevice::LogSocketRates, this);
// notify transports to resume transfers
{
lock_guard<mutex> guard(fInterruptedMtx);
fInterrupted = false;
}
for (auto& t : fTransports)
{
for (auto& t : fTransports) {
t.second->Resume();
}
@@ -356,50 +437,43 @@ void FairMQDevice::RunWrapper()
PreRun();
// process either data callbacks or ConditionalRun/Run
if (fDataCallbacks)
{
if (fDataCallbacks) {
// if only one input channel, do lightweight handling without additional polling.
if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1)
{
if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1) {
HandleSingleChannelInput();
}
else // otherwise do full handling with polling
{
} else {// otherwise do full handling with polling
HandleMultipleChannelInput();
}
}
else
{
} else {
fair::mq::tools::RateLimiter rateLimiter(fRate);
while (CheckCurrentState(RUNNING) && ConditionalRun())
{
if (fRate > 0.001)
{
while (!NewStatePending() && ConditionalRun()) {
if (fRate > 0.001) {
rateLimiter.maybe_sleep();
}
}
Run();
}
// if Run() exited and the state is still RUNNING, transition to READY.
if (!NewStatePending()) {
UnblockTransports();
ChangeState(fair::mq::Transition::Stop);
}
PostRun();
} catch (const out_of_range& oor) {
LOG(error) << "out of range: " << oor.what();
LOG(error) << "incorrect/incomplete channel configuration?";
fRateLogging = false;
ChangeState(fair::mq::Transition::ErrorFound);
throw;
} catch (...) {
fRateLogging = false;
ChangeState(fair::mq::Transition::ErrorFound);
throw;
}
// if Run() exited and the state is still RUNNING, transition to READY.
if (CheckCurrentState(RUNNING))
{
ChangeState(internal_READY);
}
PostRun();
rateLogger.get();
}
@@ -409,14 +483,14 @@ void FairMQDevice::HandleSingleChannelInput()
if (fMsgInputs.size() > 0)
{
while (CheckCurrentState(RUNNING) && proceed)
while (!NewStatePending() && proceed)
{
proceed = HandleMsgInput(fInputChannelKeys.at(0), fMsgInputs.begin()->second, 0);
}
}
else if (fMultipartInputs.size() > 0)
{
while (CheckCurrentState(RUNNING) && proceed)
while (!NewStatePending() && proceed)
{
proceed = HandleMultipartInput(fInputChannelKeys.at(0), fMultipartInputs.begin()->second, 0);
}
@@ -468,7 +542,7 @@ void FairMQDevice::HandleMultipleChannelInput()
FairMQPollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys));
while (CheckCurrentState(RUNNING) && proceed)
while (!NewStatePending() && proceed)
{
poller->Poll(200);
@@ -526,7 +600,7 @@ void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const
{
FairMQPollerPtr poller(factory->CreatePoller(fChannels, channelKeys));
while (CheckCurrentState(RUNNING) && fMultitransportProceed)
while (!NewStatePending() && fMultitransportProceed)
{
poller->Poll(500);
@@ -600,58 +674,21 @@ bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipa
}
}
void FairMQDevice::Run()
shared_ptr<FairMQTransportFactory> FairMQDevice::AddTransport(fair::mq::Transport transport)
{
}
void FairMQDevice::PreRun()
{
}
bool FairMQDevice::ConditionalRun()
{
return false;
}
void FairMQDevice::PostRun()
{
}
void FairMQDevice::PauseWrapper()
{
CallStateChangeCallbacks(PAUSED);
Pause();
}
void FairMQDevice::Pause()
{
while (CheckCurrentState(PAUSED))
{
this_thread::sleep_for(chrono::milliseconds(500));
LOG(debug) << "paused...";
if (transport == fair::mq::Transport::DEFAULT) {
transport = fDefaultTransportType;
}
LOG(debug) << "Unpausing";
}
shared_ptr<FairMQTransportFactory> FairMQDevice::AddTransport(const fair::mq::Transport transport)
{
auto i = fTransports.find(transport);
if (i == fTransports.end())
{
if (i == fTransports.end()) {
LOG(debug) << "Adding '" << fair::mq::TransportNames.at(transport) << "' transport";
auto tr = FairMQTransportFactory::CreateTransportFactory(fair::mq::TransportNames.at(transport), fId, fConfig);
LOG(debug) << "Adding '" << fair::mq::TransportNames.at(transport) << "' transport to the device.";
pair<fair::mq::Transport, shared_ptr<FairMQTransportFactory>> trPair(transport, tr);
fTransports.insert(trPair);
fTransports.insert({transport, tr});
return tr;
}
else
{
LOG(debug) << "Reusing existing '" << fair::mq::TransportNames.at(transport) << "' transport.";
} else {
LOG(debug) << "Reusing existing '" << fair::mq::TransportNames.at(transport) << "' transport";
return i->second;
}
}
@@ -724,7 +761,7 @@ void FairMQDevice::LogSocketRates()
LOG(debug) << "<channel>: in: <#msgs> (<MB>) out: <#msgs> (<MB>)";
while (fRateLogging)
while (!NewStatePending())
{
t1 = chrono::high_resolution_clock::now();
@@ -769,48 +806,30 @@ void FairMQDevice::LogSocketRates()
}
}
void FairMQDevice::Unblock()
void FairMQDevice::UnblockTransports()
{
for (auto& t : fTransports)
{
for (auto& t : fTransports) {
t.second->Interrupt();
}
{
lock_guard<mutex> guard(fInterruptedMtx);
fInterrupted = true;
fRateLogging = false;
}
fInterruptedCV.notify_all();
}
void FairMQDevice::ResetTaskWrapper()
{
CallStateChangeCallbacks(RESETTING_TASK);
ResetTask();
ChangeState(internal_DEVICE_READY);
}
void FairMQDevice::ResetTask()
{
ChangeState(fair::mq::Transition::Auto);
}
void FairMQDevice::ResetWrapper()
{
CallStateChangeCallbacks(RESETTING_DEVICE);
for (auto& t : fTransports)
{
for (auto& t : fTransports) {
t.second->Reset();
}
// iterate over the channels map
for (auto& mi : fChannels)
{
for (auto& mi : fChannels) {
// iterate over the channels vector
for (auto& vi : mi.second)
{
for (auto& vi : mi.second) {
// vi.fReset = true;
vi.fSocket.reset(); // destroy FairMQSocket
}
@@ -818,18 +837,12 @@ void FairMQDevice::ResetWrapper()
Reset();
ChangeState(internal_IDLE);
}
void FairMQDevice::Reset()
{
}
void FairMQDevice::Exit()
{
ChangeState(fair::mq::Transition::Auto);
}
FairMQDevice::~FairMQDevice()
{
LOG(debug) << "Destructing device " << fId;
UnsubscribeFromNewTransition("device");
fStateMachine.StopHandlingStates();
LOG(debug) << "Shutting down device " << fId;
}