/******************************************************************************** * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /* NOTE(review): in this copy the #include targets and all template argument lists appear to be missing (e.g. "std::map expectedState", "std::pair;") - restore them from the upstream header before building. */ #ifndef FAIR_MQ_SDK_TOPOLOGY_H #define FAIR_MQ_SDK_TOPOLOGY_H #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace fair { namespace mq { namespace sdk { using DeviceId = std::string; using DeviceState = fair::mq::State; using DeviceTransition = fair::mq::Transition; /* State a device is expected to be in after the given transition; AsyncChangeState() looks up its target state here via expectedState.at(transition). */ const std::map expectedState = { { DeviceTransition::InitDevice, DeviceState::InitializingDevice }, { DeviceTransition::CompleteInit, DeviceState::Initialized }, { DeviceTransition::Bind, DeviceState::Bound }, { DeviceTransition::Connect, DeviceState::DeviceReady }, { DeviceTransition::InitTask, DeviceState::Ready }, { DeviceTransition::Run, DeviceState::Running }, { DeviceTransition::Stop, DeviceState::Ready }, { DeviceTransition::ResetTask, DeviceState::DeviceReady }, { DeviceTransition::ResetDevice, DeviceState::Idle }, { DeviceTransition::End, DeviceState::Exiting } }; /* One entry of the topology state: the last state recorded for a device plus its DDS task and collection ids; initialized is set to true by UpdateStateEntry() once a state has been recorded. */ struct DeviceStatus { bool initialized; DeviceState state; DDSTask::Id taskId; DDSCollection::Id collectionId; }; using DeviceProperty = std::pair; /// pair := (key, value) using DeviceProperties = std::vector; using DevicePropertyQuery = std::string; /// Boost regex supported using FailedDevices = std::set; /* Result of a property query: devices holds the properties of every device that replied with Result::Ok, failed holds the ids of devices that replied with any other result. */ struct GetPropertiesResult { struct Device { DeviceProperties props; }; std::unordered_map devices; FailedDevices failed; }; using TopologyState = std::vector; using TopologyStateIndex = std::unordered_map; // task id 
-> index in the data vector using TopologyStateByTask = std::unordered_map; using TopologyStateByCollection = std::unordered_map>; using TopologyTransition = fair::mq::Transition; inline DeviceState AggregateState(const TopologyState& topologyState) { DeviceState first = topologyState.begin()->state; if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) { return i.state == first; })) { return first; } throw MixedStateError("State is not uniform"); } inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state) { return AggregateState(topologyState) == state; } inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState) { TopologyStateByCollection state; for (const auto& ds : topologyState) { if (ds.collectionId != 0) { state[ds.collectionId].push_back(ds); } } return state; } inline TopologyStateByTask GroupByTaskId(const TopologyState& topologyState) { TopologyStateByTask state; for (const auto& ds : topologyState) { state[ds.taskId] = ds; } return state; } /** * @class BasicTopology Topology.h * @tparam Executor Associated I/O executor * @tparam Allocator Associated default allocator * @brief Represents a FairMQ topology * * @par Thread Safety * @e Distinct @e objects: Safe.@n * @e Shared @e objects: Safe. 
*/ template class BasicTopology : public AsioBase { public: /// @brief (Re)Construct a FairMQ topology from an existing DDS topology /// @param topo DDSTopology /// @param session DDSSession BasicTopology(DDSTopology topo, DDSSession session) : BasicTopology(asio::system_executor(), std::move(topo), std::move(session)) {} /// @brief (Re)Construct a FairMQ topology from an existing DDS topology /// @param ex I/O executor to be associated /// @param topo DDSTopology /// @param session DDSSession /// @throws RuntimeError BasicTopology(const Executor& ex, DDSTopology topo, DDSSession session, Allocator alloc = DefaultAllocator()) : AsioBase(ex, std::move(alloc)) , fDDSSession(std::move(session)) , fDDSTopo(std::move(topo)) , fStateData() , fStateIndex() , fChangeStateOp() , fChangeStateOpTimer(ex) , fChangeStateTarget(DeviceState::Idle) { /* NOTE(review): fTransitionedCount is not in the initializer list above; UpdateStateEntry() increments it and only ResetTransitionedCount() assigns it - confirm it is never used uninitialized. */ makeTopologyState(); std::string activeTopo(fDDSSession.RequestCommanderInfo().activeTopologyName); std::string givenTopo(fDDSTopo.GetName()); if (activeTopo != givenTopo) { throw RuntimeError("Given topology ", givenTopo, " is not activated (active: ", activeTopo, ")"); } using namespace fair::mq::sdk::cmd; /* Dispatch every command received through the session by its type; the handler captures this object by reference. */ fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) { Cmds inCmds; inCmds.Deserialize(msg); // LOG(debug) << "Received " << inCmds.Size() << " command(s) with total size of " << msg.length() << " bytes: "; for (const auto& cmd : inCmds) { // LOG(debug) << " > " << cmd->GetType(); switch (cmd->GetType()) { case Type::state_change: { auto _cmd = static_cast(*cmd); DDSTask::Id taskId(_cmd.GetTaskId()); fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId); if (_cmd.GetCurrentState() == DeviceState::Exiting) { Cmds outCmds; outCmds.Add(); fDDSSession.SendCommand(outCmds.Serialize(), senderId); } UpdateStateEntry(taskId, _cmd.GetCurrentState()); } break; case Type::state_change_subscription: if (static_cast(*cmd).GetResult() != Result::Ok) { LOG(error) << 
"State change subscription failed for " << static_cast(*cmd).GetDeviceId(); } break; case Type::state_change_unsubscription: if (static_cast(*cmd).GetResult() != Result::Ok) { LOG(error) << "State change unsubscription failed for " << static_cast(*cmd).GetDeviceId(); } break; case Type::transition_status: { if (static_cast(*cmd).GetResult() != Result::Ok) { LOG(error) << "Transition failed for " << static_cast(*cmd).GetDeviceId(); /* Fail the pending change-state operation, but only if the reporting device is not already in the target state. */ std::lock_guard lk(fMtx); if (!fChangeStateOp.IsCompleted() && fStateData.at(fStateIndex.at(fDDSSession.GetTaskId(senderId))).state != fChangeStateTarget) { fChangeStateOpTimer.cancel(); fChangeStateOp.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed), fStateData); } } } break; case Type::properties: { HandleCmd(static_cast(*cmd)); } break; case Type::properties_set: { HandleCmd(static_cast(*cmd)); } break; default: LOG(warn) << "Unexpected/unknown command received: " << cmd->GetType(); LOG(warn) << "Origin: " << senderId; break; } } }); fDDSSession.StartDDSService(); LOG(debug) << "Subscribing to state change"; Cmds cmds(make()); fDDSSession.SendCommand(cmds.Serialize()); } /// not copyable BasicTopology(const BasicTopology&) = delete; BasicTopology& operator=(const BasicTopology&) = delete; /// movable /* NOTE(review): fMtx is a std::mutex member, which cannot be moved - presumably these defaulted operations end up defined as deleted; confirm. */ BasicTopology(BasicTopology&&) = default; BasicTopology& operator=(BasicTopology&&) = default; ~BasicTopology() { std::lock_guard lk(fMtx); fDDSSession.UnsubscribeFromCommands(); /* Best effort: whatever Cancel() throws is swallowed so the destructor does not propagate it. */ try { fChangeStateOp.Cancel(fStateData); } catch (...) 
{} } using Duration = std::chrono::milliseconds; using ChangeStateCompletionSignature = void(std::error_code, TopologyState); /// @brief Initiate state transition on all FairMQ devices in this topology /// @param transition FairMQ device state machine transition /// @param timeout Timeout in milliseconds, 0 means no timeout /// @param token Asio completion token /// @tparam CompletionToken Asio completion token type /// @throws std::system_error /// /// @par Usage examples /// With lambda: /// @code /// topo.AsyncChangeState( /// fair::mq::sdk::TopologyTransition::InitDevice, /// std::chrono::milliseconds(500), /// [](std::error_code ec, TopologyState state) { /// if (!ec) { /// // success /// } else if (ec.category().name() == "fairmq") { /// switch (static_cast(ec.value())) { /// case fair::mq::ErrorCode::OperationTimeout: /// // async operation timed out /// case fair::mq::ErrorCode::OperationCanceled: /// // async operation canceled /// case fair::mq::ErrorCode::DeviceChangeStateFailed: /// // failed to change state of a fairmq device /// case fair::mq::ErrorCode::OperationInProgress: /// // async operation already in progress /// default: /// } /// } /// } /// ); /// @endcode /// With future: /// @code /// auto fut = topo.AsyncChangeState(fair::mq::sdk::TopologyTransition::InitDevice, /// std::chrono::milliseconds(500), /// asio::use_future); /// try { /// fair::mq::sdk::TopologyState state = fut.get(); /// // success /// } catch (const std::system_error& ex) { /// auto ec(ex.code()); /// if (ec.category().name() == "fairmq") { /// switch (static_cast(ec.value())) { /// case fair::mq::ErrorCode::OperationTimeout: /// // async operation timed out /// case fair::mq::ErrorCode::OperationCanceled: /// // async operation canceled /// case fair::mq::ErrorCode::DeviceChangeStateFailed: /// // failed to change state of a fairmq device /// case fair::mq::ErrorCode::OperationInProgress: /// // async operation already in progress /// default: /// } /// } /// } /// 
@endcode /// With coroutine (C++20, see https://en.cppreference.com/w/cpp/language/coroutines): /// @code /// try { /// fair::mq::sdk::TopologyState state = co_await /// topo.AsyncChangeState(fair::mq::sdk::TopologyTransition::InitDevice, /// std::chrono::milliseconds(500), /// asio::use_awaitable); /// // success /// } catch (const std::system_error& ex) { /// auto ec(ex.code()); /// if (ec.category().name() == "fairmq") { /// switch (static_cast(ec.value())) { /// case fair::mq::ErrorCode::OperationTimeout: /// // async operation timed out /// case fair::mq::ErrorCode::OperationCanceled: /// // async operation canceled /// case fair::mq::ErrorCode::DeviceChangeStateFailed: /// // failed to change state of a fairmq device /// case fair::mq::ErrorCode::OperationInProgress: /// // async operation already in progress /// default: /// } /// } /// } /// @endcode template auto AsyncChangeState(const TopologyTransition transition, Duration timeout, CompletionToken&& token) { return asio::async_initiate([&](auto handler) { std::lock_guard lk(fMtx); /* A new operation is started only when the previous one has completed; otherwise the handler is posted with OperationInProgress (else branch). */ if (fChangeStateOp.IsCompleted()) { fChangeStateOp = ChangeStateOp(AsioBase::GetExecutor(), AsioBase::GetAllocator(), std::move(handler)); fChangeStateTarget = expectedState.at(transition); ResetTransitionedCount(fChangeStateTarget); cmd::Cmds cmds(cmd::make(transition)); fDDSSession.SendCommand(cmds.Serialize()); if (timeout > std::chrono::milliseconds(0)) { fChangeStateOpTimer.expires_after(timeout); fChangeStateOpTimer.async_wait([&](std::error_code ec) { if (!ec) { std::lock_guard lk2(fMtx); fChangeStateOp.Timeout(fStateData); } }); } } else { // TODO refactor to hide boiler plate auto ex2(asio::get_associated_executor(handler, AsioBase::GetExecutor())); auto alloc2(asio::get_associated_allocator(handler, AsioBase::GetAllocator())); auto state(GetCurrentStateUnsafe()); ex2.post([h = std::move(handler), s = std::move(state)]() mutable { try { h(MakeErrorCode(ErrorCode::OperationInProgress), s); } catch (const std::exception& 
e) { LOG(error) << "Uncaught exception in completion handler: " << e.what(); } catch (...) { LOG(error) << "Unknown uncaught exception in completion handler."; } }, alloc2); } }, token); } template auto AsyncChangeState(const TopologyTransition transition, CompletionToken&& token) { /* NOTE(review): token is declared as CompletionToken&& but passed on with std::move - if CompletionToken is a deduced template parameter, std::forward would be the usual choice; confirm. */ return AsyncChangeState(transition, Duration(0), std::move(token)); } /// @brief Perform state transition on all FairMQ devices in this topology /// @param transition FairMQ device state machine transition /// @param timeout Timeout in milliseconds, 0 means no timeout /// @throws std::system_error auto ChangeState(const TopologyTransition transition, Duration timeout = Duration(0)) -> std::pair { tools::SharedSemaphore blocker; std::error_code ec; TopologyState state; AsyncChangeState( transition, timeout, [&, blocker](std::error_code _ec, TopologyState _state) mutable { ec = _ec; state = _state; blocker.Signal(); }); /* Wait on the semaphore that the completion handler above signals. */ blocker.Wait(); return {ec, state}; } /// @brief Returns the current state of the topology /// @return map of id : DeviceStatus (initialized, state) auto GetCurrentState() const -> TopologyState { std::lock_guard lk(fMtx); return fStateData; } auto AggregateState() const -> DeviceState { return sdk::AggregateState(GetCurrentState()); } auto StateEqualsTo(DeviceState state) const -> bool { return sdk::StateEqualsTo(GetCurrentState(), state); } using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult); private: struct GetPropertiesOp { using Id = std::size_t; using GetCount = unsigned int; template GetPropertiesOp(Id id, GetCount expectedCount, Duration timeout, std::mutex& mutex, Executor const & ex, Allocator const & alloc, Handler&& handler) : fId(id) , fOp(ex, alloc, std::move(handler)) , fTimer(ex) , fCount(0) , fExpectedCount(expectedCount) , fMtx(mutex) { if (timeout > std::chrono::milliseconds(0)) { fTimer.expires_after(timeout); fTimer.async_wait([&](std::error_code ec) { if (!ec) { std::lock_guard lk(fMtx); fOp.Timeout(fResult); } }); } // 
LOG(debug) << "GetProperties " << fId << " with expected count of " << fExpectedCount << " started."; } GetPropertiesOp() = delete; GetPropertiesOp(const GetPropertiesOp&) = delete; GetPropertiesOp& operator=(const GetPropertiesOp&) = delete; GetPropertiesOp(GetPropertiesOp&&) = default; GetPropertiesOp& operator=(GetPropertiesOp&&) = default; ~GetPropertiesOp() = default; auto Update(const std::string& deviceId, cmd::Result result, DeviceProperties props) -> void { std::lock_guard lk(fMtx); if (cmd::Result::Ok != result) { fResult.failed.insert(deviceId); } else { fResult.devices.insert({deviceId, {std::move(props)}}); } ++fCount; TryCompletion(); } private: Id const fId; AsioAsyncOp fOp; asio::steady_timer fTimer; GetCount fCount; GetCount const fExpectedCount; GetPropertiesResult fResult; std::mutex& fMtx; /// precondition: fMtx is locked. auto TryCompletion() -> void { if (!fOp.IsCompleted() && fCount == fExpectedCount) { fTimer.cancel(); if (fResult.failed.size() > 0) { fOp.Complete(MakeErrorCode(ErrorCode::DeviceGetPropertiesFailed), std::move(fResult)); } else { fOp.Complete(std::move(fResult)); } } } }; auto HandleCmd(cmd::Properties const& cmd) -> void { std::unique_lock lk(fMtx); try { auto& op(fGetPropertiesOps.at(cmd.GetRequestId())); lk.unlock(); /* Unlocked first because Update() locks the mutex the op was constructed with (fMtx, see AsyncGetProperties). */ op.Update(cmd.GetDeviceId(), cmd.GetResult(), cmd.GetProps()); } catch (std::out_of_range& e) { LOG(debug) << "GetProperties operation (request id: " << cmd.GetRequestId() << ") not found (probably completed or timed out), " << "discarding reply of device " << cmd.GetDeviceId(); } } public: /// @brief Initiate property query on selected FairMQ devices in this topology /// @param query Key(s) to be queried (regex) /// @param path Select a subset of FairMQ devices in this topology, empty selects all /// @param timeout Timeout in milliseconds, 0 means no timeout /// @param token Asio completion token /// @tparam CompletionToken Asio completion token type /// @throws std::system_error template auto 
AsyncGetProperties(DevicePropertyQuery const& query, const std::string& path, Duration timeout, CompletionToken&& token) { return asio::async_initiate( [&](auto handler) { typename GetPropertiesOp::Id const id(tools::UuidHash()); // TODO Implement garbage collection of completed ops std::lock_guard lk(fMtx); fGetPropertiesOps.emplace( std::piecewise_construct, std::forward_as_tuple(id), std::forward_as_tuple(id, fDDSTopo.GetTasks(path).size(), timeout, fMtx, AsioBase::GetExecutor(), AsioBase::GetAllocator(), std::move(handler))); cmd::Cmds const cmds(cmd::make(id, query)); fDDSSession.SendCommand(cmds.Serialize(), path); }, token); } /// @brief Initiate property query on selected FairMQ devices in this topology /// @param query Key(s) to be queried (regex) /// @param token Asio completion token /// @tparam CompletionToken Asio completion token type /// @throws std::system_error template auto AsyncGetProperties(DevicePropertyQuery const& query, CompletionToken&& token) { return AsyncGetProperties(query, "", Duration(0), std::move(token)); } /// @brief Query properties on selected FairMQ devices in this topology /// @param query Key(s) to be queried (regex) /// @param path Select a subset of FairMQ devices in this topology, empty selects all /// @param timeout Timeout in milliseconds, 0 means no timeout /// @throws std::system_error auto GetProperties(DevicePropertyQuery const& query, const std::string& path = "", Duration timeout = Duration(0)) -> std::pair { tools::SharedSemaphore blocker; std::error_code ec; GetPropertiesResult result; AsyncGetProperties(query, path, timeout, [&, blocker](std::error_code _ec, GetPropertiesResult _result) mutable { ec = _ec; result = _result; blocker.Signal(); }); blocker.Wait(); return {ec, result}; } using SetPropertiesCompletionSignature = void(std::error_code, FailedDevices); private: struct SetPropertiesOp { using Id = std::size_t; using SetCount = unsigned int; template SetPropertiesOp(Id id, SetCount expectedCount, Duration 
timeout, std::mutex& mutex, Executor const & ex, Allocator const & alloc, Handler&& handler) : fId(id) , fOp(ex, alloc, std::move(handler)) , fTimer(ex) , fCount(0) , fExpectedCount(expectedCount) , fFailedDevices() , fMtx(mutex) { if (timeout > std::chrono::milliseconds(0)) { fTimer.expires_after(timeout); fTimer.async_wait([&](std::error_code ec) { if (!ec) { std::lock_guard lk(fMtx); fOp.Timeout(fFailedDevices); } }); } // LOG(debug) << "SetProperties " << fId << " with expected count of " << fExpectedCount << " started."; } SetPropertiesOp() = delete; SetPropertiesOp(const SetPropertiesOp&) = delete; SetPropertiesOp& operator=(const SetPropertiesOp&) = delete; SetPropertiesOp(SetPropertiesOp&&) = default; SetPropertiesOp& operator=(SetPropertiesOp&&) = default; ~SetPropertiesOp() = default; auto Update(const std::string& deviceId, cmd::Result result) -> void { std::lock_guard lk(fMtx); if (cmd::Result::Ok != result) { fFailedDevices.insert(deviceId); } ++fCount; TryCompletion(); } private: Id const fId; AsioAsyncOp fOp; asio::steady_timer fTimer; SetCount fCount; SetCount const fExpectedCount; FailedDevices fFailedDevices; std::mutex& fMtx; /// precondition: fMtx is locked. 
auto TryCompletion() -> void { if (!fOp.IsCompleted() && fCount == fExpectedCount) { fTimer.cancel(); if (fFailedDevices.size() > 0) { fOp.Complete(MakeErrorCode(ErrorCode::DeviceSetPropertiesFailed), fFailedDevices); } else { fOp.Complete(fFailedDevices); } } } }; auto HandleCmd(cmd::PropertiesSet const& cmd) -> void { std::unique_lock lk(fMtx); try { auto& op(fSetPropertiesOps.at(cmd.GetRequestId())); lk.unlock(); /* Unlocked first because Update() locks the mutex the op was constructed with (fMtx, see AsyncSetProperties). */ op.Update(cmd.GetDeviceId(), cmd.GetResult()); } catch (std::out_of_range& e) { LOG(debug) << "SetProperties operation (request id: " << cmd.GetRequestId() << ") not found (probably completed or timed out), " << "discarding reply of device " << cmd.GetDeviceId(); } } public: /// @brief Initiate property update on selected FairMQ devices in this topology /// @param props Properties to set /// @param path Select a subset of FairMQ devices in this topology, empty selects all /// @param timeout Timeout in milliseconds, 0 means no timeout /// @param token Asio completion token /// @tparam CompletionToken Asio completion token type /// @throws std::system_error template auto AsyncSetProperties(const DeviceProperties& props, const std::string& path, Duration timeout, CompletionToken&& token) { return asio::async_initiate( [&](auto handler) { typename SetPropertiesOp::Id const id(tools::UuidHash()); // TODO Implement garbage collection of completed ops std::lock_guard lk(fMtx); fSetPropertiesOps.emplace( std::piecewise_construct, std::forward_as_tuple(id), std::forward_as_tuple(id, fDDSTopo.GetTasks(path).size(), timeout, fMtx, AsioBase::GetExecutor(), AsioBase::GetAllocator(), std::move(handler))); cmd::Cmds const cmds(cmd::make(id, props)); fDDSSession.SendCommand(cmds.Serialize(), path); }, token); } /// @brief Initiate property update on selected FairMQ devices in this topology /// @param props Properties to set /// @param token Asio completion token /// @tparam CompletionToken Asio completion token type /// @throws std::system_error template auto 
AsyncSetProperties(DeviceProperties const & props, CompletionToken&& token) { return AsyncSetProperties(props, "", Duration(0), std::move(token)); } /// @brief Set properties on selected FairMQ devices in this topology /// @param props Properties to set /// @param path Select a subset of FairMQ devices in this topology, empty selects all /// @param timeout Timeout in milliseconds, 0 means no timeout /// @throws std::system_error auto SetProperties(DeviceProperties const& properties, const std::string& path = "", Duration timeout = Duration(0)) -> std::pair { tools::SharedSemaphore blocker; std::error_code ec; FailedDevices failed; AsyncSetProperties(properties, path, timeout, [&, blocker](std::error_code _ec, FailedDevices _failed) mutable { ec = _ec; failed = _failed; blocker.Signal(); }); blocker.Wait(); return {ec, failed}; } private: using TransitionedCount = unsigned int; DDSSession fDDSSession; DDSTopology fDDSTopo; TopologyState fStateData; TopologyStateIndex fStateIndex; mutable std::mutex fMtx; using ChangeStateOp = AsioAsyncOp; ChangeStateOp fChangeStateOp; asio::steady_timer fChangeStateOpTimer; DeviceState fChangeStateTarget; TransitionedCount fTransitionedCount; std::unordered_map fSetPropertiesOps; std::unordered_map fGetPropertiesOps; auto makeTopologyState() -> void { fStateData.reserve(fDDSTopo.GetTasks().size()); int index = 0; for (const auto& task : fDDSTopo.GetTasks()) { /* Every task starts out as not initialized and in state Ok; fStateIndex records its position in fStateData. */ fStateData.push_back(DeviceStatus{false, DeviceState::Ok, task.GetId(), task.GetCollectionId()}); fStateIndex.emplace(task.GetId(), index); index++; } } auto UpdateStateEntry(const DDSTask::Id taskId, const DeviceState state) -> void { try { std::lock_guard lk(fMtx); DeviceStatus& task = fStateData.at(fStateIndex.at(taskId)); task.initialized = true; task.state = state; if (task.state == fChangeStateTarget) { ++fTransitionedCount; } // LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state; TryChangeStateCompletion(); } catch (const std::exception& e) { 
LOG(error) << "Exception in UpdateStateEntry: " << e.what(); } } /// precondition: fMtx is locked. auto TryChangeStateCompletion() -> void { if (!fChangeStateOp.IsCompleted() && fTransitionedCount == fStateData.size()) { fChangeStateOpTimer.cancel(); fChangeStateOp.Complete(fStateData); } } /// precondition: fMtx is locked. auto ResetTransitionedCount(DeviceState targetState) -> void { fTransitionedCount = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) { return fStateData.at(s.second).state == targetState; }); } /// precondition: fMtx is locked. auto GetCurrentStateUnsafe() const -> TopologyState { return fStateData; } }; using Topology = BasicTopology; using Topo = Topology; /// @brief Helper to (Re)Construct a FairMQ topology based on already existing native DDS API objects /// @param nativeSession Existing and initialized CSession (either via create() or attach()) /// @param nativeTopo Existing CTopology that is activated on the given nativeSession /// @param env Optional DDSEnv (needed primarily for unit testing) auto MakeTopology(dds::topology_api::CTopology nativeTopo, std::shared_ptr nativeSession, DDSEnv env = {}) -> Topology; } // namespace sdk } // namespace mq } // namespace fair #endif /* FAIR_MQ_SDK_TOPOLOGY_H */