diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index ec99a3b1..504e50a0 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -34,6 +34,7 @@ #include #include +#include #include #include #include @@ -222,8 +223,15 @@ class BasicTopology : public AsioBase { using namespace fair::mq::sdk::cmd; fDDSSession.SendCommand(Cmds(make()).Serialize()); - // give dds a chance to complete request, TODO: track each individual task and its subscription status - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // wait for all tasks to confirm unsubscription + std::unique_lock lk(fMtx); + fStateChangeUnsubscriptionCV.wait(lk, [&](){ + unsigned int count = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) { + return fStateData.at(s.second).subscribed_to_state_changes == false; + }); + return count == fStateIndex.size(); + }); } void SubscribeToCommands() @@ -267,9 +275,9 @@ class BasicTopology : public AsioBase { if (cmd.GetResult() == cmd::Result::Ok) { DDSTask::Id taskId(cmd.GetTaskId()); - std::lock_guard lk(fMtx); try { + std::lock_guard lk(fMtx); DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); task.subscribed_to_state_changes = true; } catch (const std::exception& e) { @@ -284,11 +292,13 @@ class BasicTopology : public AsioBase { if (cmd.GetResult() == cmd::Result::Ok) { DDSTask::Id taskId(cmd.GetTaskId()); - std::lock_guard lk(fMtx); try { + std::unique_lock lk(fMtx); DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); task.subscribed_to_state_changes = false; + lk.unlock(); + fStateChangeUnsubscriptionCV.notify_one(); } catch (const std::exception& e) { FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what(); } @@ -306,12 +316,16 @@ class BasicTopology : public AsioBase } DDSTask::Id taskId(cmd.GetTaskId()); - std::lock_guard lk(fMtx); try { + std::lock_guard lk(fMtx); DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); task.lastState = cmd.GetLastState(); task.state = cmd.GetCurrentState(); + // if the task is exiting, it will not respond to unsubscription request anymore, set it to false now. + if (task.state == DeviceState::Exiting) { + task.subscribed_to_state_changes = false; + } // FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state; for (auto& op : fChangeStateOps) { @@ -1187,6 +1201,7 @@ class BasicTopology : public AsioBase TopologyState fStateData; TopologyStateIndex fStateIndex; mutable std::mutex fMtx; + std::condition_variable fStateChangeUnsubscriptionCV; std::unordered_map fChangeStateOps; std::unordered_map fWaitForStateOps;