mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
DDS plugin: refactor for better readability
This commit is contained in:
parent
c7b1304a2c
commit
926ee743ed
|
@ -8,8 +8,6 @@
|
||||||
|
|
||||||
#include "DDS.h"
|
#include "DDS.h"
|
||||||
|
|
||||||
#include <fairmq/sdk/commands/Commands.h>
|
|
||||||
|
|
||||||
#include <fairmq/tools/Strings.h>
|
#include <fairmq/tools/Strings.h>
|
||||||
|
|
||||||
#include <boost/algorithm/string/join.hpp>
|
#include <boost/algorithm/string/join.hpp>
|
||||||
|
@ -315,118 +313,115 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||||
|
|
||||||
fDDS.SubscribeCustomCmd([id, this](const string& cmdStr, const string& cond, uint64_t senderId) {
|
fDDS.SubscribeCustomCmd([id, this](const string& cmdStr, const string& cond, uint64_t senderId) {
|
||||||
// LOG(info) << "Received command: '" << cmdStr << "' from " << senderId;
|
// LOG(info) << "Received command: '" << cmdStr << "' from " << senderId;
|
||||||
|
sdk::cmd::Cmds inCmds;
|
||||||
using namespace fair::mq::sdk;
|
|
||||||
cmd::Cmds inCmds;
|
|
||||||
inCmds.Deserialize(cmdStr);
|
inCmds.Deserialize(cmdStr);
|
||||||
|
|
||||||
for (const auto& cmd : inCmds) {
|
for (const auto& cmd : inCmds) {
|
||||||
// LOG(info) << "Received command type: '" << cmd->GetType() << "' from " << senderId;
|
HandleCmd(id, *cmd, cond, senderId);
|
||||||
switch (cmd->GetType()) {
|
|
||||||
case cmd::Type::check_state: {
|
|
||||||
fDDS.Send(cmd::Cmds(cmd::make<cmd::CurrentState>(id, GetCurrentDeviceState()))
|
|
||||||
.Serialize(),
|
|
||||||
to_string(senderId));
|
|
||||||
} break;
|
|
||||||
case cmd::Type::change_state: {
|
|
||||||
Transition transition = static_cast<cmd::ChangeState&>(*cmd).GetTransition();
|
|
||||||
if (ChangeDeviceState(transition)) {
|
|
||||||
cmd::Cmds outCmds(
|
|
||||||
cmd::make<cmd::TransitionStatus>(id, fDDSTaskId, cmd::Result::Ok, transition));
|
|
||||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
|
||||||
} else {
|
|
||||||
sdk::cmd::Cmds outCmds(
|
|
||||||
cmd::make<cmd::TransitionStatus>(id, fDDSTaskId, cmd::Result::Failure, transition));
|
|
||||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
|
||||||
}
|
|
||||||
{
|
|
||||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
|
||||||
fLastExternalController = senderId;
|
|
||||||
}
|
|
||||||
} break;
|
|
||||||
case cmd::Type::dump_config: {
|
|
||||||
stringstream ss;
|
|
||||||
for (const auto pKey : GetPropertyKeys()) {
|
|
||||||
ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << "\n";
|
|
||||||
}
|
|
||||||
cmd::Cmds outCmds(cmd::make<cmd::Config>(id, ss.str()));
|
|
||||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
|
||||||
} break;
|
|
||||||
case cmd::Type::state_change_exiting_received: {
|
|
||||||
{
|
|
||||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
|
||||||
if (fLastExternalController == senderId) {
|
|
||||||
fExitingAckedByLastExternalController = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fExitingAcked.notify_one();
|
|
||||||
} break;
|
|
||||||
case cmd::Type::subscribe_to_state_change: {
|
|
||||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
|
||||||
fStateChangeSubscribers.insert(senderId);
|
|
||||||
|
|
||||||
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState
|
|
||||||
<< " to " << senderId;
|
|
||||||
|
|
||||||
cmd::Cmds outCmds(
|
|
||||||
cmd::make<cmd::StateChangeSubscription>(id, cmd::Result::Ok),
|
|
||||||
cmd::make<cmd::StateChange>(id, fDDSTaskId, fLastState, fCurrentState));
|
|
||||||
|
|
||||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
|
||||||
} break;
|
|
||||||
case cmd::Type::unsubscribe_from_state_change: {
|
|
||||||
{
|
|
||||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
|
||||||
fStateChangeSubscribers.erase(senderId);
|
|
||||||
}
|
|
||||||
cmd::Cmds outCmds(
|
|
||||||
cmd::make<cmd::StateChangeUnsubscription>(id, cmd::Result::Ok));
|
|
||||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
|
||||||
} break;
|
|
||||||
case cmd::Type::get_properties: {
|
|
||||||
auto _cmd = static_cast<cmd::GetProperties&>(*cmd);
|
|
||||||
auto const request_id(_cmd.GetRequestId());
|
|
||||||
auto result(cmd::Result::Ok);
|
|
||||||
std::vector<std::pair<std::string, std::string>> props;
|
|
||||||
try {
|
|
||||||
for (auto const& prop : GetPropertiesAsString(_cmd.GetQuery())) {
|
|
||||||
props.push_back({prop.first, prop.second});
|
|
||||||
}
|
|
||||||
} catch (std::exception const& e) {
|
|
||||||
LOG(warn) << "Getting properties (request id: " << request_id << ") failed: " << e.what();
|
|
||||||
result = cmd::Result::Failure;
|
|
||||||
}
|
|
||||||
cmd::Cmds const outCmds(cmd::make<cmd::Properties>(id, request_id, result, props));
|
|
||||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
|
||||||
} break;
|
|
||||||
case cmd::Type::set_properties: {
|
|
||||||
auto _cmd(static_cast<cmd::SetProperties&>(*cmd));
|
|
||||||
auto const request_id(_cmd.GetRequestId());
|
|
||||||
auto result(cmd::Result::Ok);
|
|
||||||
try {
|
|
||||||
fair::mq::Properties props;
|
|
||||||
for (auto const& prop : _cmd.GetProps()) {
|
|
||||||
props.insert({prop.first, fair::mq::Property(prop.second)});
|
|
||||||
}
|
|
||||||
// TODO Handle builtin keys with different value type than string
|
|
||||||
SetProperties(props);
|
|
||||||
} catch (std::exception const& e) {
|
|
||||||
LOG(warn) << "Setting properties (request id: " << request_id << ") failed: " << e.what();
|
|
||||||
result = cmd::Result::Failure;
|
|
||||||
}
|
|
||||||
cmd::Cmds const outCmds(cmd::make<cmd::PropertiesSet>(id, request_id, result));
|
|
||||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
|
||||||
} break;
|
|
||||||
default:
|
|
||||||
LOG(warn) << "Unexpected/unknown command received: " << cmdStr;
|
|
||||||
LOG(warn) << "Origin: " << senderId;
|
|
||||||
LOG(warn) << "Destination: " << cond;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, uint64_t senderId) -> void
|
||||||
|
{
|
||||||
|
using namespace fair::mq::sdk;
|
||||||
|
using namespace fair::mq::sdk::cmd;
|
||||||
|
// LOG(info) << "Received command type: '" << cmd.GetType() << "' from " << senderId;
|
||||||
|
switch (cmd.GetType()) {
|
||||||
|
case Type::check_state: {
|
||||||
|
fDDS.Send(Cmds(make<CurrentState>(id, GetCurrentDeviceState())).Serialize(), to_string(senderId));
|
||||||
|
} break;
|
||||||
|
case Type::change_state: {
|
||||||
|
Transition transition = static_cast<ChangeState&>(cmd).GetTransition();
|
||||||
|
if (ChangeDeviceState(transition)) {
|
||||||
|
Cmds outCmds(make<TransitionStatus>(id, fDDSTaskId, Result::Ok, transition));
|
||||||
|
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||||
|
} else {
|
||||||
|
Cmds outCmds(make<TransitionStatus>(id, fDDSTaskId, Result::Failure, transition));
|
||||||
|
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||||
|
}
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||||
|
fLastExternalController = senderId;
|
||||||
|
}
|
||||||
|
} break;
|
||||||
|
case Type::dump_config: {
|
||||||
|
stringstream ss;
|
||||||
|
for (const auto pKey : GetPropertyKeys()) {
|
||||||
|
ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << "\n";
|
||||||
|
}
|
||||||
|
Cmds outCmds(make<Config>(id, ss.str()));
|
||||||
|
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||||
|
} break;
|
||||||
|
case Type::state_change_exiting_received: {
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||||
|
if (fLastExternalController == senderId) {
|
||||||
|
fExitingAckedByLastExternalController = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fExitingAcked.notify_one();
|
||||||
|
} break;
|
||||||
|
case Type::subscribe_to_state_change: {
|
||||||
|
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||||
|
fStateChangeSubscribers.insert(senderId);
|
||||||
|
|
||||||
|
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId;
|
||||||
|
|
||||||
|
Cmds outCmds(make<StateChangeSubscription>(id, Result::Ok),
|
||||||
|
make<StateChange>(id, fDDSTaskId, fLastState, fCurrentState));
|
||||||
|
|
||||||
|
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||||
|
} break;
|
||||||
|
case Type::unsubscribe_from_state_change: {
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||||
|
fStateChangeSubscribers.erase(senderId);
|
||||||
|
}
|
||||||
|
Cmds outCmds(make<StateChangeUnsubscription>(id, Result::Ok));
|
||||||
|
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||||
|
} break;
|
||||||
|
case Type::get_properties: {
|
||||||
|
auto _cmd = static_cast<cmd::GetProperties&>(cmd);
|
||||||
|
auto const request_id(_cmd.GetRequestId());
|
||||||
|
auto result(Result::Ok);
|
||||||
|
std::vector<std::pair<std::string, std::string>> props;
|
||||||
|
try {
|
||||||
|
for (auto const& prop : GetPropertiesAsString(_cmd.GetQuery())) {
|
||||||
|
props.push_back({prop.first, prop.second});
|
||||||
|
}
|
||||||
|
} catch (std::exception const& e) {
|
||||||
|
LOG(warn) << "Getting properties (request id: " << request_id << ") failed: " << e.what();
|
||||||
|
result = Result::Failure;
|
||||||
|
}
|
||||||
|
Cmds const outCmds(make<cmd::Properties>(id, request_id, result, props));
|
||||||
|
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||||
|
} break;
|
||||||
|
case Type::set_properties: {
|
||||||
|
auto _cmd(static_cast<cmd::SetProperties&>(cmd));
|
||||||
|
auto const request_id(_cmd.GetRequestId());
|
||||||
|
auto result(Result::Ok);
|
||||||
|
try {
|
||||||
|
fair::mq::Properties props;
|
||||||
|
for (auto const& prop : _cmd.GetProps()) {
|
||||||
|
props.insert({prop.first, fair::mq::Property(prop.second)});
|
||||||
|
}
|
||||||
|
// TODO Handle builtin keys with different value type than string
|
||||||
|
SetProperties(props);
|
||||||
|
} catch (std::exception const& e) {
|
||||||
|
LOG(warn) << "Setting properties (request id: " << request_id << ") failed: " << e.what();
|
||||||
|
result = Result::Failure;
|
||||||
|
}
|
||||||
|
Cmds const outCmds(make<PropertiesSet>(id, request_id, result));
|
||||||
|
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||||
|
} break;
|
||||||
|
default:
|
||||||
|
LOG(warn) << "Unexpected/unknown command received: " << cmd.GetType();
|
||||||
|
LOG(warn) << "Origin: " << senderId;
|
||||||
|
LOG(warn) << "Destination: " << cond;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
DDS::~DDS()
|
DDS::~DDS()
|
||||||
{
|
{
|
||||||
UnsubscribeFromDeviceStateChange();
|
UnsubscribeFromDeviceStateChange();
|
||||||
|
|
|
@ -12,6 +12,7 @@
|
||||||
#include <fairmq/Plugin.h>
|
#include <fairmq/Plugin.h>
|
||||||
#include <fairmq/StateQueue.h>
|
#include <fairmq/StateQueue.h>
|
||||||
#include <fairmq/Version.h>
|
#include <fairmq/Version.h>
|
||||||
|
#include <fairmq/sdk/commands/Commands.h>
|
||||||
|
|
||||||
#include <dds/dds.h>
|
#include <dds/dds.h>
|
||||||
|
|
||||||
|
@ -142,6 +143,7 @@ class DDS : public Plugin
|
||||||
auto SubscribeForConnectingChannels() -> void;
|
auto SubscribeForConnectingChannels() -> void;
|
||||||
auto PublishBoundChannels() -> void;
|
auto PublishBoundChannels() -> void;
|
||||||
auto SubscribeForCustomCommands() -> void;
|
auto SubscribeForCustomCommands() -> void;
|
||||||
|
auto HandleCmd(const std::string& id, sdk::cmd::Cmd& cmd, const std::string& cond, uint64_t senderId) -> void;
|
||||||
|
|
||||||
DDSSubscription fDDS;
|
DDSSubscription fDDS;
|
||||||
size_t fDDSTaskId;
|
size_t fDDSTaskId;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user