diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 5c177d74..4e120e13 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -347,11 +347,11 @@ auto DDS::SubscribeForCustomCommands() -> void Transition transition = static_cast(*cmd).GetTransition(); if (ChangeDeviceState(transition)) { cmd::Cmds outCmds( - cmd::make(id, cmd::Result::Ok, transition)); + cmd::make(id, dds::env_prop(), cmd::Result::Ok, transition)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } else { sdk::cmd::Cmds outCmds( - cmd::make(id, cmd::Result::Failure, transition)); + cmd::make(id, dds::env_prop(), cmd::Result::Failure, transition)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } { diff --git a/fairmq/sdk/DDSSession.cxx b/fairmq/sdk/DDSSession.cxx index c615e4db..bf6a88df 100644 --- a/fairmq/sdk/DDSSession.cxx +++ b/fairmq/sdk/DDSSession.cxx @@ -135,8 +135,6 @@ struct DDSSession::Impl dds::intercom_api::CCustomCmd fDDSCustomCmd; Id fId; bool fStopOnDestruction; - mutable std::mutex fMtx; - std::unordered_map fTaskIdByChannelIdMap; }; DDSSession::DDSSession(DDSEnvironment env) @@ -362,18 +360,6 @@ void DDSSession::SendCommand(const std::string& cmd, DDSChannel::Id recipient) fImpl->fDDSCustomCmd.send(cmd, std::to_string(recipient)); } -auto DDSSession::UpdateChannelToTaskAssociation(DDSChannel::Id channelId, DDSTask::Id taskId) -> void -{ - std::lock_guard lk(fImpl->fMtx); - fImpl->fTaskIdByChannelIdMap[channelId] = taskId; -} - -auto DDSSession::GetTaskId(DDSChannel::Id channelId) const -> DDSTask::Id -{ - std::lock_guard lk(fImpl->fMtx); - return fImpl->fTaskIdByChannelIdMap.at(channelId); -} - auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream& { return os << "$DDS_SESSION_ID: " << session.GetId(); diff --git a/fairmq/sdk/DDSSession.h b/fairmq/sdk/DDSSession.h index a32abfdb..d1ee9486 100644 --- a/fairmq/sdk/DDSSession.h +++ b/fairmq/sdk/DDSSession.h @@ -103,7 +103,6 @@ class DDSSession void UnsubscribeFromCommands(); void SendCommand(const std::string&, const std::string& = ""); void SendCommand(const std::string&, DDSChannel::Id); - auto UpdateChannelToTaskAssociation(DDSChannel::Id, DDSTask::Id) -> void; auto GetTaskId(DDSChannel::Id) const -> DDSTask::Id; friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&; diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 76fc1616..180d2a7c 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -193,7 +193,6 @@ class BasicTopology : public AsioBase case Type::state_change: { auto _cmd = static_cast(*cmd); DDSTask::Id taskId(_cmd.GetTaskId()); - fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId); if (_cmd.GetCurrentState() == DeviceState::Exiting) { Cmds outCmds; outCmds.Add(); @@ -212,10 +211,11 @@ class BasicTopology : public AsioBase } break; case Type::transition_status: { - if (static_cast(*cmd).GetResult() != Result::Ok) { - LOG(error) << "Transition failed for " << static_cast(*cmd).GetDeviceId(); + auto _cmd = static_cast(*cmd); + if (_cmd.GetResult() != Result::Ok) { + LOG(error) << "Transition failed for " << _cmd.GetDeviceId(); std::lock_guard lk(fMtx); - if (!fChangeStateOp.IsCompleted() && fStateData.at(fStateIndex.at(fDDSSession.GetTaskId(senderId))).state != fChangeStateTarget) { + if (!fChangeStateOp.IsCompleted() && fStateData.at(fStateIndex.at(_cmd.GetTaskId())).state != fChangeStateTarget) { fChangeStateOpTimer.cancel(); fChangeStateOp.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed), fStateData); } diff --git a/fairmq/sdk/commands/Commands.cxx b/fairmq/sdk/commands/Commands.cxx index ede10ad4..903375da 100644 --- a/fairmq/sdk/commands/Commands.cxx +++ b/fairmq/sdk/commands/Commands.cxx @@ -296,6 +296,7 @@ string Cmds::Serialize(const Format type) const auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); cmdBuilder = tools::make_unique(fbb); cmdBuilder->add_device_id(deviceId); + cmdBuilder->add_task_id(_cmd.GetTaskId()); cmdBuilder->add_result(GetFBResult(_cmd.GetResult())); cmdBuilder->add_transition(GetFBTransition(_cmd.GetTransition())); } @@ -476,7 +477,7 @@ void Cmds::Deserialize(const string& str, const Format type) fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetMQState(cmdPtr.current_state()))); break; case FBCmd_transition_status: - fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetResult(cmdPtr.result()), GetMQTransition(cmdPtr.transition()))); + fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetResult(cmdPtr.result()), GetMQTransition(cmdPtr.transition()))); break; case FBCmd_config: fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.config_string()->str())); diff --git a/fairmq/sdk/commands/Commands.h b/fairmq/sdk/commands/Commands.h index 491072cb..3f077cf4 100644 --- a/fairmq/sdk/commands/Commands.h +++ b/fairmq/sdk/commands/Commands.h @@ -51,7 +51,7 @@ enum class Type : int set_properties, // args: { request_id, properties } current_state, // args: { device_id, current_state } - transition_status, // args: { device_id, Result, transition } + transition_status, // args: { device_id, task_id, Result, transition } config, // args: { device_id, config_string } heartbeat_subscription, // args: { device_id, Result } heartbeat_unsubscription, // args: { device_id, Result } @@ -179,15 +179,18 @@ struct CurrentState : Cmd struct TransitionStatus : Cmd { - explicit TransitionStatus(const std::string& id, const Result result, const Transition transition) + explicit TransitionStatus(const std::string& deviceId, const uint64_t taskId, const Result result, const Transition transition) : Cmd(Type::transition_status) - , fDeviceId(id) + , fDeviceId(deviceId) + , fTaskId(taskId) , fResult(result) , fTransition(transition) {} std::string GetDeviceId() const { return fDeviceId; } void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; } + uint64_t GetTaskId() const { return fTaskId; } + void SetTaskId(const uint64_t taskId) { fTaskId = taskId; } Result GetResult() const { return fResult; } void SetResult(const Result result) { fResult = result; } Transition GetTransition() const { return fTransition; } @@ -195,6 +198,7 @@ struct TransitionStatus : Cmd private: std::string fDeviceId; + uint64_t fTaskId; Result fResult; Transition fTransition; }; diff --git a/test/commands/_commands.cxx b/test/commands/_commands.cxx index 6d83cdba..cc1047c5 100644 --- a/test/commands/_commands.cxx +++ b/test/commands/_commands.cxx @@ -32,7 +32,7 @@ TEST(Format, Construction) Cmds getPropertiesCmds(make(66, "k[12]")); Cmds setPropertiesCmds(make(42, props)); Cmds currentStateCmds(make("somedeviceid", State::Running)); - Cmds transitionStatusCmds(make("somedeviceid", Result::Ok, Transition::Stop)); + Cmds transitionStatusCmds(make("somedeviceid", 123456, Result::Ok, Transition::Stop)); Cmds configCmds(make("somedeviceid", "someconfig")); Cmds heartbeatSubscriptionCmds(make("somedeviceid", Result::Ok)); Cmds heartbeatUnsubscriptionCmds(make("somedeviceid", Result::Ok)); @@ -63,6 +63,7 @@ TEST(Format, Construction) ASSERT_EQ(static_cast(currentStateCmds.At(0)).GetCurrentState(), State::Running); ASSERT_EQ(transitionStatusCmds.At(0).GetType(), Type::transition_status); ASSERT_EQ(static_cast(transitionStatusCmds.At(0)).GetDeviceId(), "somedeviceid"); + ASSERT_EQ(static_cast(transitionStatusCmds.At(0)).GetTaskId(), 123456); ASSERT_EQ(static_cast(transitionStatusCmds.At(0)).GetResult(), Result::Ok); ASSERT_EQ(static_cast(transitionStatusCmds.At(0)).GetTransition(), Transition::Stop); ASSERT_EQ(configCmds.At(0).GetType(), Type::config); @@ -113,7 +114,7 @@ void fillCommands(Cmds& cmds) cmds.Add(66, "k[12]"); cmds.Add(42, props); cmds.Add("somedeviceid", State::Running); - cmds.Add("somedeviceid", Result::Ok, Transition::Stop); + cmds.Add("somedeviceid", 123456, Result::Ok, Transition::Stop); cmds.Add("somedeviceid", "someconfig"); cmds.Add("somedeviceid", Result::Ok); cmds.Add("somedeviceid", Result::Ok); @@ -177,6 +178,7 @@ void checkCommands(Cmds& cmds) case Type::transition_status: ++count; ASSERT_EQ(static_cast(*cmd).GetDeviceId(), "somedeviceid"); + ASSERT_EQ(static_cast(*cmd).GetTaskId(), 123456); ASSERT_EQ(static_cast(*cmd).GetResult(), Result::Ok); ASSERT_EQ(static_cast(*cmd).GetTransition(), Transition::Stop); break;