mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
SDK: Improve error handling in case state-change fails on a device
Replace the log message with 1. Nothing, if the device is already in the target state 2. Abort and call the completion callback with error otherwise
This commit is contained in:
parent
fd282fa950
commit
7b773cde51
|
@ -98,7 +98,16 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
||||||
LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2];
|
LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2];
|
||||||
}
|
}
|
||||||
} else if (parts[1] == "could not queue") {
|
} else if (parts[1] == "could not queue") {
|
||||||
LOG(warn) << "Could not queue " << parts[2] << " transition on " << senderId;
|
std::unique_lock<std::mutex> lock(fMtx);
|
||||||
|
|
||||||
|
if (fStateChangeOngoing) {
|
||||||
|
if (fState.at(fDDSSession.GetTaskId(senderId)).state != fTargetState) {
|
||||||
|
fStateChangeError =
|
||||||
|
tools::ToString("Could not queue ", parts[2], " transition on ", senderId);
|
||||||
|
lock.unlock();
|
||||||
|
fCV.notify_one();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
fDDSSession.StartDDSService();
|
fDDSSession.StartDDSService();
|
||||||
|
@ -124,13 +133,13 @@ auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb
|
||||||
std::unique_lock<std::mutex> lock(fMtx);
|
std::unique_lock<std::mutex> lock(fMtx);
|
||||||
if (fStateChangeOngoing) {
|
if (fStateChangeOngoing) {
|
||||||
throw std::runtime_error("A state change request is already in progress, concurrent requests are currently not supported");
|
throw std::runtime_error("A state change request is already in progress, concurrent requests are currently not supported");
|
||||||
lock.unlock();
|
|
||||||
}
|
}
|
||||||
LOG(debug) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition);
|
LOG(debug) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition);
|
||||||
fStateChangeOngoing = true;
|
fStateChangeOngoing = true;
|
||||||
fChangeStateCallback = cb;
|
fChangeStateCallback = cb;
|
||||||
fStateChangeTimeout = timeout;
|
fStateChangeTimeout = timeout;
|
||||||
fTargetState = expectedState.at(transition);
|
fTargetState = expectedState.at(transition);
|
||||||
|
fStateChangeError.clear();
|
||||||
|
|
||||||
fDDSSession.SendCommand(GetTransitionName(transition));
|
fDDSSession.SendCommand(GetTransitionName(transition));
|
||||||
}
|
}
|
||||||
|
@ -157,12 +166,15 @@ void Topology::WaitForState()
|
||||||
while (!fShutdown) {
|
while (!fShutdown) {
|
||||||
if (fStateChangeOngoing) {
|
if (fStateChangeOngoing) {
|
||||||
try {
|
try {
|
||||||
|
std::unique_lock<std::mutex> lock(fMtx);
|
||||||
|
|
||||||
auto condition = [&] {
|
auto condition = [&] {
|
||||||
// LOG(info) << "checking condition";
|
// LOG(info) << "checking condition";
|
||||||
// LOG(info) << "fShutdown: " << fShutdown;
|
// LOG(info) << "fShutdown: " << fShutdown;
|
||||||
// LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(),
|
// LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(),
|
||||||
// [&](TopologyState::value_type i) { return i.second.state == fTargetState; });
|
// [&](TopologyState::value_type i) { return i.second.state == fTargetState; });
|
||||||
return fShutdown
|
return fShutdown
|
||||||
|
|| !fStateChangeError.empty()
|
||||||
|| std::all_of(
|
|| std::all_of(
|
||||||
fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
|
fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
|
||||||
// TODO Check, if we can make sure that EXITING state change event are not missed
|
// TODO Check, if we can make sure that EXITING state change event are not missed
|
||||||
|
@ -172,8 +184,6 @@ void Topology::WaitForState()
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lock(fMtx);
|
|
||||||
|
|
||||||
if (fStateChangeTimeout > std::chrono::milliseconds(0)) {
|
if (fStateChangeTimeout > std::chrono::milliseconds(0)) {
|
||||||
if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) {
|
if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) {
|
||||||
// LOG(debug) << "timeout";
|
// LOG(debug) << "timeout";
|
||||||
|
@ -189,6 +199,15 @@ void Topology::WaitForState()
|
||||||
}
|
}
|
||||||
|
|
||||||
fStateChangeOngoing = false;
|
fStateChangeOngoing = false;
|
||||||
|
|
||||||
|
if (!fStateChangeError.empty()) {
|
||||||
|
TopologyState state = fState;
|
||||||
|
lock.unlock();
|
||||||
|
fChangeStateCallback(
|
||||||
|
{{AsyncOpResultCode::Error, fStateChangeError}, std::move(state)});
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (fShutdown) {
|
if (fShutdown) {
|
||||||
LOG(debug) << "Aborting because a shutdown was requested";
|
LOG(debug) << "Aborting because a shutdown was requested";
|
||||||
TopologyState state = fState;
|
TopologyState state = fState;
|
||||||
|
|
|
@ -150,6 +150,7 @@ class Topology
|
||||||
ChangeStateCallback fChangeStateCallback;
|
ChangeStateCallback fChangeStateCallback;
|
||||||
std::chrono::milliseconds fStateChangeTimeout;
|
std::chrono::milliseconds fStateChangeTimeout;
|
||||||
bool fShutdown;
|
bool fShutdown;
|
||||||
|
std::string fStateChangeError;
|
||||||
|
|
||||||
void WaitForState();
|
void WaitForState();
|
||||||
void AddNewStateEntry(uint64_t senderId, const std::string& state);
|
void AddNewStateEntry(uint64_t senderId, const std::string& state);
|
||||||
|
|
Loading…
Reference in New Issue
Block a user