Resume/Interrupt transports consistently

- Resume transports before state callbacks & handlers
 - Interrupt transports on new transitions
This commit is contained in:
Alexey Rybalchenko 2023-02-22 14:01:23 +01:00
parent efb659f0ac
commit 9093ed82dc
3 changed files with 28 additions and 11 deletions

View File

@ -92,14 +92,12 @@ Device::Device(ProgOptions* config, tools::Version version)
{ {
SubscribeToNewTransition("device", [&](Transition transition) { SubscribeToNewTransition("device", [&](Transition transition) {
LOG(trace) << "device notified on new transition: " << transition; LOG(trace) << "device notified on new transition: " << transition;
switch (transition) {
case Transition::Stop:
InterruptTransports(); InterruptTransports();
break; });
default:
break; fStateMachine.PrepareState([&](State state) {
} LOG(trace) << "Resuming transports for " << state << " state";
ResumeTransports();
}); });
fStateMachine.HandleStates([&](State state) { fStateMachine.HandleStates([&](State state) {
@ -462,9 +460,6 @@ void Device::RunWrapper()
if (rateLogging && rateLogger->joinable()) { rateLogger->join(); } if (rateLogging && rateLogger->joinable()) { rateLogger->join(); }
}); });
// notify transports to resume transfers
ResumeTransports();
// change to Error state in case of an exception, to release LogSocketRates // change to Error state in case of an exception, to release LogSocketRates
tools::CallOnDestruction cod([&](){ tools::CallOnDestruction cod([&](){
ChangeState(Transition::ErrorFound); ChangeState(Transition::ErrorFound);
@ -494,7 +489,6 @@ void Device::RunWrapper()
// if Run() exited and the state is still RUNNING, transition to READY. // if Run() exited and the state is still RUNNING, transition to READY.
if (!NewStatePending()) { if (!NewStatePending()) {
InterruptTransports();
ChangeState(Transition::Stop); ChangeState(Transition::Stop);
} }

View File

@ -157,6 +157,13 @@ struct Machine_ : public state_machine_def<Machine_>
} }
} }
void CallStatePrep(const State state) const
{
if (!fStatePrepSignal.empty()) {
fStatePrepSignal(state);
}
}
void CallNewTransitionCallbacks(const Transition transition) const void CallNewTransitionCallbacks(const Transition transition) const
{ {
if (!fNewTransitionSignal.empty()) { if (!fNewTransitionSignal.empty()) {
@ -175,6 +182,7 @@ struct Machine_ : public state_machine_def<Machine_>
boost::signals2::signal<void(const State)> fStateChangeSignal; boost::signals2::signal<void(const State)> fStateChangeSignal;
boost::signals2::signal<void(const State)> fStateHandleSignal; boost::signals2::signal<void(const State)> fStateHandleSignal;
boost::signals2::signal<void(const State)> fStatePrepSignal;
boost::signals2::signal<void(const Transition)> fNewTransitionSignal; boost::signals2::signal<void(const Transition)> fNewTransitionSignal;
unordered_map<string, boost::signals2::connection> fStateChangeSignalsMap; unordered_map<string, boost::signals2::connection> fStateChangeSignalsMap;
unordered_map<string, boost::signals2::connection> fNewTransitionSignalsMap; unordered_map<string, boost::signals2::connection> fNewTransitionSignalsMap;
@ -198,6 +206,7 @@ struct Machine_ : public state_machine_def<Machine_>
} }
} }
CallStatePrep(fState);
CallStateChangeCallbacks(fState); CallStateChangeCallbacks(fState);
CallStateHandler(fState); CallStateHandler(fState);
} }
@ -313,6 +322,16 @@ void StateMachine::UnsubscribeFromStateChange(const string& key)
} }
} }
void StateMachine::PrepareState(std::function<void(const State)> callback)
{
auto fsm = static_pointer_cast<FairMQFSM>(fFsm);
if (fsm->fStatePrepSignal.empty()) {
fsm->fStatePrepSignal.connect(callback);
} else {
LOG(error) << "state preparation handler is already set";
}
}
void StateMachine::HandleStates(function<void(const State)> callback) void StateMachine::HandleStates(function<void(const State)> callback)
{ {
auto fsm = static_pointer_cast<FairMQFSM>(fFsm); auto fsm = static_pointer_cast<FairMQFSM>(fFsm);
@ -326,6 +345,9 @@ void StateMachine::HandleStates(function<void(const State)> callback)
void StateMachine::StopHandlingStates() void StateMachine::StopHandlingStates()
{ {
auto fsm = static_pointer_cast<FairMQFSM>(fFsm); auto fsm = static_pointer_cast<FairMQFSM>(fFsm);
if (!fsm->fStatePrepSignal.empty()) {
fsm->fStatePrepSignal.disconnect_all_slots();
}
if (!fsm->fStateHandleSignal.empty()) { if (!fsm->fStateHandleSignal.empty()) {
fsm->fStateHandleSignal.disconnect_all_slots(); fsm->fStateHandleSignal.disconnect_all_slots();
} }

View File

@ -35,6 +35,7 @@ class StateMachine
void SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback); void SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback);
void UnsubscribeFromStateChange(const std::string& key); void UnsubscribeFromStateChange(const std::string& key);
void PrepareState(std::function<void(const State)> callback);
void HandleStates(std::function<void(const State)> callback); void HandleStates(std::function<void(const State)> callback);
void StopHandlingStates(); void StopHandlingStates();