From 926ee743ed661bf9f73a6e5f7a34b13762d2fc45 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 24 Mar 2020 15:05:29 +0100 Subject: [PATCH] DDS plugin: refactor for better readability --- fairmq/plugins/DDS/DDS.cxx | 211 ++++++++++++++++++------------------- fairmq/plugins/DDS/DDS.h | 2 + 2 files changed, 105 insertions(+), 108 deletions(-) diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 1c1d32b2..ad84bb79 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -8,8 +8,6 @@ #include "DDS.h" -#include - #include #include @@ -315,118 +313,115 @@ auto DDS::SubscribeForCustomCommands() -> void fDDS.SubscribeCustomCmd([id, this](const string& cmdStr, const string& cond, uint64_t senderId) { // LOG(info) << "Received command: '" << cmdStr << "' from " << senderId; - - using namespace fair::mq::sdk; - cmd::Cmds inCmds; + sdk::cmd::Cmds inCmds; inCmds.Deserialize(cmdStr); - for (const auto& cmd : inCmds) { - // LOG(info) << "Received command type: '" << cmd->GetType() << "' from " << senderId; - switch (cmd->GetType()) { - case cmd::Type::check_state: { - fDDS.Send(cmd::Cmds(cmd::make(id, GetCurrentDeviceState())) - .Serialize(), - to_string(senderId)); - } break; - case cmd::Type::change_state: { - Transition transition = static_cast(*cmd).GetTransition(); - if (ChangeDeviceState(transition)) { - cmd::Cmds outCmds( - cmd::make(id, fDDSTaskId, cmd::Result::Ok, transition)); - fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } else { - sdk::cmd::Cmds outCmds( - cmd::make(id, fDDSTaskId, cmd::Result::Failure, transition)); - fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } - { - lock_guard lock{fStateChangeSubscriberMutex}; - fLastExternalController = senderId; - } - } break; - case cmd::Type::dump_config: { - stringstream ss; - for (const auto pKey : GetPropertyKeys()) { - ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << "\n"; - } - cmd::Cmds outCmds(cmd::make(id, ss.str())); - fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } break; - case cmd::Type::state_change_exiting_received: { - { - lock_guard lock{fStateChangeSubscriberMutex}; - if (fLastExternalController == senderId) { - fExitingAckedByLastExternalController = true; - } - } - fExitingAcked.notify_one(); - } break; - case cmd::Type::subscribe_to_state_change: { - lock_guard lock{fStateChangeSubscriberMutex}; - fStateChangeSubscribers.insert(senderId); - - LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState - << " to " << senderId; - - cmd::Cmds outCmds( - cmd::make(id, cmd::Result::Ok), - cmd::make(id, fDDSTaskId, fLastState, fCurrentState)); - - fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } break; - case cmd::Type::unsubscribe_from_state_change: { - { - lock_guard lock{fStateChangeSubscriberMutex}; - fStateChangeSubscribers.erase(senderId); - } - cmd::Cmds outCmds( - cmd::make(id, cmd::Result::Ok)); - fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } break; - case cmd::Type::get_properties: { - auto _cmd = static_cast(*cmd); - auto const request_id(_cmd.GetRequestId()); - auto result(cmd::Result::Ok); - std::vector> props; - try { - for (auto const& prop : GetPropertiesAsString(_cmd.GetQuery())) { - props.push_back({prop.first, prop.second}); - } - } catch (std::exception const& e) { - LOG(warn) << "Getting properties (request id: " << request_id << ") failed: " << e.what(); - result = cmd::Result::Failure; - } - cmd::Cmds const outCmds(cmd::make(id, request_id, result, props)); - fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } break; - case cmd::Type::set_properties: { - auto _cmd(static_cast(*cmd)); - auto const request_id(_cmd.GetRequestId()); - auto result(cmd::Result::Ok); - try { - fair::mq::Properties props; - for (auto const& prop : _cmd.GetProps()) { - props.insert({prop.first, fair::mq::Property(prop.second)}); - } - // TODO Handle builtin keys with different value type than string - SetProperties(props); - } catch (std::exception const& e) { - LOG(warn) << "Setting properties (request id: " << request_id << ") failed: " << e.what(); - result = cmd::Result::Failure; - } - cmd::Cmds const outCmds(cmd::make(id, request_id, result)); - fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } break; - default: - LOG(warn) << "Unexpected/unknown command received: " << cmdStr; - LOG(warn) << "Origin: " << senderId; - LOG(warn) << "Destination: " << cond; - break; - } + HandleCmd(id, *cmd, cond, senderId); } }); } +auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, uint64_t senderId) -> void +{ + using namespace fair::mq::sdk; + using namespace fair::mq::sdk::cmd; + // LOG(info) << "Received command type: '" << cmd.GetType() << "' from " << senderId; + switch (cmd.GetType()) { + case Type::check_state: { + fDDS.Send(Cmds(make(id, GetCurrentDeviceState())).Serialize(), to_string(senderId)); + } break; + case Type::change_state: { + Transition transition = static_cast(cmd).GetTransition(); + if (ChangeDeviceState(transition)) { + Cmds outCmds(make(id, fDDSTaskId, Result::Ok, transition)); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } else { + Cmds outCmds(make(id, fDDSTaskId, Result::Failure, transition)); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } + { + lock_guard lock{fStateChangeSubscriberMutex}; + fLastExternalController = senderId; + } + } break; + case Type::dump_config: { + stringstream ss; + for (const auto pKey : GetPropertyKeys()) { + ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << "\n"; + } + Cmds outCmds(make(id, ss.str())); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } break; + case Type::state_change_exiting_received: { + { + lock_guard lock{fStateChangeSubscriberMutex}; + if (fLastExternalController == senderId) { + fExitingAckedByLastExternalController = true; + } + } + fExitingAcked.notify_one(); + } break; + case Type::subscribe_to_state_change: { + lock_guard lock{fStateChangeSubscriberMutex}; + fStateChangeSubscribers.insert(senderId); + + LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId; + + Cmds outCmds(make(id, Result::Ok), + make(id, fDDSTaskId, fLastState, fCurrentState)); + + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } break; + case Type::unsubscribe_from_state_change: { + { + lock_guard lock{fStateChangeSubscriberMutex}; + fStateChangeSubscribers.erase(senderId); + } + Cmds outCmds(make(id, Result::Ok)); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } break; + case Type::get_properties: { + auto _cmd = static_cast(cmd); + auto const request_id(_cmd.GetRequestId()); + auto result(Result::Ok); + std::vector> props; + try { + for (auto const& prop : GetPropertiesAsString(_cmd.GetQuery())) { + props.push_back({prop.first, prop.second}); + } + } catch (std::exception const& e) { + LOG(warn) << "Getting properties (request id: " << request_id << ") failed: " << e.what(); + result = Result::Failure; + } + Cmds const outCmds(make(id, request_id, result, props)); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } break; + case Type::set_properties: { + auto _cmd(static_cast(cmd)); + auto const request_id(_cmd.GetRequestId()); + auto result(Result::Ok); + try { + fair::mq::Properties props; + for (auto const& prop : _cmd.GetProps()) { + props.insert({prop.first, fair::mq::Property(prop.second)}); + } + // TODO Handle builtin keys with different value type than string + SetProperties(props); + } catch (std::exception const& e) { + LOG(warn) << "Setting properties (request id: " << request_id << ") failed: " << e.what(); + result = Result::Failure; + } + Cmds const outCmds(make(id, request_id, result)); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } break; + default: + LOG(warn) << "Unexpected/unknown command received: " << cmd.GetType(); + LOG(warn) << "Origin: " << senderId; + LOG(warn) << "Destination: " << cond; + break; + } +} + DDS::~DDS() { UnsubscribeFromDeviceStateChange(); diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 4b357dfd..5086b421 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -12,6 +12,7 @@ #include #include #include +#include #include @@ -142,6 +143,7 @@ class DDS : public Plugin auto SubscribeForConnectingChannels() -> void; auto PublishBoundChannels() -> void; auto SubscribeForCustomCommands() -> void; + auto HandleCmd(const std::string& id, sdk::cmd::Cmd& cmd, const std::string& cond, uint64_t senderId) -> void; DDSSubscription fDDS; size_t fDDSTaskId;