From 264a178424420f4138c1628a6f3c407d08fe2e20 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 24 Jan 2020 07:55:00 +0100 Subject: [PATCH] SDK: Add Topology::AsyncGetProperties Co-Author: Dennis Klein --- fairmq/plugins/DDS/DDS.cxx | 21 +++- fairmq/sdk/Error.cxx | 2 + fairmq/sdk/Error.h | 1 + fairmq/sdk/Topology.h | 160 ++++++++++++++++++++++++- fairmq/sdk/commands/Commands.cxx | 50 +++++++- fairmq/sdk/commands/Commands.h | 46 +++++++ fairmq/sdk/commands/CommandsFormat.fbs | 5 +- test/commands/_commands.cxx | 28 ++++- test/sdk/_topology.cxx | 32 ++++- 9 files changed, 331 insertions(+), 14 deletions(-) diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index fd33bcd8..5c177d74 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -418,6 +418,22 @@ auto DDS::SubscribeForCustomCommands() -> void cmd::make(id, cmd::Result::Ok)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } break; + case cmd::Type::get_properties: { + auto _cmd = static_cast(*cmd); + auto const request_id(_cmd.GetRequestId()); + auto result(cmd::Result::Ok); + std::vector> props; + try { + for (auto const& prop : GetPropertiesAsString(_cmd.GetQuery())) { + props.push_back({prop.first, prop.second}); + } + } catch (std::exception const& e) { + LOG(warn) << "Getting properties (request id: " << request_id << ") failed: " << e.what(); + result = cmd::Result::Failure; + } + cmd::Cmds const outCmds(cmd::make(id, request_id, result, props)); + fDDS.Send(outCmds.Serialize(), to_string(senderId)); + } break; case cmd::Type::set_properties: { auto _cmd(static_cast(*cmd)); auto const request_id(_cmd.GetRequestId()); @@ -427,9 +443,10 @@ auto DDS::SubscribeForCustomCommands() -> void for (auto const& prop : _cmd.GetProps()) { props.insert({prop.first, fair::mq::Property(prop.second)}); } + // TODO Handle builtin keys with different value type than string SetProperties(props); - } catch (...) { - LOG(warn) << "Setting properties (request id: " << request_id << ") failed"; + } catch (std::exception const& e) { + LOG(warn) << "Setting properties (request id: " << request_id << ") failed: " << e.what(); result = cmd::Result::Failure; } cmd::Cmds const outCmds(cmd::make(id, request_id, result)); diff --git a/fairmq/sdk/Error.cxx b/fairmq/sdk/Error.cxx index 9f0c5293..fd61959c 100644 --- a/fairmq/sdk/Error.cxx +++ b/fairmq/sdk/Error.cxx @@ -27,6 +27,8 @@ std::string ErrorCategory::message(int ev) const return "async operation canceled"; case ErrorCode::DeviceChangeStateFailed: return "failed to change state of a fairmq device"; + case ErrorCode::DeviceGetPropertiesFailed: + return "failed to get fairmq device properties"; case ErrorCode::DeviceSetPropertiesFailed: return "failed to set fairmq device properties"; default: diff --git a/fairmq/sdk/Error.h b/fairmq/sdk/Error.h index c50c91e9..b5a603ae 100644 --- a/fairmq/sdk/Error.h +++ b/fairmq/sdk/Error.h @@ -38,6 +38,7 @@ enum class ErrorCode OperationTimeout, OperationCanceled, DeviceChangeStateFailed, + DeviceGetPropertiesFailed, DeviceSetPropertiesFailed }; diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index d48f1909..dc952cd0 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -75,6 +75,18 @@ struct DeviceStatus using DeviceProperty = std::pair; /// pair := (key, value) using DeviceProperties = std::vector; +using DevicePropertyQuery = std::string; /// Boost regex supported +using FailedDevices = std::set; + +struct GetPropertiesResult +{ + struct Device + { + DeviceProperties props; + }; + std::unordered_map devices; + FailedDevices failed; +}; using TopologyState = std::vector; using TopologyStateIndex = std::unordered_map; // task id -> index in the data vector @@ -210,6 +222,10 @@ class BasicTopology : public AsioBase } } break; + case Type::properties: { + HandleCmd(static_cast(*cmd)); + } + break; case Type::properties_set: { HandleCmd(static_cast(*cmd)); } @@ -415,7 +431,146 @@ class BasicTopology : public AsioBase auto StateEqualsTo(DeviceState state) const -> bool { return sdk::StateEqualsTo(GetCurrentState(), state); } - using FailedDevices = std::set; + using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult); + + private: + struct GetPropertiesOp + { + using Id = std::size_t; + using GetCount = unsigned int; + + template + GetPropertiesOp(Id id, + GetCount expectedCount, + Duration timeout, + std::mutex& mutex, + Executor const & ex, + Allocator const & alloc, + Handler&& handler) + : fId(id) + , fOp(ex, alloc, std::move(handler)) + , fTimer(ex) + , fCount(0) + , fExpectedCount(expectedCount) + , fMtx(mutex) + { + if (timeout > std::chrono::milliseconds(0)) { + fTimer.expires_after(timeout); + fTimer.async_wait([&](std::error_code ec) { + if (!ec) { + std::lock_guard lk(fMtx); + fOp.Timeout(fResult); + } + }); + } + // LOG(debug) << "GetProperties " << fId << " with expected count of " << fExpectedCount << " started."; + } + GetPropertiesOp() = delete; + GetPropertiesOp(const GetPropertiesOp&) = delete; + GetPropertiesOp& operator=(const GetPropertiesOp&) = delete; + GetPropertiesOp(GetPropertiesOp&&) = default; + GetPropertiesOp& operator=(GetPropertiesOp&&) = default; + ~GetPropertiesOp() = default; + + auto Update(const std::string& deviceId, cmd::Result result, DeviceProperties props) -> void + { + std::lock_guard lk(fMtx); + if (cmd::Result::Ok != result) { + fResult.failed.insert(deviceId); + } else { + fResult.devices.insert({deviceId, {std::move(props)}}); + } + ++fCount; + TryCompletion(); + } + + private: + Id const fId; + AsioAsyncOp fOp; + asio::steady_timer fTimer; + GetCount fCount; + GetCount const fExpectedCount; + GetPropertiesResult fResult; + std::mutex& fMtx; + + /// precondition: fMtx is locked. + auto TryCompletion() -> void + { + if (!fOp.IsCompleted() && fCount == fExpectedCount) { + fTimer.cancel(); + if (fResult.failed.size() > 0) { + fOp.Complete(MakeErrorCode(ErrorCode::DeviceGetPropertiesFailed), std::move(fResult)); + } else { + fOp.Complete(std::move(fResult)); + } + } + } + }; + + auto HandleCmd(cmd::Properties const& cmd) -> void + { + std::unique_lock lk(fMtx); + try { + auto& op(fGetPropertiesOps.at(cmd.GetRequestId())); + lk.unlock(); + op.Update(cmd.GetDeviceId(), cmd.GetResult(), cmd.GetProps()); + } catch (std::out_of_range& e) { + LOG(debug) << "GetProperties operation (request id: " << cmd.GetRequestId() + << ") not found (probably completed or timed out), " + << "discarding reply of device " << cmd.GetDeviceId(); + } + } + + public: + template + auto AsyncGetProperties(DevicePropertyQuery const& query, + Duration timeout, + CompletionToken&& token) + { + return asio::async_initiate( + [&](auto handler) { + typename GetPropertiesOp::Id const id(tools::UuidHash()); + + std::lock_guard lk(fMtx); + fGetPropertiesOps.emplace( + std::piecewise_construct, + std::forward_as_tuple(id), + std::forward_as_tuple(id, + fStateData.size(), + timeout, + fMtx, + AsioBase::GetExecutor(), + AsioBase::GetAllocator(), + std::move(handler))); + + cmd::Cmds const cmds(cmd::make(id, query)); + fDDSSession.SendCommand(cmds.Serialize()); + }, + token); + } + + template + auto AsyncGetProperties(DevicePropertyQuery const& query, CompletionToken&& token) + { + return AsyncGetProperties(query, Duration(0), std::move(token)); + } + + auto GetProperties(DevicePropertyQuery const& query, Duration timeout = Duration(0)) + -> std::pair + { + tools::SharedSemaphore blocker; + std::error_code ec; + GetPropertiesResult result; + AsyncGetProperties( + query, timeout, [&, blocker](std::error_code _ec, GetPropertiesResult _result) mutable { + ec = _ec; + result = _result; + blocker.Signal(); + }); + blocker.Wait(); + return {ec, result}; + } + using SetPropertiesCompletionSignature = void(std::error_code, FailedDevices); private: @@ -437,7 +592,7 @@ class BasicTopology : public AsioBase , fTimer(ex) , fCount(0) , fExpectedCount(expectedCount) - , fFailedDevices(alloc) + , fFailedDevices() , fMtx(mutex) { if (timeout > std::chrono::milliseconds(0)) { @@ -571,6 +726,7 @@ class BasicTopology : public AsioBase TransitionedCount fTransitionedCount; std::unordered_map fSetPropertiesOps; + std::unordered_map fGetPropertiesOps; auto makeTopologyState() -> void { diff --git a/fairmq/sdk/commands/Commands.cxx b/fairmq/sdk/commands/Commands.cxx index 2cd5c645..ede10ad4 100644 --- a/fairmq/sdk/commands/Commands.cxx +++ b/fairmq/sdk/commands/Commands.cxx @@ -46,7 +46,7 @@ array resultNames = } }; -array typeNames = +array typeNames = { { "CheckState", @@ -57,6 +57,7 @@ array typeNames = "SubscribeToStateChange", "UnsubscribeFromStateChange", "StateChangeExitingReceived", + "GetProperties", "SetProperties", "CurrentState", @@ -68,6 +69,7 @@ array typeNames = "StateChangeSubscription", "StateChangeUnsubscription", "StateChange", + "Properties", "PropertiesSet" } }; @@ -150,7 +152,7 @@ array mqTransitionToFBTransition = } }; -array typeToFBCmd = +array typeToFBCmd = { { FBCmd::FBCmd_check_state, @@ -161,6 +163,7 @@ array typeToFBCmd = FBCmd::FBCmd_subscribe_to_state_change, FBCmd::FBCmd_unsubscribe_from_state_change, FBCmd::FBCmd_state_change_exiting_received, + FBCmd::FBCmd_get_properties, FBCmd::FBCmd_set_properties, FBCmd::FBCmd_current_state, FBCmd::FBCmd_transition_status, @@ -171,11 +174,12 @@ array typeToFBCmd = FBCmd::FBCmd_state_change_subscription, FBCmd::FBCmd_state_change_unsubscription, FBCmd::FBCmd_state_change, + FBCmd::FBCmd_properties, FBCmd::FBCmd_properties_set } }; -array fbCmdToType = +array fbCmdToType = { { Type::check_state, @@ -186,6 +190,7 @@ array fbCmdToType = Type::subscribe_to_state_change, Type::unsubscribe_from_state_change, Type::state_change_exiting_received, + Type::get_properties, Type::set_properties, Type::current_state, Type::transition_status, @@ -196,6 +201,7 @@ array fbCmdToType = Type::state_change_subscription, Type::state_change_unsubscription, Type::state_change, + Type::properties, Type::properties_set } }; @@ -255,6 +261,14 @@ string Cmds::Serialize(const Format type) const cmdBuilder = tools::make_unique(fbb); } break; + case Type::get_properties: { + auto _cmd = static_cast(*cmd); + auto query = fbb.CreateString(_cmd.GetQuery()); + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_request_id(_cmd.GetRequestId()); + cmdBuilder->add_property_query(query); + } + break; case Type::set_properties: { auto _cmd = static_cast(*cmd); std::vector> propsVector; @@ -343,6 +357,25 @@ string Cmds::Serialize(const Format type) const cmdBuilder->add_current_state(GetFBState(_cmd.GetCurrentState())); } break; + case Type::properties: { + auto _cmd = static_cast(*cmd); + auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); + + std::vector> propsVector; + for (const auto& e : _cmd.GetProps()) { + auto key = fbb.CreateString(e.first); + auto val = fbb.CreateString(e.second); + auto prop = CreateFBProperty(fbb, key, val); + propsVector.push_back(prop); + } + auto props = fbb.CreateVector(propsVector); + cmdBuilder = tools::make_unique(fbb); + cmdBuilder->add_device_id(deviceId); + cmdBuilder->add_request_id(_cmd.GetRequestId()); + cmdBuilder->add_result(GetFBResult(_cmd.GetResult())); + cmdBuilder->add_properties(props); + } + break; case Type::properties_set: { auto _cmd = static_cast(*cmd); auto deviceId = fbb.CreateString(_cmd.GetDeviceId()); @@ -428,6 +461,9 @@ void Cmds::Deserialize(const string& str, const Format type) case FBCmd_state_change_exiting_received: fCmds.emplace_back(make()); break; + case FBCmd_get_properties: + fCmds.emplace_back(make(cmdPtr.request_id(), cmdPtr.property_query()->str())); + break; case FBCmd_set_properties: { std::vector> properties; auto props = cmdPtr.properties(); @@ -463,6 +499,14 @@ void Cmds::Deserialize(const string& str, const Format type) case FBCmd_state_change: fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetMQState(cmdPtr.last_state()), GetMQState(cmdPtr.current_state()))); break; + case FBCmd_properties: { + std::vector> properties; + auto props = cmdPtr.properties(); + for (unsigned int j = 0; j < props->size(); ++j) { + properties.emplace_back(props->Get(j)->key()->str(), props->Get(j)->value()->str()); + } + fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.request_id(), GetResult(cmdPtr.result()), properties)); + } break; case FBCmd_properties_set: fCmds.emplace_back(make(cmdPtr.device_id()->str(), cmdPtr.request_id(), GetResult(cmdPtr.result()))); break; diff --git a/fairmq/sdk/commands/Commands.h b/fairmq/sdk/commands/Commands.h index 8f8f3076..491072cb 100644 --- a/fairmq/sdk/commands/Commands.h +++ b/fairmq/sdk/commands/Commands.h @@ -47,6 +47,7 @@ enum class Type : int subscribe_to_state_change, // args: { } unsubscribe_from_state_change, // args: { } state_change_exiting_received, // args: { } + get_properties, // args: { request_id, property_query } set_properties, // args: { request_id, properties } current_state, // args: { device_id, current_state } @@ -58,6 +59,7 @@ enum class Type : int state_change_subscription, // args: { device_id, Result } state_change_unsubscription, // args: { device_id, Result } state_change, // args: { device_id, task_id, last_state, current_state } + properties, // args: { device_id, request_id, Result, properties } properties_set // args: { device_id, request_id, Result } }; @@ -121,6 +123,24 @@ struct StateChangeExitingReceived : Cmd explicit StateChangeExitingReceived() : Cmd(Type::state_change_exiting_received) {} }; +struct GetProperties : Cmd +{ + GetProperties(std::size_t request_id, std::string query) + : Cmd(Type::get_properties) + , fRequestId(request_id) + , fQuery(std::move(query)) + {} + + auto GetRequestId() const -> std::size_t { return fRequestId; } + auto SetRequestId(std::size_t requestId) -> void { fRequestId = requestId; } + auto GetQuery() const -> std::string { return fQuery; } + auto SetQuery(std::string query) -> void { fQuery = std::move(query); } + + private: + std::size_t fRequestId; + std::string fQuery; +}; + struct SetProperties : Cmd { SetProperties(std::size_t request_id, std::vector> properties) @@ -309,6 +329,32 @@ struct StateChange : Cmd fair::mq::State fCurrentState; }; +struct Properties : Cmd +{ + Properties(std::string deviceId, std::size_t requestId, const Result result, std::vector> properties) + : Cmd(Type::properties) + , fDeviceId(std::move(deviceId)) + , fRequestId(requestId) + , fResult(result) + , fProperties(std::move(properties)) + {} + + auto GetDeviceId() const -> std::string { return fDeviceId; } + auto SetDeviceId(std::string deviceId) -> void { fDeviceId = std::move(deviceId); } + auto GetRequestId() const -> std::size_t { return fRequestId; } + auto SetRequestId(std::size_t requestId) -> void { fRequestId = requestId; } + auto GetResult() const -> Result { return fResult; } + auto SetResult(Result result) -> void { fResult = result; } + auto GetProps() const -> std::vector> { return fProperties; } + auto SetProps(std::vector> properties) -> void { fProperties = std::move(properties); } + + private: + std::string fDeviceId; + std::size_t fRequestId; + Result fResult; + std::vector> fProperties; +}; + struct PropertiesSet : Cmd { PropertiesSet(std::string deviceId, std::size_t requestId, Result result) : Cmd(Type::properties_set) diff --git a/fairmq/sdk/commands/CommandsFormat.fbs b/fairmq/sdk/commands/CommandsFormat.fbs index f92c3804..8633d03d 100644 --- a/fairmq/sdk/commands/CommandsFormat.fbs +++ b/fairmq/sdk/commands/CommandsFormat.fbs @@ -52,7 +52,8 @@ enum FBCmd:byte { subscribe_to_state_change, // args: { } unsubscribe_from_state_change, // args: { } state_change_exiting_received, // args: { } - set_properties, // args: { key, value } + get_properties, // args: { request_id, property_query } + set_properties, // args: { request_id, properties } current_state, // args: { device_id, current_state } transition_status, // args: { device_id, Result, transition } @@ -63,6 +64,7 @@ enum FBCmd:byte { state_change_subscription, // args: { device_id, Result } state_change_unsubscription, // args: { device_id, Result } state_change, // args: { device_id, task_id, last_state, current_state } + properties, // args: { device_id, request_id, Result, properties } properties_set // args: { device_id, request_id, Result } } @@ -79,6 +81,7 @@ table FBCommand { current_state:FBState; debug:string; properties:[FBProperty]; + property_query:string; } table FBCommands { diff --git a/test/commands/_commands.cxx b/test/commands/_commands.cxx index 5370a630..6d83cdba 100644 --- a/test/commands/_commands.cxx +++ b/test/commands/_commands.cxx @@ -29,6 +29,7 @@ TEST(Format, Construction) Cmds subscribeToStateChangeCmds(make()); Cmds unsubscribeFromStateChangeCmds(make()); Cmds stateChangeExitingReceivedCmds(make()); + Cmds getPropertiesCmds(make(66, "k[12]")); Cmds setPropertiesCmds(make(42, props)); Cmds currentStateCmds(make("somedeviceid", State::Running)); Cmds transitionStatusCmds(make("somedeviceid", Result::Ok, Transition::Stop)); @@ -39,6 +40,7 @@ TEST(Format, Construction) Cmds stateChangeSubscriptionCmds(make("somedeviceid", Result::Ok)); Cmds stateChangeUnsubscriptionCmds(make("somedeviceid", Result::Ok)); Cmds stateChangeCmds(make("somedeviceid", 123456, State::Running, State::Ready)); + Cmds propertiesCmds(make("somedeviceid", 66, Result::Ok, props)); Cmds propertiesSetCmds(make("somedeviceid", 42, Result::Ok)); ASSERT_EQ(checkStateCmds.At(0).GetType(), Type::check_state); @@ -50,6 +52,9 @@ TEST(Format, Construction) ASSERT_EQ(subscribeToStateChangeCmds.At(0).GetType(), Type::subscribe_to_state_change); ASSERT_EQ(unsubscribeFromStateChangeCmds.At(0).GetType(), Type::unsubscribe_from_state_change); ASSERT_EQ(stateChangeExitingReceivedCmds.At(0).GetType(), Type::state_change_exiting_received); + ASSERT_EQ(getPropertiesCmds.At(0).GetType(), Type::get_properties); + ASSERT_EQ(static_cast(getPropertiesCmds.At(0)).GetRequestId(), 66); + ASSERT_EQ(static_cast(getPropertiesCmds.At(0)).GetQuery(), "k[12]"); ASSERT_EQ(setPropertiesCmds.At(0).GetType(), Type::set_properties); ASSERT_EQ(static_cast(setPropertiesCmds.At(0)).GetRequestId(), 42); ASSERT_EQ(static_cast(setPropertiesCmds.At(0)).GetProps(), props); @@ -82,6 +87,11 @@ TEST(Format, Construction) ASSERT_EQ(static_cast(stateChangeCmds.At(0)).GetTaskId(), 123456); ASSERT_EQ(static_cast(stateChangeCmds.At(0)).GetLastState(), State::Running); ASSERT_EQ(static_cast(stateChangeCmds.At(0)).GetCurrentState(), State::Ready); + ASSERT_EQ(propertiesCmds.At(0).GetType(), Type::properties); + ASSERT_EQ(static_cast(propertiesCmds.At(0)).GetDeviceId(), "somedeviceid"); + ASSERT_EQ(static_cast(propertiesCmds.At(0)).GetRequestId(), 66); + ASSERT_EQ(static_cast(propertiesCmds.At(0)).GetResult(), Result::Ok); + ASSERT_EQ(static_cast(propertiesCmds.At(0)).GetProps(), props); ASSERT_EQ(propertiesSetCmds.At(0).GetType(), Type::properties_set); ASSERT_EQ(static_cast(propertiesSetCmds.At(0)).GetDeviceId(), "somedeviceid"); ASSERT_EQ(static_cast(propertiesSetCmds.At(0)).GetRequestId(), 42); @@ -100,6 +110,7 @@ void fillCommands(Cmds& cmds) cmds.Add(); cmds.Add(); cmds.Add(); + cmds.Add(66, "k[12]"); cmds.Add(42, props); cmds.Add("somedeviceid", State::Running); cmds.Add("somedeviceid", Result::Ok, Transition::Stop); @@ -110,12 +121,13 @@ void fillCommands(Cmds& cmds) cmds.Add("somedeviceid", Result::Ok); cmds.Add("somedeviceid", Result::Ok); cmds.Add("somedeviceid", 123456, State::Running, State::Ready); + cmds.Add("somedeviceid", 66, Result::Ok, props); cmds.Add("somedeviceid", 42, Result::Ok); } void checkCommands(Cmds& cmds) { - ASSERT_EQ(cmds.Size(), 19); + ASSERT_EQ(cmds.Size(), 21); int count = 0; auto const props(std::vector>({{"k1", "v1"}, {"k2", "v2"}})); @@ -147,6 +159,11 @@ void checkCommands(Cmds& cmds) case Type::state_change_exiting_received: ++count; break; + case Type::get_properties: + ++count; + ASSERT_EQ(static_cast(*cmd).GetRequestId(), 66); + ASSERT_EQ(static_cast(*cmd).GetQuery(), "k[12]"); + break; case Type::set_properties: ++count; ASSERT_EQ(static_cast(*cmd).GetRequestId(), 42); @@ -199,6 +216,13 @@ void checkCommands(Cmds& cmds) ASSERT_EQ(static_cast(*cmd).GetLastState(), State::Running); ASSERT_EQ(static_cast(*cmd).GetCurrentState(), State::Ready); break; + case Type::properties: + ++count; + ASSERT_EQ(static_cast(*cmd).GetDeviceId(), "somedeviceid"); + ASSERT_EQ(static_cast(*cmd).GetRequestId(), 66); + ASSERT_EQ(static_cast(*cmd).GetResult(), Result::Ok); + ASSERT_EQ(static_cast(*cmd).GetProps(), props); + break; case Type::properties_set: ++count; ASSERT_EQ(static_cast(*cmd).GetDeviceId(), "somedeviceid"); @@ -211,7 +235,7 @@ void checkCommands(Cmds& cmds) } } - ASSERT_EQ(count, 19); + ASSERT_EQ(count, 21); } TEST(Format, SerializationBinary) diff --git a/test/sdk/_topology.cxx b/test/sdk/_topology.cxx index 745d8db2..4f7c53c5 100644 --- a/test/sdk/_topology.cxx +++ b/test/sdk/_topology.cxx @@ -275,7 +275,7 @@ TEST_F(Topology, AsyncSetPropertiesConcurrent) tools::SharedSemaphore blocker(2); topo.AsyncSetProperties( {{"key1", "val1"}}, - [=](std::error_code ec, sdk::Topology::FailedDevices failed) mutable { + [=](std::error_code ec, sdk::FailedDevices failed) mutable { LOG(info) << ec; ASSERT_EQ(ec, std::error_code()); ASSERT_EQ(failed.size(), 0); @@ -283,7 +283,7 @@ TEST_F(Topology, AsyncSetPropertiesConcurrent) }); topo.AsyncSetProperties( {{"key2", "val2"}, {"key3", "val3"}}, - [=](std::error_code ec, sdk::Topology::FailedDevices failed) mutable { + [=](std::error_code ec, sdk::FailedDevices failed) mutable { LOG(info) << ec; ASSERT_EQ(ec, std::error_code()); ASSERT_EQ(failed.size(), 0); @@ -305,7 +305,7 @@ TEST_F(Topology, AsyncSetPropertiesTimeout) topo.AsyncSetProperties({{"key1", "val1"}}, std::chrono::milliseconds(1), - [=](std::error_code ec, sdk::Topology::FailedDevices) mutable { + [=](std::error_code ec, sdk::FailedDevices) mutable { LOG(info) << ec; EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationTimeout)); }); @@ -325,7 +325,7 @@ TEST_F(Topology, SetPropertiesMixed) tools::SharedSemaphore blocker; topo.AsyncSetProperties( {{"key1", "val1"}}, - [=](std::error_code ec, sdk::Topology::FailedDevices failed) mutable { + [=](std::error_code ec, sdk::FailedDevices failed) mutable { LOG(info) << ec; ASSERT_EQ(ec, std::error_code()); ASSERT_EQ(failed.size(), 0); @@ -343,4 +343,28 @@ TEST_F(Topology, SetPropertiesMixed) ASSERT_EQ(topo.ChangeState(TopologyTransition::ResetDevice).first, std::error_code()); } +TEST_F(Topology, GetProperties) +{ + using namespace fair::mq; + using fair::mq::sdk::TopologyTransition; + + sdk::Topology topo(mDDSTopo, mDDSSession); + ASSERT_EQ(topo.ChangeState(TopologyTransition::InitDevice).first, std::error_code()); + + auto const result = topo.GetProperties("^(session|id)$"); + LOG(info) << result.first; + ASSERT_EQ(result.first, std::error_code()); + ASSERT_EQ(result.second.failed.size(), 0); + for (auto const& d : result.second.devices) { + LOG(info) << d.first; + ASSERT_EQ(d.second.props.size(), 2); + for (auto const& p : d.second.props) { + LOG(info) << p.first << " : " << p.second; + } + } + + ASSERT_EQ(topo.ChangeState(TopologyTransition::CompleteInit).first, std::error_code()); + ASSERT_EQ(topo.ChangeState(TopologyTransition::ResetDevice).first, std::error_code()); +} + } // namespace