diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index 2d557eef..8561a79f 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -98,7 +98,16 @@ Topology::Topology(DDSTopology topo, DDSSession session) LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2]; } } else if (parts[1] == "could not queue") { - LOG(warn) << "Could not queue " << parts[2] << " transition on " << senderId; + std::unique_lock lock(fMtx); + + if (fStateChangeOngoing) { + if (fState.at(fDDSSession.GetTaskId(senderId)).state != fTargetState) { + fStateChangeError = + tools::ToString("Could not queue ", parts[2], " transition on ", senderId); + lock.unlock(); + fCV.notify_one(); + } + } } }); fDDSSession.StartDDSService(); @@ -124,13 +133,13 @@ auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb std::unique_lock lock(fMtx); if (fStateChangeOngoing) { throw std::runtime_error("A state change request is already in progress, concurrent requests are currently not supported"); - lock.unlock(); } LOG(debug) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition); fStateChangeOngoing = true; fChangeStateCallback = cb; fStateChangeTimeout = timeout; fTargetState = expectedState.at(transition); + fStateChangeError.clear(); fDDSSession.SendCommand(GetTransitionName(transition)); } @@ -157,12 +166,15 @@ void Topology::WaitForState() while (!fShutdown) { if (fStateChangeOngoing) { try { + std::unique_lock lock(fMtx); + auto condition = [&] { // LOG(info) << "checking condition"; // LOG(info) << "fShutdown: " << fShutdown; // LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(), // [&](TopologyState::value_type i) { return i.second.state == fTargetState; }); return fShutdown + || !fStateChangeError.empty() || std::all_of( fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { // TODO Check, if we can make sure that EXITING state change event are not missed @@ -172,8 +184,6 @@ void Topology::WaitForState() }); }; - std::unique_lock lock(fMtx); - if (fStateChangeTimeout > std::chrono::milliseconds(0)) { if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) { // LOG(debug) << "timeout"; @@ -189,6 +199,15 @@ void Topology::WaitForState() } fStateChangeOngoing = false; + + if (!fStateChangeError.empty()) { + TopologyState state = fState; + lock.unlock(); + fChangeStateCallback( + {{AsyncOpResultCode::Error, fStateChangeError}, std::move(state)}); + break; + } + if (fShutdown) { LOG(debug) << "Aborting because a shutdown was requested"; TopologyState state = fState; diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 953ea695..6b3bf54b 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -150,6 +150,7 @@ class Topology ChangeStateCallback fChangeStateCallback; std::chrono::milliseconds fStateChangeTimeout; bool fShutdown; + std::string fStateChangeError; void WaitForState(); void AddNewStateEntry(uint64_t senderId, const std::string& state);