mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Commands: Add task id to subscription status cmds
This commit is contained in:
committed by
Dennis Klein
parent
c5efd3e4a6
commit
274ba5ec00
@@ -71,7 +71,7 @@ const std::map<DeviceTransition, DeviceState> expectedState =
|
||||
|
||||
struct DeviceStatus
|
||||
{
|
||||
bool initialized;
|
||||
bool subscribed_to_state_changes;
|
||||
DeviceState lastState;
|
||||
DeviceState state;
|
||||
DDSTask::Id taskId;
|
||||
@@ -265,15 +265,35 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
|
||||
auto HandleCmd(cmd::StateChangeSubscription const& cmd) -> void
|
||||
{
|
||||
if (cmd.GetResult() != cmd::Result::Ok) {
|
||||
FAIR_LOG(error) << "State change subscription failed for " << cmd.GetDeviceId();
|
||||
if (cmd.GetResult() == cmd::Result::Ok) {
|
||||
DDSTask::Id taskId(cmd.GetTaskId());
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
|
||||
try {
|
||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||
task.subscribed_to_state_changes = true;
|
||||
} catch (const std::exception& e) {
|
||||
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeSubscription const&): " << e.what();
|
||||
}
|
||||
} else {
|
||||
FAIR_LOG(error) << "State change subscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId();
|
||||
}
|
||||
}
|
||||
|
||||
auto HandleCmd(cmd::StateChangeUnsubscription const& cmd) -> void
|
||||
{
|
||||
if (cmd.GetResult() != cmd::Result::Ok) {
|
||||
FAIR_LOG(error) << "State change unsubscription failed for " << cmd.GetDeviceId();
|
||||
if (cmd.GetResult() == cmd::Result::Ok) {
|
||||
DDSTask::Id taskId(cmd.GetTaskId());
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
|
||||
try {
|
||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||
task.subscribed_to_state_changes = false;
|
||||
} catch (const std::exception& e) {
|
||||
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
|
||||
}
|
||||
} else {
|
||||
FAIR_LOG(error) << "State change unsubscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -290,7 +310,6 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
|
||||
try {
|
||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||
task.initialized = true;
|
||||
task.lastState = cmd.GetLastState();
|
||||
task.state = cmd.GetCurrentState();
|
||||
// FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
|
||||
@@ -310,11 +329,11 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
{
|
||||
if (cmd.GetResult() != cmd::Result::Ok) {
|
||||
FAIR_LOG(error) << cmd.GetTransition() << " transition failed for " << cmd.GetDeviceId();
|
||||
DDSTask::Id id(cmd.GetTaskId());
|
||||
DDSTask::Id taskId(cmd.GetTaskId());
|
||||
std::lock_guard<std::mutex> lk(fMtx);
|
||||
for (auto& op : fChangeStateOps) {
|
||||
if (!op.second.IsCompleted() && op.second.ContainsTask(id) &&
|
||||
fStateData.at(fStateIndex.at(id)).state != op.second.GetTargetState()) {
|
||||
if (!op.second.IsCompleted() && op.second.ContainsTask(taskId) &&
|
||||
fStateData.at(fStateIndex.at(taskId)).state != op.second.GetTargetState()) {
|
||||
op.second.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed));
|
||||
}
|
||||
}
|
||||
|
@@ -293,6 +293,7 @@ string Cmds::Serialize(const Format type) const
|
||||
auto deviceId = fbb.CreateString(_cmd.GetDeviceId());
|
||||
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
|
||||
cmdBuilder->add_device_id(deviceId);
|
||||
cmdBuilder->add_task_id(_cmd.GetTaskId());
|
||||
cmdBuilder->add_result(GetFBResult(_cmd.GetResult()));
|
||||
}
|
||||
break;
|
||||
@@ -301,6 +302,7 @@ string Cmds::Serialize(const Format type) const
|
||||
auto deviceId = fbb.CreateString(_cmd.GetDeviceId());
|
||||
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
|
||||
cmdBuilder->add_device_id(deviceId);
|
||||
cmdBuilder->add_task_id(_cmd.GetTaskId());
|
||||
cmdBuilder->add_result(GetFBResult(_cmd.GetResult()));
|
||||
}
|
||||
break;
|
||||
@@ -433,10 +435,10 @@ void Cmds::Deserialize(const string& str, const Format type)
|
||||
fCmds.emplace_back(make<Config>(cmdPtr.device_id()->str(), cmdPtr.config_string()->str()));
|
||||
break;
|
||||
case FBCmd_state_change_subscription:
|
||||
fCmds.emplace_back(make<StateChangeSubscription>(cmdPtr.device_id()->str(), GetResult(cmdPtr.result())));
|
||||
fCmds.emplace_back(make<StateChangeSubscription>(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetResult(cmdPtr.result())));
|
||||
break;
|
||||
case FBCmd_state_change_unsubscription:
|
||||
fCmds.emplace_back(make<StateChangeUnsubscription>(cmdPtr.device_id()->str(), GetResult(cmdPtr.result())));
|
||||
fCmds.emplace_back(make<StateChangeUnsubscription>(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetResult(cmdPtr.result())));
|
||||
break;
|
||||
case FBCmd_state_change:
|
||||
fCmds.emplace_back(make<StateChange>(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetMQState(cmdPtr.last_state()), GetMQState(cmdPtr.current_state())));
|
||||
|
@@ -51,8 +51,8 @@ enum class Type : int
|
||||
current_state, // args: { device_id, current_state }
|
||||
transition_status, // args: { device_id, task_id, Result, transition }
|
||||
config, // args: { device_id, config_string }
|
||||
state_change_subscription, // args: { device_id, Result }
|
||||
state_change_unsubscription, // args: { device_id, Result }
|
||||
state_change_subscription, // args: { device_id, task_id, Result }
|
||||
state_change_unsubscription, // args: { device_id, task_id, Result }
|
||||
state_change, // args: { device_id, task_id, last_state, current_state }
|
||||
properties, // args: { device_id, request_id, Result, properties }
|
||||
properties_set // args: { device_id, request_id, Result }
|
||||
@@ -208,37 +208,45 @@ struct Config : Cmd
|
||||
|
||||
struct StateChangeSubscription : Cmd
|
||||
{
|
||||
explicit StateChangeSubscription(const std::string& id, const Result result)
|
||||
explicit StateChangeSubscription(const std::string& id, const uint64_t taskId, const Result result)
|
||||
: Cmd(Type::state_change_subscription)
|
||||
, fDeviceId(id)
|
||||
, fTaskId(taskId)
|
||||
, fResult(result)
|
||||
{}
|
||||
|
||||
std::string GetDeviceId() const { return fDeviceId; }
|
||||
void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; }
|
||||
uint64_t GetTaskId() const { return fTaskId; }
|
||||
void SetTaskId(const uint64_t taskId) { fTaskId = taskId; }
|
||||
Result GetResult() const { return fResult; }
|
||||
void SetResult(const Result result) { fResult = result; }
|
||||
|
||||
private:
|
||||
std::string fDeviceId;
|
||||
uint64_t fTaskId;
|
||||
Result fResult;
|
||||
};
|
||||
|
||||
struct StateChangeUnsubscription : Cmd
|
||||
{
|
||||
explicit StateChangeUnsubscription(const std::string& id, const Result result)
|
||||
explicit StateChangeUnsubscription(const std::string& id, const uint64_t taskId, const Result result)
|
||||
: Cmd(Type::state_change_unsubscription)
|
||||
, fDeviceId(id)
|
||||
, fTaskId(taskId)
|
||||
, fResult(result)
|
||||
{}
|
||||
|
||||
std::string GetDeviceId() const { return fDeviceId; }
|
||||
void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; }
|
||||
uint64_t GetTaskId() const { return fTaskId; }
|
||||
void SetTaskId(const uint64_t taskId) { fTaskId = taskId; }
|
||||
Result GetResult() const { return fResult; }
|
||||
void SetResult(const Result result) { fResult = result; }
|
||||
|
||||
private:
|
||||
std::string fDeviceId;
|
||||
uint64_t fTaskId;
|
||||
Result fResult;
|
||||
};
|
||||
|
||||
|
@@ -54,10 +54,10 @@ enum FBCmd:byte {
|
||||
set_properties, // args: { request_id, properties }
|
||||
|
||||
current_state, // args: { device_id, current_state }
|
||||
transition_status, // args: { device_id, Result, transition }
|
||||
transition_status, // args: { device_id, task_id, Result, transition }
|
||||
config, // args: { device_id, config_string }
|
||||
state_change_subscription, // args: { device_id, Result }
|
||||
state_change_unsubscription, // args: { device_id, Result }
|
||||
state_change_subscription, // args: { device_id, task_id, Result }
|
||||
state_change_unsubscription, // args: { device_id, task_id, Result }
|
||||
state_change, // args: { device_id, task_id, last_state, current_state }
|
||||
properties, // args: { device_id, request_id, Result, properties }
|
||||
properties_set // args: { device_id, request_id, Result }
|
||||
|
Reference in New Issue
Block a user