diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index f1b5c2db..384eccfd 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -223,7 +223,7 @@ void FairMQDevice::InitWrapper() AttachChannels(uninitializedConnectingChannels); } - CallAndHandleError(std::bind(&FairMQDevice::Init, this)); + Init(); ChangeState(internal_DEVICE_READY); } @@ -429,7 +429,7 @@ void FairMQDevice::InitTaskWrapper() { CallStateChangeCallbacks(INITIALIZING_TASK); - CallAndHandleError(std::bind(&FairMQDevice::InitTask, this)); + InitTask(); ChangeState(internal_READY); } @@ -504,46 +504,43 @@ void FairMQDevice::RunWrapper() t.second->Resume(); } - CallAndHandleError([this] + try { - try + PreRun(); + + // process either data callbacks or ConditionalRun/Run + if (fDataCallbacks) { - PreRun(); - - // process either data callbacks or ConditionalRun/Run - if (fDataCallbacks) + // if only one input channel, do lightweight handling without additional polling. + if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1) { - // if only one input channel, do lightweight handling without additional polling. - if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1) - { - HandleSingleChannelInput(); - } - else // otherwise do full handling with polling - { - HandleMultipleChannelInput(); - } + HandleSingleChannelInput(); } - else + else // otherwise do full handling with polling { - fair::mq::tools::RateLimiter rateLimiter(fRate); - - while (CheckCurrentState(RUNNING) && ConditionalRun()) - { - if (fRate > 0.001) - { - rateLimiter.maybe_sleep(); - } - } - - Run(); + HandleMultipleChannelInput(); } } - catch (const out_of_range& oor) + else { - LOG(error) << "out of range: " << oor.what(); - LOG(error) << "incorrect/incomplete channel configuration?"; + fair::mq::tools::RateLimiter rateLimiter(fRate); + + while (CheckCurrentState(RUNNING) && ConditionalRun()) + { + if (fRate > 0.001) + { + rateLimiter.maybe_sleep(); + } + } + + Run(); } - }); + } + catch (const out_of_range& oor) + { + LOG(error) << "out of range: " << oor.what(); + LOG(error) << "incorrect/incomplete channel configuration?"; + } // if Run() exited and the state is still RUNNING, transition to READY. if (CheckCurrentState(RUNNING)) @@ -551,7 +548,7 @@ void FairMQDevice::RunWrapper() ChangeState(internal_READY); } - CallAndHandleError(std::bind(&FairMQDevice::PostRun, this)); + PostRun(); rateLogger.get(); } @@ -774,7 +771,7 @@ void FairMQDevice::PauseWrapper() { CallStateChangeCallbacks(PAUSED); - CallAndHandleError(std::bind(&FairMQDevice::Pause, this)); + Pause(); } void FairMQDevice::Pause() @@ -940,7 +937,7 @@ void FairMQDevice::ResetTaskWrapper() { CallStateChangeCallbacks(RESETTING_TASK); - CallAndHandleError(std::bind(&FairMQDevice::ResetTask, this)); + ResetTask(); ChangeState(internal_DEVICE_READY); } @@ -953,7 +950,7 @@ void FairMQDevice::ResetWrapper() { CallStateChangeCallbacks(RESETTING_DEVICE); - CallAndHandleError(std::bind(&FairMQDevice::Reset, this)); + Reset(); ChangeState(internal_IDLE); } @@ -977,17 +974,6 @@ const FairMQChannel& FairMQDevice::GetChannel(const string& channelName, const i return fChannels.at(channelName).at(index); } -void FairMQDevice::CallAndHandleError(std::function callable) -try -{ - callable(); -} -catch(...) -{ - ChangeState(ERROR_FOUND); - throw; -} - void FairMQDevice::Exit() { } diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index 3cefbd48..4c43bf95 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -650,10 +650,21 @@ bool FairMQStateMachine::CheckCurrentState(const string& state) const } void FairMQStateMachine::ProcessWork() +try { static_pointer_cast(fFsm)->ProcessWork(); +} catch(...) { + { + lock_guard lock(static_pointer_cast(fFsm)->fWorkMutex); + static_pointer_cast(fFsm)->fWorkActive = false; + static_pointer_cast(fFsm)->fWorkAvailable = false; + static_pointer_cast(fFsm)->fWorkDoneCondition.notify_one(); + } + ChangeState(ERROR_FOUND); + throw; } + int FairMQStateMachine::GetEventNumber(const string& event) { return eventNumbers.at(event); diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index badfd076..8e31025f 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -88,7 +88,6 @@ void FairMQBenchmarkSampler::Run() } } - if (fMsgRate > 0) { rateLimiter.maybe_sleep(); diff --git a/fairmq/plugins/Control.cxx b/fairmq/plugins/Control.cxx index 1cabc920..4990d57d 100644 --- a/fairmq/plugins/Control.cxx +++ b/fairmq/plugins/Control.cxx @@ -339,7 +339,7 @@ auto Control::RunShutdownSequence() -> void { auto nextState = GetCurrentDeviceState(); EmptyEventQueue(); - while (nextState != DeviceState::Exiting) + while (nextState != DeviceState::Exiting && nextState != DeviceState::Error) { switch (nextState) { @@ -359,7 +359,7 @@ auto Control::RunShutdownSequence() -> void ChangeDeviceState(DeviceStateTransition::Resume); break; default: - // ignore other states + LOG(debug) << "Controller ignoring event: " << nextState; break; } diff --git a/test/device/_exceptions.cxx b/test/device/_exceptions.cxx index 93607ca4..6bf8a717 100644 --- a/test/device/_exceptions.cxx +++ b/test/device/_exceptions.cxx @@ -45,7 +45,10 @@ void RunExceptionIn(const std::string& state) exit(result.exit_code); } -TEST(Exceptions, InInit) { EXPECT_EXIT(RunExceptionIn("Init"), ::testing::ExitedWithCode(1), ""); } +TEST(Exceptions, InInit) +{ + EXPECT_EXIT(RunExceptionIn("Init"), ::testing::ExitedWithCode(1), ""); +} TEST(Exceptions, InInitTask) { EXPECT_EXIT(RunExceptionIn("InitTask"), ::testing::ExitedWithCode(1), ""); @@ -54,7 +57,10 @@ TEST(Exceptions, InPreRun) { EXPECT_EXIT(RunExceptionIn("PreRun"), ::testing::ExitedWithCode(1), ""); } -TEST(Exceptions, InRun) { EXPECT_EXIT(RunExceptionIn("Run"), ::testing::ExitedWithCode(1), ""); } +TEST(Exceptions, InRun) +{ + EXPECT_EXIT(RunExceptionIn("Run"), ::testing::ExitedWithCode(1), ""); +} TEST(Exceptions, InPostRun) { EXPECT_EXIT(RunExceptionIn("PostRun"), ::testing::ExitedWithCode(1), "");