mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-14 17:16:47 +00:00
SDK: track state change (un-)subscriptions
This commit is contained in:
parent
274ba5ec00
commit
036561ab38
|
@ -34,6 +34,7 @@
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
#include <condition_variable>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
@ -222,8 +223,15 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
{
|
{
|
||||||
using namespace fair::mq::sdk::cmd;
|
using namespace fair::mq::sdk::cmd;
|
||||||
fDDSSession.SendCommand(Cmds(make<UnsubscribeFromStateChange>()).Serialize());
|
fDDSSession.SendCommand(Cmds(make<UnsubscribeFromStateChange>()).Serialize());
|
||||||
// give dds a chance to complete request, TODO: track each individual task and its subscription status
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
// wait for all tasks to confirm unsubscription
|
||||||
|
std::unique_lock<std::mutex> lk(fMtx);
|
||||||
|
fStateChangeUnsubscriptionCV.wait(lk, [&](){
|
||||||
|
unsigned int count = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) {
|
||||||
|
return fStateData.at(s.second).subscribed_to_state_changes == false;
|
||||||
|
});
|
||||||
|
return count == fStateIndex.size();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void SubscribeToCommands()
|
void SubscribeToCommands()
|
||||||
|
@ -267,9 +275,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
{
|
{
|
||||||
if (cmd.GetResult() == cmd::Result::Ok) {
|
if (cmd.GetResult() == cmd::Result::Ok) {
|
||||||
DDSTask::Id taskId(cmd.GetTaskId());
|
DDSTask::Id taskId(cmd.GetTaskId());
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
std::lock_guard<std::mutex> lk(fMtx);
|
||||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||||
task.subscribed_to_state_changes = true;
|
task.subscribed_to_state_changes = true;
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
|
@ -284,11 +292,13 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
{
|
{
|
||||||
if (cmd.GetResult() == cmd::Result::Ok) {
|
if (cmd.GetResult() == cmd::Result::Ok) {
|
||||||
DDSTask::Id taskId(cmd.GetTaskId());
|
DDSTask::Id taskId(cmd.GetTaskId());
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
std::unique_lock<std::mutex> lk(fMtx);
|
||||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||||
task.subscribed_to_state_changes = false;
|
task.subscribed_to_state_changes = false;
|
||||||
|
lk.unlock();
|
||||||
|
fStateChangeUnsubscriptionCV.notify_one();
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
|
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
|
||||||
}
|
}
|
||||||
|
@ -306,12 +316,16 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
}
|
}
|
||||||
|
|
||||||
DDSTask::Id taskId(cmd.GetTaskId());
|
DDSTask::Id taskId(cmd.GetTaskId());
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
std::lock_guard<std::mutex> lk(fMtx);
|
||||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||||
task.lastState = cmd.GetLastState();
|
task.lastState = cmd.GetLastState();
|
||||||
task.state = cmd.GetCurrentState();
|
task.state = cmd.GetCurrentState();
|
||||||
|
// if the task is exiting, it will not respond to unsubscription request anymore, set it to false now.
|
||||||
|
if (task.state == DeviceState::Exiting) {
|
||||||
|
task.subscribed_to_state_changes = false;
|
||||||
|
}
|
||||||
// FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
|
// FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
|
||||||
|
|
||||||
for (auto& op : fChangeStateOps) {
|
for (auto& op : fChangeStateOps) {
|
||||||
|
@ -1187,6 +1201,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
TopologyState fStateData;
|
TopologyState fStateData;
|
||||||
TopologyStateIndex fStateIndex;
|
TopologyStateIndex fStateIndex;
|
||||||
mutable std::mutex fMtx;
|
mutable std::mutex fMtx;
|
||||||
|
std::condition_variable fStateChangeUnsubscriptionCV;
|
||||||
|
|
||||||
std::unordered_map<typename ChangeStateOp::Id, ChangeStateOp> fChangeStateOps;
|
std::unordered_map<typename ChangeStateOp::Id, ChangeStateOp> fChangeStateOps;
|
||||||
std::unordered_map<typename WaitForStateOp::Id, WaitForStateOp> fWaitForStateOps;
|
std::unordered_map<typename WaitForStateOp::Id, WaitForStateOp> fWaitForStateOps;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user