diff --git a/.clang-tidy b/.clang-tidy index be9bc944..6faf7d76 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -1,3 +1,3 @@ --- -Checks: '*,-google-*,-fuchsia-*,-cert-*,-llvm-header-guard,-readability-named-parameter,-misc-non-private-member-variables-in-classes,-*-magic-numbers,-llvm-include-order,-hicpp-no-array-decay,-performance-unnecessary-value-param,-cppcoreguidelines-pro-bounds-array-to-pointer-decay' +Checks: '*,-google-*,-fuchsia-*,-cert-*,-llvm-header-guard,-readability-named-parameter,-misc-non-private-member-variables-in-classes,-*-magic-numbers,-llvm-include-order,-hicpp-no-array-decay,-performance-unnecessary-value-param,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-modernize-use-trailing-return-type,-readability-redundant-member-init' HeaderFilterRegex: '/(fairmq/)' diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 385e5bf9..fd33bcd8 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -324,7 +324,6 @@ auto DDS::HeartbeatSender() -> void auto DDS::SubscribeForCustomCommands() -> void { - using namespace sdk::cmd; LOG(debug) << "Subscribing for DDS custom commands."; string id = GetProperty("id"); @@ -332,59 +331,59 @@ auto DDS::SubscribeForCustomCommands() -> void fDDS.SubscribeCustomCmd([id, this](const string& cmdStr, const string& cond, uint64_t senderId) { // LOG(info) << "Received command: '" << cmdStr << "' from " << senderId; - Cmds inCmds; + using namespace fair::mq::sdk; + cmd::Cmds inCmds; inCmds.Deserialize(cmdStr); for (const auto& cmd : inCmds) { // LOG(info) << "Received command type: '" << cmd->GetType() << "' from " << senderId; switch (cmd->GetType()) { - case Type::check_state: { - fDDS.Send(Cmds(make(id, GetCurrentDeviceState())).Serialize(), to_string(senderId)); - } - break; - case Type::change_state: { - Transition transition = static_cast(*cmd).GetTransition(); + case cmd::Type::check_state: { + fDDS.Send(cmd::Cmds(cmd::make(id, GetCurrentDeviceState())) + .Serialize(), + to_string(senderId)); + } break; + case cmd::Type::change_state: { + Transition transition = static_cast(*cmd).GetTransition(); if (ChangeDeviceState(transition)) { - Cmds outCmds(make(id, Result::Ok, transition)); + cmd::Cmds outCmds( + cmd::make(id, cmd::Result::Ok, transition)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } else { - Cmds outCmds(make(id, Result::Failure, transition)); + sdk::cmd::Cmds outCmds( + cmd::make(id, cmd::Result::Failure, transition)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } { lock_guard lock{fStateChangeSubscriberMutex}; fLastExternalController = senderId; } - } - break; - case Type::dump_config: { + } break; + case cmd::Type::dump_config: { stringstream ss; - for (const auto pKey: GetPropertyKeys()) { + for (const auto pKey : GetPropertyKeys()) { ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << "\n"; } - Cmds outCmds(make(id, ss.str())); + cmd::Cmds outCmds(cmd::make(id, ss.str())); fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } - break; - case Type::subscribe_to_heartbeats: { + } break; + case cmd::Type::subscribe_to_heartbeats: { { lock_guard lock{fHeartbeatSubscriberMutex}; fHeartbeatSubscribers.insert(senderId); } - Cmds outCmds(make(id, Result::Ok)); + cmd::Cmds outCmds(cmd::make(id, cmd::Result::Ok)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } - break; - case Type::unsubscribe_from_heartbeats: { + } break; + case cmd::Type::unsubscribe_from_heartbeats: { { lock_guard lock{fHeartbeatSubscriberMutex}; fHeartbeatSubscribers.erase(senderId); } - Cmds outCmds(make(id, Result::Ok)); + cmd::Cmds outCmds(cmd::make(id, cmd::Result::Ok)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } - break; - case Type::state_change_exiting_received: { + } break; + case cmd::Type::state_change_exiting_received: { { lock_guard lock{fStateChangeSubscriberMutex}; if (fLastExternalController == senderId) { @@ -392,31 +391,50 @@ auto DDS::SubscribeForCustomCommands() -> void } } fExitingAcked.notify_one(); - } - break; - case Type::subscribe_to_state_change: { + } break; + case cmd::Type::subscribe_to_state_change: { lock_guard lock{fStateChangeSubscriberMutex}; fStateChangeSubscribers.insert(senderId); if (!fControllerThread.joinable()) { fControllerThread = thread(&DDS::WaitForExitingAck, this); } - LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId; + LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState + << " to " << senderId; - Cmds outCmds(make(id, Result::Ok), make(id, dds::env_prop(), fLastState, fCurrentState)); + cmd::Cmds outCmds( + cmd::make(id, cmd::Result::Ok), + cmd::make( + id, dds::env_prop(), fLastState, fCurrentState)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } - break; - case Type::unsubscribe_from_state_change: { + } break; + case cmd::Type::unsubscribe_from_state_change: { { lock_guard lock{fStateChangeSubscriberMutex}; fStateChangeSubscribers.erase(senderId); } - Cmds outCmds(make(id, Result::Ok)); + cmd::Cmds outCmds( + cmd::make(id, cmd::Result::Ok)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); - } - break; + } break; + case cmd::Type::set_properties: { + auto _cmd(static_cast(*cmd)); + auto const request_id(_cmd.GetRequestId()); + auto result(cmd::Result::Ok); + try { + fair::mq::Properties props; + for (auto const& prop : _cmd.GetProps()) { + props.insert({prop.first, fair::mq::Property(prop.second)}); + } + SetProperties(props); + } catch (...) { + LOG(warn) << "Setting properties (request id: " << request_id << ") failed"; + result = cmd::Result::Failure; + } + cmd::Cmds const outCmds(cmd::make(id, request_id, result)); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } break; default: LOG(warn) << "Unexpected/unknown command received: " << cmdStr; LOG(warn) << "Origin: " << senderId; diff --git a/fairmq/sdk/Error.cxx b/fairmq/sdk/Error.cxx index df726041..9f0c5293 100644 --- a/fairmq/sdk/Error.cxx +++ b/fairmq/sdk/Error.cxx @@ -27,6 +27,8 @@ std::string ErrorCategory::message(int ev) const return "async operation canceled"; case ErrorCode::DeviceChangeStateFailed: return "failed to change state of a fairmq device"; + case ErrorCode::DeviceSetPropertiesFailed: + return "failed to set fairmq device properties"; default: return "(unrecognized error)"; } diff --git a/fairmq/sdk/Error.h b/fairmq/sdk/Error.h index 3a24188a..c50c91e9 100644 --- a/fairmq/sdk/Error.h +++ b/fairmq/sdk/Error.h @@ -37,7 +37,8 @@ enum class ErrorCode OperationInProgress = 10, OperationTimeout, OperationCanceled, - DeviceChangeStateFailed + DeviceChangeStateFailed, + DeviceSetPropertiesFailed }; std::error_code MakeErrorCode(ErrorCode); diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index ab0839c3..d48f1909 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -20,6 +20,7 @@ #include #include #include +#include #include @@ -35,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -45,6 +47,7 @@ namespace fair { namespace mq { namespace sdk { +using DeviceId = std::string; using DeviceState = fair::mq::State; using DeviceTransition = fair::mq::Transition; @@ -70,6 +73,9 @@ struct DeviceStatus DDSCollection::Id collectionId; }; +using DeviceProperty = std::pair; /// pair := (key, value) +using DeviceProperties = std::vector; + using TopologyState = std::vector; using TopologyStateIndex = std::unordered_map; // task id -> index in the data vector using TopologyStateByTask = std::unordered_map; @@ -168,20 +174,21 @@ class BasicTopology : public AsioBase Cmds inCmds; inCmds.Deserialize(msg); // LOG(debug) << "Received " << inCmds.Size() << " command(s) with total size of " << msg.length() << " bytes: "; + for (const auto& cmd : inCmds) { // LOG(debug) << " > " << cmd->GetType(); switch (cmd->GetType()) { case Type::state_change: { - DDSTask::Id taskId(static_cast(*cmd).GetTaskId()); + auto _cmd = static_cast(*cmd); + DDSTask::Id taskId(_cmd.GetTaskId()); fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId); - if (static_cast(*cmd).GetCurrentState() == DeviceState::Exiting) { + if (_cmd.GetCurrentState() == DeviceState::Exiting) { Cmds outCmds; outCmds.Add(); fDDSSession.SendCommand(outCmds.Serialize(), senderId); } - UpdateStateEntry(taskId, static_cast(*cmd).GetCurrentState()); - } - break; + UpdateStateEntry(taskId, _cmd.GetCurrentState()); + } break; case Type::state_change_subscription: if (static_cast(*cmd).GetResult() != Result::Ok) { LOG(error) << "State change subscription failed for " << static_cast(*cmd).GetDeviceId(); @@ -203,6 +210,10 @@ class BasicTopology : public AsioBase } } break; + case Type::properties_set: { + HandleCmd(static_cast(*cmd)); + } + break; default: LOG(warn) << "Unexpected/unknown command received: " << cmd->GetType(); LOG(warn) << "Origin: " << senderId; @@ -320,55 +331,53 @@ class BasicTopology : public AsioBase /// } /// @endcode template - auto AsyncChangeState(TopologyTransition transition, + auto AsyncChangeState(const TopologyTransition transition, Duration timeout, CompletionToken&& token) { - return asio::async_initiate( - [&](auto handler) { - std::lock_guard lk(fMtx); + return asio::async_initiate([&](auto handler) { + std::lock_guard lk(fMtx); - if (fChangeStateOp.IsCompleted()) { - fChangeStateOp = ChangeStateOp(AsioBase::GetExecutor(), - AsioBase::GetAllocator(), - std::move(handler)); - fChangeStateTarget = expectedState.at(transition); - ResetTransitionedCount(fChangeStateTarget); - cmd::Cmds cmds(cmd::make(transition)); - fDDSSession.SendCommand(cmds.Serialize()); - if (timeout > std::chrono::milliseconds(0)) { - fChangeStateOpTimer.expires_after(timeout); - fChangeStateOpTimer.async_wait([&](std::error_code ec) { - if (!ec) { - std::lock_guard lk2(fMtx); - fChangeStateOp.Timeout(fStateData); - } - }); - } - } else { - // TODO refactor to hide boiler plate - auto ex2(asio::get_associated_executor(handler, AsioBase::GetExecutor())); - auto alloc2(asio::get_associated_allocator(handler, AsioBase::GetAllocator())); - auto state(GetCurrentStateUnsafe()); - - ex2.post( - [h = std::move(handler), s = std::move(state)]() mutable { - try { - h(MakeErrorCode(ErrorCode::OperationInProgress), s); - } catch (const std::exception& e) { - LOG(error) << "Uncaught exception in completion handler: " << e.what(); - } catch (...) { - LOG(error) << "Unknown uncaught exception in completion handler."; - } - }, - alloc2); + if (fChangeStateOp.IsCompleted()) { + fChangeStateOp = ChangeStateOp(AsioBase::GetExecutor(), + AsioBase::GetAllocator(), + std::move(handler)); + fChangeStateTarget = expectedState.at(transition); + ResetTransitionedCount(fChangeStateTarget); + cmd::Cmds cmds(cmd::make(transition)); + fDDSSession.SendCommand(cmds.Serialize()); + if (timeout > std::chrono::milliseconds(0)) { + fChangeStateOpTimer.expires_after(timeout); + fChangeStateOpTimer.async_wait([&](std::error_code ec) { + if (!ec) { + std::lock_guard lk2(fMtx); + fChangeStateOp.Timeout(fStateData); + } + }); } - }, - token); + } else { + // TODO refactor to hide boiler plate + auto ex2(asio::get_associated_executor(handler, AsioBase::GetExecutor())); + auto alloc2(asio::get_associated_allocator(handler, AsioBase::GetAllocator())); + auto state(GetCurrentStateUnsafe()); + + ex2.post([h = std::move(handler), s = std::move(state)]() mutable { + try { + h(MakeErrorCode(ErrorCode::OperationInProgress), s); + } catch (const std::exception& e) { + LOG(error) << "Uncaught exception in completion handler: " << e.what(); + } catch (...) { + LOG(error) << "Unknown uncaught exception in completion handler."; + } + }, + alloc2); + } + }, + token); } template - auto AsyncChangeState(TopologyTransition transition, CompletionToken&& token) + auto AsyncChangeState(const TopologyTransition transition, CompletionToken&& token) { return AsyncChangeState(transition, Duration(0), std::move(token)); } @@ -378,7 +387,7 @@ class BasicTopology : public AsioBase /// @param timeout Timeout in milliseconds, 0 means no timeout /// @tparam CompletionToken Asio completion token type /// @throws std::system_error - auto ChangeState(TopologyTransition transition, Duration timeout = Duration(0)) + auto ChangeState(const TopologyTransition transition, Duration timeout = Duration(0)) -> std::pair { tools::SharedSemaphore blocker; @@ -406,6 +415,146 @@ class BasicTopology : public AsioBase auto StateEqualsTo(DeviceState state) const -> bool { return sdk::StateEqualsTo(GetCurrentState(), state); } + using FailedDevices = std::set; + using SetPropertiesCompletionSignature = void(std::error_code, FailedDevices); + + private: + struct SetPropertiesOp + { + using Id = std::size_t; + using SetCount = unsigned int; + + template + SetPropertiesOp(Id id, + SetCount expectedCount, + Duration timeout, + std::mutex& mutex, + Executor const & ex, + Allocator const & alloc, + Handler&& handler) + : fId(id) + , fOp(ex, alloc, std::move(handler)) + , fTimer(ex) + , fCount(0) + , fExpectedCount(expectedCount) + , fFailedDevices(alloc) + , fMtx(mutex) + { + if (timeout > std::chrono::milliseconds(0)) { + fTimer.expires_after(timeout); + fTimer.async_wait([&](std::error_code ec) { + if (!ec) { + std::lock_guard lk(fMtx); + fOp.Timeout(fFailedDevices); + } + }); + } + // LOG(debug) << "SetProperties " << fId << " with expected count of " << fExpectedCount << " started."; + } + SetPropertiesOp() = delete; + SetPropertiesOp(const SetPropertiesOp&) = delete; + SetPropertiesOp& operator=(const SetPropertiesOp&) = delete; + SetPropertiesOp(SetPropertiesOp&&) = default; + SetPropertiesOp& operator=(SetPropertiesOp&&) = default; + ~SetPropertiesOp() = default; + + auto Update(const std::string& deviceId, cmd::Result result) -> void + { + std::lock_guard lk(fMtx); + if (cmd::Result::Ok != result) { + fFailedDevices.insert(deviceId); + } + ++fCount; + TryCompletion(); + } + + private: + Id const fId; + AsioAsyncOp fOp; + asio::steady_timer fTimer; + SetCount fCount; + SetCount const fExpectedCount; + FailedDevices fFailedDevices; + std::mutex& fMtx; + + /// precondition: fMtx is locked. + auto TryCompletion() -> void + { + if (!fOp.IsCompleted() && fCount == fExpectedCount) { + fTimer.cancel(); + if (fFailedDevices.size() > 0) { + fOp.Complete(MakeErrorCode(ErrorCode::DeviceSetPropertiesFailed), fFailedDevices); + } else { + fOp.Complete(fFailedDevices); + } + } + } + }; + + auto HandleCmd(cmd::PropertiesSet const& cmd) -> void + { + std::unique_lock lk(fMtx); + try { + auto& op(fSetPropertiesOps.at(cmd.GetRequestId())); + lk.unlock(); + op.Update(cmd.GetDeviceId(), cmd.GetResult()); + } catch (std::out_of_range& e) { + LOG(debug) << "SetProperties operation (request id: " << cmd.GetRequestId() + << ") not found (probably completed or timed out), " + << "discarding reply of device " << cmd.GetDeviceId(); + } + } + + public: + template + auto AsyncSetProperties(const DeviceProperties& props, + Duration timeout, + CompletionToken&& token) + { + return asio::async_initiate( + [&](auto handler) { + typename SetPropertiesOp::Id const id(tools::UuidHash()); + + std::lock_guard lk(fMtx); + fSetPropertiesOps.emplace( + std::piecewise_construct, + std::forward_as_tuple(id), + std::forward_as_tuple(id, + fStateData.size(), + timeout, + fMtx, + AsioBase::GetExecutor(), + AsioBase::GetAllocator(), + std::move(handler))); + + cmd::Cmds const cmds(cmd::make(id, props)); + fDDSSession.SendCommand(cmds.Serialize()); + }, + token); + } + + template + auto AsyncSetProperties(DeviceProperties const & properties, CompletionToken&& token) + { + return AsyncSetProperties(properties, Duration(0), std::move(token)); + } + + auto SetProperties(DeviceProperties const& properties, Duration timeout = Duration(0)) + -> std::pair + { + tools::SharedSemaphore blocker; + std::error_code ec; + FailedDevices failed; + AsyncSetProperties( + properties, timeout, [&, blocker](std::error_code _ec, FailedDevices _failed) mutable { + ec = _ec; + failed = _failed; + blocker.Signal(); + }); + blocker.Wait(); + return {ec, failed}; + } + private: using TransitionedCount = unsigned int; @@ -421,6 +570,8 @@ class BasicTopology : public AsioBase DeviceState fChangeStateTarget; TransitionedCount fTransitionedCount; + std::unordered_map fSetPropertiesOps; + auto makeTopologyState() -> void { fStateData.reserve(fDDSTopo.GetTasks().size()); diff --git a/fairmq/sdk/commands/Commands.cxx b/fairmq/sdk/commands/Commands.cxx index d72a0aec..2cd5c645 100644 --- a/fairmq/sdk/commands/Commands.cxx +++ b/fairmq/sdk/commands/Commands.cxx @@ -46,7 +46,7 @@ array resultNames = } }; -array typeNames = +array typeNames = { { "CheckState", @@ -57,6 +57,7 @@ array typeNames = "SubscribeToStateChange", "UnsubscribeFromStateChange", "StateChangeExitingReceived", + "SetProperties", "CurrentState", "TransitionStatus", @@ -66,7 +67,8 @@ array typeNames = "Heartbeat", "StateChangeSubscription", "StateChangeUnsubscription", - "StateChange" + "StateChange", + "PropertiesSet" } }; @@ -148,7 +150,7 @@ array mqTransitionToFBTransition = } }; -array typeToFBCmd = +array typeToFBCmd = { { FBCmd::FBCmd_check_state, @@ -159,6 +161,7 @@ array typeToFBCmd = FBCmd::FBCmd_subscribe_to_state_change, FBCmd::FBCmd_unsubscribe_from_state_change, FBCmd::FBCmd_state_change_exiting_received, + FBCmd::FBCmd_set_properties, FBCmd::FBCmd_current_state, FBCmd::FBCmd_transition_status, FBCmd::FBCmd_config, @@ -167,11 +170,12 @@ array typeToFBCmd = FBCmd::FBCmd_heartbeat, FBCmd::FBCmd_state_change_subscription, FBCmd::FBCmd_state_change_unsubscription, - FBCmd::FBCmd_state_change + FBCmd::FBCmd_state_change, + FBCmd::FBCmd_properties_set } }; -array fbCmdToType = +array fbCmdToType = { { Type::check_state, @@ -182,6 +186,7 @@ array fbCmdToType = Type::subscribe_to_state_change, Type::unsubscribe_from_state_change, Type::state_change_exiting_received, + Type::set_properties, Type::current_state, Type::transition_status, Type::config, @@ -190,7 +195,8 @@ array fbCmdToType = Type::heartbeat, Type::state_change_subscription, Type::state_change_unsubscription, - Type::state_change + Type::state_change, + Type::properties_set } }; @@ -249,41 +255,60 @@ string Cmds::Serialize(const Format type) const cmdBuilder = tools::make_unique(fbb); } break; + case Type::set_properties: { + auto _cmd = static_cast(*cmd); + std::vector> propsVector; + for (auto const& e : _cmd.GetProps()) { + auto const key(fbb.CreateString(e.first)); + auto const val(fbb.CreateString(e.second)); + propsVector.push_back(CreateFBProperty(fbb, key, val)); + } + auto props = fbb.CreateVector(propsVector); + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_request_id(_cmd.GetRequestId()); + cmdBuilder->add_properties(props); + } + break; case Type::current_state: { - auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + auto _cmd = static_cast(*cmd); + auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); cmdBuilder = tools::make_unique(fbb); cmdBuilder->add_device_id(deviceId); - cmdBuilder->add_current_state(GetFBState(static_cast(*cmd).GetCurrentState())); + cmdBuilder->add_current_state(GetFBState(_cmd.GetCurrentState())); } break; case Type::transition_status: { - auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + auto _cmd = static_cast(*cmd); + auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); cmdBuilder = tools::make_unique(fbb); cmdBuilder->add_device_id(deviceId); - cmdBuilder->add_result(GetFBResult(static_cast(*cmd).GetResult())); - cmdBuilder->add_transition(GetFBTransition(static_cast(*cmd).GetTransition())); + cmdBuilder->add_result(GetFBResult(_cmd.GetResult())); + cmdBuilder->add_transition(GetFBTransition(_cmd.GetTransition())); } break; case Type::config: { - auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); - auto config = fbb.CreateString(static_cast(*cmd).GetConfig()); + auto _cmd = static_cast(*cmd); + auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); + auto config = fbb.CreateString(_cmd.GetConfig()); cmdBuilder = tools::make_unique(fbb); cmdBuilder->add_device_id(deviceId); cmdBuilder->add_config_string(config); } break; case Type::heartbeat_subscription: { - auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + auto _cmd = static_cast(*cmd); + auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); cmdBuilder = tools::make_unique(fbb); cmdBuilder->add_device_id(deviceId); - cmdBuilder->add_result(GetFBResult(static_cast(*cmd).GetResult())); + cmdBuilder->add_result(GetFBResult(_cmd.GetResult())); } break; case Type::heartbeat_unsubscription: { - auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + auto _cmd = static_cast(*cmd); + auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); cmdBuilder = tools::make_unique(fbb); cmdBuilder->add_device_id(deviceId); - cmdBuilder->add_result(GetFBResult(static_cast(*cmd).GetResult())); + cmdBuilder->add_result(GetFBResult(_cmd.GetResult())); } break; case Type::heartbeat: { @@ -293,26 +318,38 @@ string Cmds::Serialize(const Format type) const } break; case Type::state_change_subscription: { - auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + auto _cmd = static_cast(*cmd); + auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); cmdBuilder = tools::make_unique(fbb); cmdBuilder->add_device_id(deviceId); - cmdBuilder->add_result(GetFBResult(static_cast(*cmd).GetResult())); + cmdBuilder->add_result(GetFBResult(_cmd.GetResult())); } break; case Type::state_change_unsubscription: { - auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + auto _cmd = static_cast(*cmd); + auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); cmdBuilder = tools::make_unique(fbb); cmdBuilder->add_device_id(deviceId); - cmdBuilder->add_result(GetFBResult(static_cast(*cmd).GetResult())); + cmdBuilder->add_result(GetFBResult(_cmd.GetResult())); } break; case Type::state_change: { - auto deviceId = fbb.CreateString(static_cast(*cmd).GetDeviceId()); + auto _cmd = static_cast(*cmd); + auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); cmdBuilder = tools::make_unique(fbb); cmdBuilder->add_device_id(deviceId); - cmdBuilder->add_task_id(static_cast(*cmd).GetTaskId()); - cmdBuilder->add_last_state(GetFBState(static_cast(*cmd).GetLastState())); - cmdBuilder->add_current_state(GetFBState(static_cast(*cmd).GetCurrentState())); + cmdBuilder->add_task_id(_cmd.GetTaskId()); + cmdBuilder->add_last_state(GetFBState(_cmd.GetLastState())); + cmdBuilder->add_current_state(GetFBState(_cmd.GetCurrentState())); + } + break; + case Type::properties_set: { + auto _cmd = static_cast(*cmd); + auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_device_id(deviceId); + cmdBuilder->add_request_id(_cmd.GetRequestId()); + cmdBuilder->add_result(GetFBResult(_cmd.GetResult())); } break; default: @@ -391,6 +428,14 @@ void Cmds::Deserialize(const string& str, const Format type) case FBCmd_state_change_exiting_received: fCmds.emplace_back(make()); break; + case FBCmd_set_properties: { + std::vector> properties; + auto props = cmdPtr.properties(); + for (unsigned int j = 0; j < props->size(); ++j) { + properties.emplace_back(props->Get(j)->key()->str(), props->Get(j)->value()->str()); + } + fCmds.emplace_back(make(cmdPtr.request_id(), properties)); + } break; case FBCmd_current_state: fCmds.emplace_back(make(cmdPtr.device_id()->str(), GetMQState(cmdPtr.current_state()))); break; @@ -418,6 +463,9 @@ void Cmds::Deserialize(const string& str, const Format type) case FBCmd_state_change: fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetMQState(cmdPtr.last_state()), GetMQState(cmdPtr.current_state()))); break; + case FBCmd_properties_set: + fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.request_id(), GetResult(cmdPtr.result()))); + break; default: throw CommandFormatError("unrecognized command type given to fair::mq::cmd::Cmds::Deserialize()"); break; diff --git a/fairmq/sdk/commands/Commands.h b/fairmq/sdk/commands/Commands.h index 8c98bb77..8f8f3076 100644 --- a/fairmq/sdk/commands/Commands.h +++ b/fairmq/sdk/commands/Commands.h @@ -47,6 +47,7 @@ enum class Type : int subscribe_to_state_change, // args: { } unsubscribe_from_state_change, // args: { } state_change_exiting_received, // args: { } + set_properties, // args: { request_id, properties } current_state, // args: { device_id, current_state } transition_status, // args: { device_id, Result, transition } @@ -56,12 +57,14 @@ enum class Type : int heartbeat, // args: { device_id } state_change_subscription, // args: { device_id, Result } state_change_unsubscription, // args: { device_id, Result } - state_change // args: { device_id, task_id, last_state, current_state } + state_change, // args: { device_id, task_id, last_state, current_state } + properties_set // args: { device_id, request_id, Result } }; struct Cmd { explicit Cmd(const Type type) : fType(type) {} + virtual ~Cmd() = default; Type GetType() const { return fType; } @@ -118,6 +121,24 @@ struct StateChangeExitingReceived : Cmd explicit StateChangeExitingReceived() : Cmd(Type::state_change_exiting_received) {} }; +struct SetProperties : Cmd +{ + SetProperties(std::size_t request_id, std::vector> properties) + : Cmd(Type::set_properties) + , fRequestId(request_id) + , fProperties(std::move(properties)) + {} + + auto GetRequestId() const -> std::size_t { return fRequestId; } + auto SetRequestId(std::size_t requestId) -> void { fRequestId = requestId; } + auto GetProps() const -> std::vector> { return fProperties; } + auto SetProps(std::vector> properties) -> void { fProperties = std::move(properties); } + + private: + std::size_t fRequestId; + std::vector> fProperties; +}; + struct CurrentState : Cmd { explicit CurrentState(const std::string& id, State currentState) @@ -288,6 +309,27 @@ struct StateChange : Cmd fair::mq::State fCurrentState; }; +struct PropertiesSet : Cmd { + PropertiesSet(std::string deviceId, std::size_t requestId, Result result) + : Cmd(Type::properties_set) + , fDeviceId(std::move(deviceId)) + , fRequestId(requestId) + , fResult(result) + {} + + auto GetDeviceId() const -> std::string { return fDeviceId; } + auto SetDeviceId(std::string deviceId) -> void { fDeviceId = std::move(deviceId); } + auto GetRequestId() const -> std::size_t { return fRequestId; } + auto SetRequestId(std::size_t requestId) -> void { fRequestId = requestId; } + auto GetResult() const -> Result { return fResult; } + auto SetResult(Result result) -> void { fResult = result; } + + private: + std::string fDeviceId; + std::size_t fRequestId; + Result fResult; +}; + template std::unique_ptr make(Args&&... args) { @@ -307,7 +349,6 @@ struct Cmds Unpack(std::forward&&>(first), std::forward(rest)...); } - void Add(std::unique_ptr&& cmd) { fCmds.emplace_back(std::move(cmd)); } template diff --git a/fairmq/sdk/commands/CommandsFormat.fbs b/fairmq/sdk/commands/CommandsFormat.fbs index 5aadcfa5..f92c3804 100644 --- a/fairmq/sdk/commands/CommandsFormat.fbs +++ b/fairmq/sdk/commands/CommandsFormat.fbs @@ -38,6 +38,11 @@ enum FBTransition:byte { ErrorFound } +table FBProperty { + key:string; + value:string; +} + enum FBCmd:byte { check_state, // args: { } change_state, // args: { transition } @@ -47,6 +52,7 @@ enum FBCmd:byte { subscribe_to_state_change, // args: { } unsubscribe_from_state_change, // args: { } state_change_exiting_received, // args: { } + set_properties, // args: { key, value } current_state, // args: { device_id, current_state } transition_status, // args: { device_id, Result, transition } @@ -56,13 +62,15 @@ enum FBCmd:byte { heartbeat, // args: { device_id } state_change_subscription, // args: { device_id, Result } state_change_unsubscription, // args: { device_id, Result } - state_change // args: { device_id, task_id, last_state, current_state } + state_change, // args: { device_id, task_id, last_state, current_state } + properties_set // args: { device_id, request_id, Result } } table FBCommand { command_id:FBCmd; device_id:string; task_id:uint64; + request_id:uint64; state:FBState; transition:FBTransition; result:FBResult; @@ -70,6 +78,7 @@ table FBCommand { last_state:FBState; current_state:FBState; debug:string; + properties:[FBProperty]; } table FBCommands { diff --git a/test/commands/_commands.cxx b/test/commands/_commands.cxx index 31f5f5ac..5370a630 100644 --- a/test/commands/_commands.cxx +++ b/test/commands/_commands.cxx @@ -19,6 +19,8 @@ using namespace fair::mq::sdk::cmd; TEST(Format, Construction) { + auto const props(std::vector>({{"k1", "v1"}, {"k2", "v2"}})); + Cmds checkStateCmds(make()); Cmds changeStateCmds(make(Transition::Stop)); Cmds dumpConfigCmds(make()); @@ -27,6 +29,7 @@ TEST(Format, Construction) Cmds subscribeToStateChangeCmds(make()); Cmds unsubscribeFromStateChangeCmds(make()); Cmds stateChangeExitingReceivedCmds(make()); + Cmds setPropertiesCmds(make(42, props)); Cmds currentStateCmds(make("somedeviceid", State::Running)); Cmds transitionStatusCmds(make("somedeviceid", Result::Ok, Transition::Stop)); Cmds configCmds(make("somedeviceid", "someconfig")); @@ -36,6 +39,7 @@ TEST(Format, Construction) Cmds stateChangeSubscriptionCmds(make("somedeviceid", Result::Ok)); Cmds stateChangeUnsubscriptionCmds(make("somedeviceid", Result::Ok)); Cmds stateChangeCmds(make("somedeviceid", 123456, State::Running, State::Ready)); + Cmds propertiesSetCmds(make("somedeviceid", 42, Result::Ok)); ASSERT_EQ(checkStateCmds.At(0).GetType(), Type::check_state); ASSERT_EQ(changeStateCmds.At(0).GetType(), Type::change_state); @@ -46,6 +50,9 @@ TEST(Format, Construction) ASSERT_EQ(subscribeToStateChangeCmds.At(0).GetType(), Type::subscribe_to_state_change); ASSERT_EQ(unsubscribeFromStateChangeCmds.At(0).GetType(), Type::unsubscribe_from_state_change); ASSERT_EQ(stateChangeExitingReceivedCmds.At(0).GetType(), Type::state_change_exiting_received); + ASSERT_EQ(setPropertiesCmds.At(0).GetType(), Type::set_properties); + ASSERT_EQ(static_cast(setPropertiesCmds.At(0)).GetRequestId(), 42); + ASSERT_EQ(static_cast(setPropertiesCmds.At(0)).GetProps(), props); ASSERT_EQ(currentStateCmds.At(0).GetType(), Type::current_state); ASSERT_EQ(static_cast(currentStateCmds.At(0)).GetDeviceId(), "somedeviceid"); ASSERT_EQ(static_cast(currentStateCmds.At(0)).GetCurrentState(), State::Running); @@ -75,10 +82,16 @@ TEST(Format, Construction) ASSERT_EQ(static_cast(stateChangeCmds.At(0)).GetTaskId(), 123456); ASSERT_EQ(static_cast(stateChangeCmds.At(0)).GetLastState(), State::Running); ASSERT_EQ(static_cast(stateChangeCmds.At(0)).GetCurrentState(), State::Ready); + ASSERT_EQ(propertiesSetCmds.At(0).GetType(), Type::properties_set); + ASSERT_EQ(static_cast(propertiesSetCmds.At(0)).GetDeviceId(), "somedeviceid"); + ASSERT_EQ(static_cast(propertiesSetCmds.At(0)).GetRequestId(), 42); + ASSERT_EQ(static_cast(propertiesSetCmds.At(0)).GetResult(), Result::Ok); } void fillCommands(Cmds& cmds) { + auto const props(std::vector>({{"k1", "v1"}, {"k2", "v2"}})); + cmds.Add(); cmds.Add(Transition::Stop); cmds.Add(); @@ -87,6 +100,7 @@ void fillCommands(Cmds& cmds) cmds.Add(); cmds.Add(); cmds.Add(); + cmds.Add(42, props); cmds.Add("somedeviceid", State::Running); cmds.Add("somedeviceid", Result::Ok, Transition::Stop); cmds.Add("somedeviceid", "someconfig"); @@ -96,13 +110,15 @@ void fillCommands(Cmds& cmds) cmds.Add("somedeviceid", Result::Ok); cmds.Add("somedeviceid", Result::Ok); cmds.Add("somedeviceid", 123456, State::Running, State::Ready); + cmds.Add("somedeviceid", 42, Result::Ok); } void checkCommands(Cmds& cmds) { - ASSERT_EQ(cmds.Size(), 17); + ASSERT_EQ(cmds.Size(), 19); int count = 0; + auto const props(std::vector>({{"k1", "v1"}, {"k2", "v2"}})); for (const auto& cmd : cmds) { switch (cmd->GetType()) { @@ -131,6 +147,11 @@ void checkCommands(Cmds& cmds) case Type::state_change_exiting_received: ++count; break; + case Type::set_properties: + ++count; + ASSERT_EQ(static_cast(*cmd).GetRequestId(), 42); + ASSERT_EQ(static_cast(*cmd).GetProps(), props); + break; case Type::current_state: ++count; ASSERT_EQ(static_cast(*cmd).GetDeviceId(), "somedeviceid"); @@ -178,13 +199,19 @@ void checkCommands(Cmds& cmds) ASSERT_EQ(static_cast(*cmd).GetLastState(), State::Running); ASSERT_EQ(static_cast(*cmd).GetCurrentState(), State::Ready); break; + case Type::properties_set: + ++count; + ASSERT_EQ(static_cast(*cmd).GetDeviceId(), "somedeviceid"); + ASSERT_EQ(static_cast(*cmd).GetRequestId(), 42); + ASSERT_EQ(static_cast(*cmd).GetResult(), Result::Ok); + break; default: ASSERT_TRUE(false); break; } } - ASSERT_EQ(count, 17); + ASSERT_EQ(count, 19); } TEST(Format, SerializationBinary) diff --git a/test/sdk/_topology.cxx b/test/sdk/_topology.cxx index 420790f6..745d8db2 100644 --- a/test/sdk/_topology.cxx +++ b/test/sdk/_topology.cxx @@ -243,4 +243,104 @@ TEST_F(Topology, ChangeStateFullDeviceLifecycle2) } } +TEST_F(Topology, SetProperties) +{ + using namespace fair::mq; + using fair::mq::sdk::TopologyTransition; + + sdk::Topology topo(mDDSTopo, mDDSSession); + ASSERT_EQ(topo.ChangeState(TopologyTransition::InitDevice).first, std::error_code()); + + auto const result1 = topo.SetProperties({{"key1", "val1"}}); + LOG(info) << result1.first; + ASSERT_EQ(result1.first, std::error_code()); + ASSERT_EQ(result1.second.size(), 0); + auto const result2 = topo.SetProperties({{"key2", "val2"}, {"key3", "val3"}}); + LOG(info) << result2.first; + ASSERT_EQ(result2.first, std::error_code()); + ASSERT_EQ(result2.second.size(), 0); + + ASSERT_EQ(topo.ChangeState(TopologyTransition::CompleteInit).first, std::error_code()); + ASSERT_EQ(topo.ChangeState(TopologyTransition::ResetDevice).first, std::error_code()); +} + +TEST_F(Topology, AsyncSetPropertiesConcurrent) +{ + using namespace fair::mq; + using fair::mq::sdk::TopologyTransition; + + sdk::Topology topo(mDDSTopo, mDDSSession); + ASSERT_EQ(topo.ChangeState(TopologyTransition::InitDevice).first, std::error_code()); + + tools::SharedSemaphore blocker(2); + topo.AsyncSetProperties( + {{"key1", "val1"}}, + [=](std::error_code ec, sdk::Topology::FailedDevices failed) mutable { + LOG(info) << ec; + ASSERT_EQ(ec, std::error_code()); + ASSERT_EQ(failed.size(), 0); + blocker.Signal(); + }); + topo.AsyncSetProperties( + {{"key2", "val2"}, {"key3", "val3"}}, + [=](std::error_code ec, sdk::Topology::FailedDevices failed) mutable { + LOG(info) << ec; + ASSERT_EQ(ec, std::error_code()); + ASSERT_EQ(failed.size(), 0); + blocker.Signal(); + }); + blocker.Wait(); + + ASSERT_EQ(topo.ChangeState(TopologyTransition::CompleteInit).first, std::error_code()); + ASSERT_EQ(topo.ChangeState(TopologyTransition::ResetDevice).first, std::error_code()); +} + +TEST_F(Topology, AsyncSetPropertiesTimeout) +{ + using namespace fair::mq; + using fair::mq::sdk::TopologyTransition; + + sdk::Topology topo(mDDSTopo, mDDSSession); + ASSERT_EQ(topo.ChangeState(TopologyTransition::InitDevice).first, std::error_code()); + + topo.AsyncSetProperties({{"key1", "val1"}}, + std::chrono::milliseconds(1), + [=](std::error_code ec, sdk::Topology::FailedDevices) mutable { + LOG(info) << ec; + EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationTimeout)); + }); + + ASSERT_EQ(topo.ChangeState(TopologyTransition::CompleteInit).first, std::error_code()); + ASSERT_EQ(topo.ChangeState(TopologyTransition::ResetDevice).first, std::error_code()); +} + +TEST_F(Topology, SetPropertiesMixed) +{ + using namespace fair::mq; + using fair::mq::sdk::TopologyTransition; + + sdk::Topology topo(mDDSTopo, mDDSSession); + ASSERT_EQ(topo.ChangeState(TopologyTransition::InitDevice).first, std::error_code()); + + tools::SharedSemaphore blocker; + topo.AsyncSetProperties( + {{"key1", "val1"}}, + [=](std::error_code ec, sdk::Topology::FailedDevices failed) mutable { + LOG(info) << ec; + ASSERT_EQ(ec, std::error_code()); + ASSERT_EQ(failed.size(), 0); + blocker.Signal(); + }); + + auto result = topo.SetProperties({{"key2", "val2"}}); + LOG(info) << result.first; + ASSERT_EQ(result.first, std::error_code()); + ASSERT_EQ(result.second.size(), 0); + + blocker.Wait(); + + ASSERT_EQ(topo.ChangeState(TopologyTransition::CompleteInit).first, std::error_code()); + ASSERT_EQ(topo.ChangeState(TopologyTransition::ResetDevice).first, std::error_code()); +} + } // namespace