FairMQ  1.4.33
C++ Message Queuing Library and Framework
Topology.h
1 /********************************************************************************
2  * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_SDK_TOPOLOGY_H
10 #define FAIR_MQ_SDK_TOPOLOGY_H
11 
12 #include <fairmq/sdk/AsioAsyncOp.h>
13 #include <fairmq/sdk/AsioBase.h>
14 #include <fairmq/sdk/commands/Commands.h>
15 #include <fairmq/sdk/DDSCollection.h>
16 #include <fairmq/sdk/DDSInfo.h>
17 #include <fairmq/sdk/DDSSession.h>
18 #include <fairmq/sdk/DDSTask.h>
19 #include <fairmq/sdk/DDSTopology.h>
20 #include <fairmq/sdk/Error.h>
21 #include <fairmq/States.h>
22 #include <fairmq/tools/Semaphore.h>
23 #include <fairmq/tools/Unique.h>
24 
25 #include <fairlogger/Logger.h>
26 #ifndef FAIR_LOG
27 #define FAIR_LOG LOG
28 #endif /* ifndef FAIR_LOG */
29 
30 #include <asio/associated_executor.hpp>
31 #include <asio/async_result.hpp>
32 #include <asio/steady_timer.hpp>
33 #include <asio/system_executor.hpp>
34 
35 #include <algorithm>
36 #include <chrono>
37 #include <condition_variable>
38 #include <functional>
39 #include <map>
40 #include <memory>
41 #include <mutex>
42 #include <ostream>
43 #include <set>
44 #include <stdexcept>
45 #include <string>
46 #include <thread>
47 #include <unordered_map>
48 #include <utility>
49 #include <vector>
50 
51 namespace fair::mq::sdk
52 {
53 
54 using DeviceId = std::string;
55 using DeviceState = fair::mq::State;
56 using DeviceTransition = fair::mq::Transition;
57 
58 const std::map<DeviceTransition, DeviceState> expectedState =
59 {
60  { DeviceTransition::InitDevice, DeviceState::InitializingDevice },
61  { DeviceTransition::CompleteInit, DeviceState::Initialized },
62  { DeviceTransition::Bind, DeviceState::Bound },
63  { DeviceTransition::Connect, DeviceState::DeviceReady },
64  { DeviceTransition::InitTask, DeviceState::Ready },
65  { DeviceTransition::Run, DeviceState::Running },
66  { DeviceTransition::Stop, DeviceState::Ready },
67  { DeviceTransition::ResetTask, DeviceState::DeviceReady },
68  { DeviceTransition::ResetDevice, DeviceState::Idle },
69  { DeviceTransition::End, DeviceState::Exiting }
70 };
71 
72 // mirrors DeviceState, but adds a "Mixed" state that represents a topology where devices are currently not in the same state.
// Aggregated state of a whole topology. Every enumerator except Mixed takes the
// integer value of the fair::mq::State enumerator with the same name, so values
// can be converted between the two enums with static_cast.
73 enum class AggregatedTopologyState : int
74 {
75  Undefined = static_cast<int>(fair::mq::State::Undefined),
76  Ok = static_cast<int>(fair::mq::State::Ok),
77  Error = static_cast<int>(fair::mq::State::Error),
78  Idle = static_cast<int>(fair::mq::State::Idle),
79  InitializingDevice = static_cast<int>(fair::mq::State::InitializingDevice),
80  Initialized = static_cast<int>(fair::mq::State::Initialized),
81  Binding = static_cast<int>(fair::mq::State::Binding),
82  Bound = static_cast<int>(fair::mq::State::Bound),
83  Connecting = static_cast<int>(fair::mq::State::Connecting),
84  DeviceReady = static_cast<int>(fair::mq::State::DeviceReady),
85  InitializingTask = static_cast<int>(fair::mq::State::InitializingTask),
86  Ready = static_cast<int>(fair::mq::State::Ready),
87  Running = static_cast<int>(fair::mq::State::Running),
88  ResettingTask = static_cast<int>(fair::mq::State::ResettingTask),
89  ResettingDevice = static_cast<int>(fair::mq::State::ResettingDevice),
90  Exiting = static_cast<int>(fair::mq::State::Exiting),
// Mixed has no initializer, so it gets the value of Exiting plus one.
// NOTE(review): this only stays distinct from all device states if Exiting is
// the largest fair::mq::State value - confirm against States.h.
91  Mixed
92 };
93 
94 inline auto operator==(DeviceState lhs, AggregatedTopologyState rhs) -> bool
95 {
96  return static_cast<int>(lhs) == static_cast<int>(rhs);
97 }
98 
99 inline auto operator==(AggregatedTopologyState lhs, DeviceState rhs) -> bool
100 {
101  return static_cast<int>(lhs) == static_cast<int>(rhs);
102 }
103 
104 inline std::ostream& operator<<(std::ostream& os, const AggregatedTopologyState& state)
105 {
106  if (state == AggregatedTopologyState::Mixed) {
107  return os << "MIXED";
108  } else {
109  return os << static_cast<DeviceState>(state);
110  }
111 }
112 
113 inline std::string GetAggregatedTopologyStateName(AggregatedTopologyState s)
114 {
115  if (s == AggregatedTopologyState::Mixed) {
116  return "MIXED";
117  } else {
118  return GetStateName(static_cast<State>(s));
119  }
120 }
121 
122 inline AggregatedTopologyState GetAggregatedTopologyState(const std::string& state)
123 {
124  if (state == "MIXED") {
125  return AggregatedTopologyState::Mixed;
126  } else {
127  return static_cast<AggregatedTopologyState>(GetState(state));
128  }
129 }
130 
132 {
133  bool subscribed_to_state_changes;
134  DeviceState lastState;
135  DeviceState state;
136  DDSTask::Id taskId;
137  DDSCollection::Id collectionId;
138 };
139 
140 using DeviceProperty = std::pair<std::string, std::string>;
141 using DeviceProperties = std::vector<DeviceProperty>;
142 using DevicePropertyQuery = std::string;
143 using FailedDevices = std::set<DeviceId>;
144 
146 {
147  struct Device
148  {
149  DeviceProperties props;
150  };
151  std::unordered_map<DeviceId, Device> devices;
152  FailedDevices failed;
153 };
154 
155 using TopologyState = std::vector<DeviceStatus>;
156 using TopologyStateIndex = std::unordered_map<DDSTask::Id, int>; // task id -> index in the data vector
157 using TopologyStateByTask = std::unordered_map<DDSTask::Id, DeviceStatus>;
158 using TopologyStateByCollection = std::unordered_map<DDSCollection::Id, std::vector<DeviceStatus>>;
159 using TopologyTransition = fair::mq::Transition;
160 
161 inline AggregatedTopologyState AggregateState(const TopologyState& topologyState)
162 {
163  DeviceState first = topologyState.begin()->state;
164 
165  if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) {
166  return i.state == first;
167  })) {
168  return static_cast<AggregatedTopologyState>(first);
169  }
170 
171  return AggregatedTopologyState::Mixed;
172 }
173 
174 inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state)
175 {
176  return AggregateState(topologyState) == static_cast<AggregatedTopologyState>(state);
177 }
178 
179 inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState)
180 {
181  TopologyStateByCollection state;
182  for (const auto& ds : topologyState) {
183  if (ds.collectionId != 0) {
184  state[ds.collectionId].push_back(ds);
185  }
186  }
187 
188  return state;
189 }
190 
191 inline TopologyStateByTask GroupByTaskId(const TopologyState& topologyState)
192 {
193  TopologyStateByTask state;
194  for (const auto& ds : topologyState) {
195  state[ds.taskId] = ds;
196  }
197 
198  return state;
199 }
200 
211 template <typename Executor, typename Allocator>
212 class BasicTopology : public AsioBase<Executor, Allocator>
213 {
214  public:
// Convenience constructor: delegates to the executor-taking constructor,
// passing asio::system_executor() as the executor.
219  BasicTopology(DDSTopology topo, DDSSession session, bool blockUntilConnected = false)
220  : BasicTopology<Executor, Allocator>(asio::system_executor(), std::move(topo), std::move(session), blockUntilConnected)
221  {}
222 
// Main constructor.
//   ex                  - executor handed to the AsioBase base class
//   topo                - DDS topology; its name must match the session's active topology
//   session             - DDS session used for all command traffic
//   blockUntilConnected - if true, the constructor waits until the number of state
//                         change publishers equals the number of entries in fStateIndex
//   alloc               - allocator handed to the AsioBase base class
// Throws RuntimeError if the given topology is not the active one of the session.
229  BasicTopology(const Executor& ex,
230  DDSTopology topo,
231  DDSSession session,
232  bool blockUntilConnected = false,
233  Allocator alloc = DefaultAllocator())
234  : AsioBase<Executor, Allocator>(ex, std::move(alloc))
235  , fDDSSession(std::move(session))
236  , fDDSTopo(std::move(topo))
237  , fStateData()
238  , fStateIndex()
239  , fMtx(std::make_unique<std::mutex>())
240  , fStateChangeSubscriptionsCV(std::make_unique<std::condition_variable>())
241  , fNumStateChangePublishers(0)
// The heartbeat timer runs on asio::system_executor(), not on `ex`.
242  , fHeartbeatsTimer(asio::system_executor())
// NOTE(review): the unit of 600000 depends on the declared type of
// fHeartbeatInterval, which is not visible here - confirm.
243  , fHeartbeatInterval(600000)
244  {
245  makeTopologyState();
246 
// Refuse to operate on a topology that is not the one activated in the session.
247  std::string activeTopo(fDDSSession.RequestCommanderInfo().activeTopologyName);
248  std::string givenTopo(fDDSTopo.GetName());
249  if (activeTopo != givenTopo) {
250  throw RuntimeError("Given topology ", givenTopo, " is not activated (active: ", activeTopo, ")");
251  }
252 
// The command handler is installed before the DDS service is started and
// before the subscription request is sent.
253  SubscribeToCommands();
254 
255  fDDSSession.StartDDSService();
256  SubscribeToStateChanges();
257  if (blockUntilConnected) {
// NOTE(review): size() is converted to the unsigned int parameter of WaitForPublisherCount.
258  WaitForPublisherCount(fStateIndex.size());
259  }
260  }
261 
// Copying is disabled.
263  BasicTopology(const BasicTopology&) = delete;
264  BasicTopology& operator=(const BasicTopology&) = delete;
265 
// Move assignment is defaulted.
// NOTE(review): the command callback and the heartbeat timer handler registered by
// this class capture `this`; confirm that instances are not moved after those are set up.
268  BasicTopology& operator=(BasicTopology&&) = default;
269 
// Destructor: unsubscribes from state changes (which waits until the publisher
// count reaches zero), unsubscribes from commands, then completes every
// ChangeState operation with ErrorCode::OperationCanceled.
270  ~BasicTopology()
271  {
// Must run before the mutex is taken below: WaitForPublisherCount locks it itself.
272  UnsubscribeFromStateChanges();
273 
274  std::lock_guard<std::mutex> lk(*fMtx);
275  fDDSSession.UnsubscribeFromCommands();
276  try {
277  for (auto& op : fChangeStateOps) {
278  op.second.Complete(MakeErrorCode(ErrorCode::OperationCanceled));
279  }
// Any exception from Complete() is swallowed; the first one also ends the loop,
// so later operations are then left untouched.
// NOTE(review): only fChangeStateOps are canceled here, the other operation
// containers are not - confirm this is intended.
280  } catch (...) {}
281  }
282 
283  void SubscribeToStateChanges()
284  {
285  // FAIR_LOG(debug) << "Subscribing to state change";
286  cmd::Cmds cmds(cmd::make<cmd::SubscribeToStateChange>(fHeartbeatInterval.count()));
287  fDDSSession.SendCommand(cmds.Serialize());
288 
289  fHeartbeatsTimer.expires_after(fHeartbeatInterval);
290  fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1));
291  }
292 
293  void WaitForPublisherCount(unsigned int number)
294  {
295  std::unique_lock<std::mutex> lk(*fMtx);
296  fStateChangeSubscriptionsCV->wait(lk, [&](){
297  return fNumStateChangePublishers == number;
298  });
299  }
300 
301  void SendSubscriptionHeartbeats(const std::error_code& ec)
302  {
303  if (!ec) {
304  // Timer expired.
305  fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::SubscriptionHeartbeat>(fHeartbeatInterval.count())).Serialize());
306  // schedule again
307  fHeartbeatsTimer.expires_after(fHeartbeatInterval);
308  fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1));
309  } else if (ec == asio::error::operation_aborted) {
310  // FAIR_LOG(debug) << "Heartbeats timer canceled";
311  } else {
312  FAIR_LOG(error) << "Timer error: " << ec;
313  }
314  }
315 
316  void UnsubscribeFromStateChanges()
317  {
318  // stop sending heartbeats
319  fHeartbeatsTimer.cancel();
320 
321  // unsubscribe from state changes
322  fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize());
323 
324  // wait for all tasks to confirm unsubscription
325  WaitForPublisherCount(0);
326  }
327 
328  void SubscribeToCommands()
329  {
330  fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
331  cmd::Cmds inCmds;
332  inCmds.Deserialize(msg);
333  // FAIR_LOG(debug) << "Received " << inCmds.Size() << " command(s) with total size of " << msg.length() << " bytes: ";
334 
335  for (const auto& cmd : inCmds) {
336  // FAIR_LOG(debug) << " > " << cmd->GetType();
337  switch (cmd->GetType()) {
338  case cmd::Type::state_change_subscription:
339  HandleCmd(static_cast<cmd::StateChangeSubscription&>(*cmd));
340  break;
341  case cmd::Type::state_change_unsubscription:
342  HandleCmd(static_cast<cmd::StateChangeUnsubscription&>(*cmd));
343  break;
344  case cmd::Type::state_change:
345  HandleCmd(static_cast<cmd::StateChange&>(*cmd), senderId);
346  break;
347  case cmd::Type::transition_status:
348  HandleCmd(static_cast<cmd::TransitionStatus&>(*cmd));
349  break;
350  case cmd::Type::properties:
351  HandleCmd(static_cast<cmd::Properties&>(*cmd));
352  break;
353  case cmd::Type::properties_set:
354  HandleCmd(static_cast<cmd::PropertiesSet&>(*cmd));
355  break;
356  default:
357  FAIR_LOG(warn) << "Unexpected/unknown command received: " << cmd->GetType();
358  FAIR_LOG(warn) << "Origin: " << senderId;
359  break;
360  }
361  }
362  });
363  }
364 
365  auto HandleCmd(cmd::StateChangeSubscription const& cmd) -> void
366  {
367  if (cmd.GetResult() == cmd::Result::Ok) {
368  DDSTask::Id taskId(cmd.GetTaskId());
369 
370  try {
371  std::unique_lock<std::mutex> lk(*fMtx);
372  DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
373  if (!task.subscribed_to_state_changes) {
374  task.subscribed_to_state_changes = true;
375  ++fNumStateChangePublishers;
376  } else {
377  FAIR_LOG(warn) << "Task '" << task.taskId << "' sent subscription confirmation more than once";
378  }
379  lk.unlock();
380  fStateChangeSubscriptionsCV->notify_one();
381  } catch (const std::exception& e) {
382  FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeSubscription const&): " << e.what();
383  FAIR_LOG(error) << "Possibly no task with id '" << taskId << "'?";
384  }
385  } else {
386  FAIR_LOG(error) << "State change subscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId();
387  }
388  }
389 
390  auto HandleCmd(cmd::StateChangeUnsubscription const& cmd) -> void
391  {
392  if (cmd.GetResult() == cmd::Result::Ok) {
393  DDSTask::Id taskId(cmd.GetTaskId());
394 
395  try {
396  std::unique_lock<std::mutex> lk(*fMtx);
397  DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
398  if (task.subscribed_to_state_changes) {
399  task.subscribed_to_state_changes = false;
400  --fNumStateChangePublishers;
401  } else {
402  FAIR_LOG(warn) << "Task '" << task.taskId << "' sent unsubscription confirmation more than once";
403  }
404  lk.unlock();
405  fStateChangeSubscriptionsCV->notify_one();
406  } catch (const std::exception& e) {
407  FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
408  }
409  } else {
410  FAIR_LOG(error) << "State change unsubscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId();
411  }
412  }
413 
414  auto HandleCmd(cmd::StateChange const& cmd, DDSChannel::Id const& senderId) -> void
415  {
416  if (cmd.GetCurrentState() == DeviceState::Exiting) {
417  fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::StateChangeExitingReceived>()).Serialize(), senderId);
418  }
419 
420  DDSTask::Id taskId(cmd.GetTaskId());
421 
422  try {
423  std::lock_guard<std::mutex> lk(*fMtx);
424  DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
425  task.lastState = cmd.GetLastState();
426  task.state = cmd.GetCurrentState();
427  // if the task is exiting, it will not respond to unsubscription request anymore, set it to false now.
428  if (task.state == DeviceState::Exiting) {
429  task.subscribed_to_state_changes = false;
430  --fNumStateChangePublishers;
431  }
432  // FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
433 
434  for (auto& op : fChangeStateOps) {
435  op.second.Update(taskId, cmd.GetCurrentState());
436  }
437  for (auto& op : fWaitForStateOps) {
438  op.second.Update(taskId, cmd.GetLastState(), cmd.GetCurrentState());
439  }
440  } catch (const std::exception& e) {
441  FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChange const&): " << e.what();
442  }
443  }
444 
445  auto HandleCmd(cmd::TransitionStatus const& cmd) -> void
446  {
447  if (cmd.GetResult() != cmd::Result::Ok) {
448  DDSTask::Id taskId(cmd.GetTaskId());
449  std::lock_guard<std::mutex> lk(*fMtx);
450  for (auto& op : fChangeStateOps) {
451  if (!op.second.IsCompleted() && op.second.ContainsTask(taskId)) {
452  if (fStateData.at(fStateIndex.at(taskId)).state != op.second.GetTargetState()) {
453  FAIR_LOG(error) << cmd.GetTransition() << " transition failed for " << cmd.GetDeviceId() << ", device is in " << cmd.GetCurrentState() << " state.";
454  op.second.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed));
455  } else {
456  FAIR_LOG(debug) << cmd.GetTransition() << " transition failed for " << cmd.GetDeviceId() << ", device is already in " << cmd.GetCurrentState() << " state.";
457  }
458  }
459  }
460  }
461  }
462 
463  auto HandleCmd(cmd::Properties const& cmd) -> void
464  {
465  std::unique_lock<std::mutex> lk(*fMtx);
466  try {
467  auto& op(fGetPropertiesOps.at(cmd.GetRequestId()));
468  lk.unlock();
469  op.Update(cmd.GetDeviceId(), cmd.GetResult(), cmd.GetProps());
470  } catch (std::out_of_range& e) {
471  FAIR_LOG(debug) << "GetProperties operation (request id: " << cmd.GetRequestId()
472  << ") not found (probably completed or timed out), "
473  << "discarding reply of device " << cmd.GetDeviceId();
474  }
475  }
476 
477  auto HandleCmd(cmd::PropertiesSet const& cmd) -> void
478  {
479  std::unique_lock<std::mutex> lk(*fMtx);
480  try {
481  auto& op(fSetPropertiesOps.at(cmd.GetRequestId()));
482  lk.unlock();
483  op.Update(cmd.GetDeviceId(), cmd.GetResult());
484  } catch (std::out_of_range& e) {
485  FAIR_LOG(debug) << "SetProperties operation (request id: " << cmd.GetRequestId()
486  << ") not found (probably completed or timed out), "
487  << "discarding reply of device " << cmd.GetDeviceId();
488  }
489  }
490 
491  using Duration = std::chrono::microseconds;
492  using ChangeStateCompletionSignature = void(std::error_code, TopologyState);
493 
494  private:
495  struct ChangeStateOp
496  {
497  using Id = std::size_t;
498  using Count = unsigned int;
499 
500  template<typename Handler>
501  ChangeStateOp(Id id,
502  const TopologyTransition transition,
503  std::vector<DDSTask> tasks,
504  TopologyState& stateData,
505  Duration timeout,
506  std::mutex& mutex,
507  Executor const & ex,
508  Allocator const & alloc,
509  Handler&& handler)
510  : fId(id)
511  , fOp(ex, alloc, std::move(handler))
512  , fStateData(stateData)
513  , fTimer(ex)
514  , fCount(0)
515  , fTasks(std::move(tasks))
516  , fTargetState(expectedState.at(transition))
517  , fMtx(mutex)
518  {
519  if (timeout > std::chrono::milliseconds(0)) {
520  fTimer.expires_after(timeout);
521  fTimer.async_wait([&](std::error_code ec) {
522  if (!ec) {
523  std::lock_guard<std::mutex> lk(fMtx);
524  fOp.Timeout(fStateData);
525  }
526  });
527  }
528  if (fTasks.empty()) {
529  FAIR_LOG(warn) << "ChangeState initiated on an empty set of tasks, check the path argument.";
530  }
531  }
532  ChangeStateOp() = delete;
533  ChangeStateOp(const ChangeStateOp&) = delete;
534  ChangeStateOp& operator=(const ChangeStateOp&) = delete;
535  ChangeStateOp(ChangeStateOp&&) = default;
536  ChangeStateOp& operator=(ChangeStateOp&&) = default;
537  ~ChangeStateOp() = default;
538 
540  auto ResetCount(const TopologyStateIndex& stateIndex, const TopologyState& stateData) -> void
541  {
542  fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) {
543  if (ContainsTask(stateData.at(s.second).taskId)) {
544  return stateData.at(s.second).state == fTargetState;
545  } else {
546  return false;
547  }
548  });
549  }
550 
552  auto Update(const DDSTask::Id taskId, const DeviceState currentState) -> void
553  {
554  if (!fOp.IsCompleted() && ContainsTask(taskId)) {
555  if (currentState == fTargetState) {
556  ++fCount;
557  }
558  TryCompletion();
559  }
560  }
561 
563  auto TryCompletion() -> void
564  {
565  if (!fOp.IsCompleted() && fCount == fTasks.size()) {
566  Complete(std::error_code());
567  }
568  }
569 
571  auto Complete(std::error_code ec) -> void
572  {
573  fTimer.cancel();
574  fOp.Complete(ec, fStateData);
575  }
576 
578  auto ContainsTask(DDSTask::Id id) -> bool
579  {
580  auto it = std::find_if(fTasks.begin(), fTasks.end(), [id](const DDSTask& t) { return t.GetId() == id; });
581  return it != fTasks.end();
582  }
583 
584  bool IsCompleted() { return fOp.IsCompleted(); }
585 
586  auto GetTargetState() const -> DeviceState { return fTargetState; }
587 
588  private:
589  Id const fId;
590  AsioAsyncOp<Executor, Allocator, ChangeStateCompletionSignature> fOp;
591  TopologyState& fStateData;
592  asio::steady_timer fTimer;
593  Count fCount;
594  std::vector<DDSTask> fTasks;
595  DeviceState fTargetState;
596  std::mutex& fMtx;
597  };
598 
599  public:
// Initiates a state transition on the tasks selected by `path` and completes
// `token` with (std::error_code, TopologyState).
//   transition - transition to request
//   path       - passed to fDDSTopo.GetTasks() and to SendCommand() to select the tasks
//   timeout    - handed to the ChangeStateOp, which only starts a timer for values > 0
//   token      - asio completion token
677  template<typename CompletionToken>
678  auto AsyncChangeState(const TopologyTransition transition,
679  const std::string& path,
680  Duration timeout,
681  CompletionToken&& token)
682  {
683  return asio::async_initiate<CompletionToken, ChangeStateCompletionSignature>([&](auto handler) {
684  typename ChangeStateOp::Id const id(tools::UuidHash());
685 
686  std::lock_guard<std::mutex> lk(*fMtx);
687 
// Drop operations that have completed before registering the new one.
688  for (auto it = begin(fChangeStateOps); it != end(fChangeStateOps);) {
689  if (it->second.IsCompleted()) {
690  it = fChangeStateOps.erase(it);
691  } else {
692  ++it;
693  }
694  }
695 
// The operation is constructed in place inside the container.
// NOTE(review): the ChangeStateOp constructor takes an executor and an allocator
// between the mutex and the handler; those two arguments are not visible in this
// listing (numbering jumps from 704 to 707) - confirm against the original file.
696  auto p = fChangeStateOps.emplace(
697  std::piecewise_construct,
698  std::forward_as_tuple(id),
699  std::forward_as_tuple(id,
700  transition,
701  fDDSTopo.GetTasks(path),
702  fStateData,
703  timeout,
704  *fMtx,
707  std::move(handler)));
708 
709  cmd::Cmds cmds(cmd::make<cmd::ChangeState>(transition));
710  fDDSSession.SendCommand(cmds.Serialize(), path);
711 
// Tasks that already are in the target state are counted right away, so the
// operation may complete immediately.
712  p.first->second.ResetCount(fStateIndex, fStateData);
713  // TODO: make sure following operation properly queues the completion and not doing it directly out of initiation call.
714  p.first->second.TryCompletion();
715 
716  },
717  token);
718  }
719 
725  template<typename CompletionToken>
726  auto AsyncChangeState(const TopologyTransition transition, CompletionToken&& token)
727  {
728  return AsyncChangeState(transition, "", Duration(0), std::move(token));
729  }
730 
737  template<typename CompletionToken>
738  auto AsyncChangeState(const TopologyTransition transition, Duration timeout, CompletionToken&& token)
739  {
740  return AsyncChangeState(transition, "", timeout, std::move(token));
741  }
742 
749  template<typename CompletionToken>
750  auto AsyncChangeState(const TopologyTransition transition, const std::string& path, CompletionToken&& token)
751  {
752  return AsyncChangeState(transition, path, Duration(0), std::move(token));
753  }
754 
760  auto ChangeState(const TopologyTransition transition, const std::string& path = "", Duration timeout = Duration(0))
761  -> std::pair<std::error_code, TopologyState>
762  {
763  tools::SharedSemaphore blocker;
764  std::error_code ec;
765  TopologyState state;
766  AsyncChangeState(transition, path, timeout, [&, blocker](std::error_code _ec, TopologyState _state) mutable {
767  ec = _ec;
768  state = _state;
769  blocker.Signal();
770  });
771  blocker.Wait();
772  return {ec, state};
773  }
774 
779  auto ChangeState(const TopologyTransition transition, Duration timeout)
780  -> std::pair<std::error_code, TopologyState>
781  {
782  return ChangeState(transition, "", timeout);
783  }
784 
787  auto GetCurrentState() const -> TopologyState
788  {
789  std::lock_guard<std::mutex> lk(*fMtx);
790  return fStateData;
791  }
792 
793  auto AggregateState() const -> DeviceState { return sdk::AggregateState(GetCurrentState()); }
794 
795  auto StateEqualsTo(DeviceState state) const -> bool { return sdk::StateEqualsTo(GetCurrentState(), state); }
796 
797  using WaitForStateCompletionSignature = void(std::error_code);
798 
799  private:
800  struct WaitForStateOp
801  {
802  using Id = std::size_t;
803  using Count = unsigned int;
804 
805  template<typename Handler>
806  WaitForStateOp(Id id,
807  DeviceState targetLastState,
808  DeviceState targetCurrentState,
809  std::vector<DDSTask> tasks,
810  Duration timeout,
811  std::mutex& mutex,
812  Executor const & ex,
813  Allocator const & alloc,
814  Handler&& handler)
815  : fId(id)
816  , fOp(ex, alloc, std::move(handler))
817  , fTimer(ex)
818  , fCount(0)
819  , fTasks(std::move(tasks))
820  , fTargetLastState(targetLastState)
821  , fTargetCurrentState(targetCurrentState)
822  , fMtx(mutex)
823  {
824  if (timeout > std::chrono::milliseconds(0)) {
825  fTimer.expires_after(timeout);
826  fTimer.async_wait([&](std::error_code ec) {
827  if (!ec) {
828  std::lock_guard<std::mutex> lk(fMtx);
829  fOp.Timeout();
830  }
831  });
832  }
833  if (fTasks.empty()) {
834  FAIR_LOG(warn) << "WaitForState initiated on an empty set of tasks, check the path argument.";
835  }
836  }
837  WaitForStateOp() = delete;
838  WaitForStateOp(const WaitForStateOp&) = delete;
839  WaitForStateOp& operator=(const WaitForStateOp&) = delete;
840  WaitForStateOp(WaitForStateOp&&) = default;
841  WaitForStateOp& operator=(WaitForStateOp&&) = default;
842  ~WaitForStateOp() = default;
843 
845  auto ResetCount(const TopologyStateIndex& stateIndex, const TopologyState& stateData) -> void
846  {
847  fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) {
848  if (ContainsTask(stateData.at(s.second).taskId)) {
849  return stateData.at(s.second).state == fTargetCurrentState &&
850  (stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined);
851  } else {
852  return false;
853  }
854  });
855  }
856 
858  auto Update(const DDSTask::Id taskId, const DeviceState lastState, const DeviceState currentState) -> void
859  {
860  if (!fOp.IsCompleted() && ContainsTask(taskId)) {
861  if (currentState == fTargetCurrentState &&
862  (lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined)) {
863  ++fCount;
864  }
865  TryCompletion();
866  }
867  }
868 
870  auto TryCompletion() -> void
871  {
872  if (!fOp.IsCompleted() && fCount == fTasks.size()) {
873  fTimer.cancel();
874  fOp.Complete();
875  }
876  }
877 
878  bool IsCompleted() { return fOp.IsCompleted(); }
879 
880  private:
881  Id const fId;
882  AsioAsyncOp<Executor, Allocator, WaitForStateCompletionSignature> fOp;
883  asio::steady_timer fTimer;
884  Count fCount;
885  std::vector<DDSTask> fTasks;
886  DeviceState fTargetLastState;
887  DeviceState fTargetCurrentState;
888  std::mutex& fMtx;
889 
891  auto ContainsTask(DDSTask::Id id) -> bool
892  {
893  auto it = std::find_if(fTasks.begin(), fTasks.end(), [id](const DDSTask& t) { return t.GetId() == id; });
894  return it != fTasks.end();
895  }
896  };
897 
898  public:
// Initiates waiting for the tasks selected by `path` to reach the given
// (last, current) state combination and completes `token` with std::error_code.
//   targetLastState    - awaited last state (DeviceState::Undefined is treated as "any" by WaitForStateOp)
//   targetCurrentState - awaited current state
//   path               - passed to fDDSTopo.GetTasks() to select the tasks
//   timeout            - handed to the WaitForStateOp, which only starts a timer for values > 0
//   token              - asio completion token
// No command is sent to the devices here; only local bookkeeping is set up.
907  template<typename CompletionToken>
908  auto AsyncWaitForState(const DeviceState targetLastState,
909  const DeviceState targetCurrentState,
910  const std::string& path,
911  Duration timeout,
912  CompletionToken&& token)
913  {
914  return asio::async_initiate<CompletionToken, WaitForStateCompletionSignature>([&](auto handler) {
// NOTE(review): the id type is spelled GetPropertiesOp::Id although the operation
// is a WaitForStateOp; both aliases are std::size_t, so this is harmless.
915  typename GetPropertiesOp::Id const id(tools::UuidHash());
916 
917  std::lock_guard<std::mutex> lk(*fMtx);
918 
// Drop operations that have completed before registering the new one.
919  for (auto it = begin(fWaitForStateOps); it != end(fWaitForStateOps);) {
920  if (it->second.IsCompleted()) {
921  it = fWaitForStateOps.erase(it);
922  } else {
923  ++it;
924  }
925  }
926 
// NOTE(review): the WaitForStateOp constructor takes an executor and an allocator
// between the mutex and the handler; those two arguments are not visible in this
// listing (numbering jumps from 935 to 938) - confirm against the original file.
927  auto p = fWaitForStateOps.emplace(
928  std::piecewise_construct,
929  std::forward_as_tuple(id),
930  std::forward_as_tuple(id,
931  targetLastState,
932  targetCurrentState,
933  fDDSTopo.GetTasks(path),
934  timeout,
935  *fMtx,
938  std::move(handler)));
// Tasks that already match are counted right away, so the operation may
// complete immediately.
939  p.first->second.ResetCount(fStateIndex, fStateData);
940  // TODO: make sure following operation properly queues the completion and not doing it directly out of initiation call.
941  p.first->second.TryCompletion();
942  },
943  token);
944  }
945 
952  template<typename CompletionToken>
953  auto AsyncWaitForState(const DeviceState targetLastState, const DeviceState targetCurrentState, CompletionToken&& token)
954  {
955  return AsyncWaitForState(targetLastState, targetCurrentState, "", Duration(0), std::move(token));
956  }
957 
963  template<typename CompletionToken>
964  auto AsyncWaitForState(const DeviceState targetCurrentState, CompletionToken&& token)
965  {
966  return AsyncWaitForState(DeviceState::Undefined, targetCurrentState, "", Duration(0), std::move(token));
967  }
968 
975  auto WaitForState(const DeviceState targetLastState, const DeviceState targetCurrentState, const std::string& path = "", Duration timeout = Duration(0))
976  -> std::error_code
977  {
978  tools::SharedSemaphore blocker;
979  std::error_code ec;
980  AsyncWaitForState(targetLastState, targetCurrentState, path, timeout, [&, blocker](std::error_code _ec) mutable {
981  ec = _ec;
982  blocker.Signal();
983  });
984  blocker.Wait();
985  return ec;
986  }
987 
993  auto WaitForState(const DeviceState targetCurrentState, const std::string& path = "", Duration timeout = Duration(0))
994  -> std::error_code
995  {
996  return WaitForState(DeviceState::Undefined, targetCurrentState, path, timeout);
997  }
998 
999  using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult);
1000 
1001  private:
1002  struct GetPropertiesOp
1003  {
1004  using Id = std::size_t;
1005  using GetCount = unsigned int;
1006 
1007  template<typename Handler>
1008  GetPropertiesOp(Id id,
1009  GetCount expectedCount,
1010  Duration timeout,
1011  std::mutex& mutex,
1012  Executor const & ex,
1013  Allocator const & alloc,
1014  Handler&& handler)
1015  : fId(id)
1016  , fOp(ex, alloc, std::move(handler))
1017  , fTimer(ex)
1018  , fCount(0)
1019  , fExpectedCount(expectedCount)
1020  , fMtx(mutex)
1021  {
1022  if (timeout > std::chrono::milliseconds(0)) {
1023  fTimer.expires_after(timeout);
1024  fTimer.async_wait([&](std::error_code ec) {
1025  if (!ec) {
1026  std::lock_guard<std::mutex> lk(fMtx);
1027  fOp.Timeout(fResult);
1028  }
1029  });
1030  }
1031  if (expectedCount == 0) {
1032  FAIR_LOG(warn) << "GetProperties initiated on an empty set of tasks, check the path argument.";
1033  }
1034  // FAIR_LOG(debug) << "GetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
1035  }
1036  GetPropertiesOp() = delete;
1037  GetPropertiesOp(const GetPropertiesOp&) = delete;
1038  GetPropertiesOp& operator=(const GetPropertiesOp&) = delete;
1039  GetPropertiesOp(GetPropertiesOp&&) = default;
1040  GetPropertiesOp& operator=(GetPropertiesOp&&) = default;
1041  ~GetPropertiesOp() = default;
1042 
1043  auto Update(const std::string& deviceId, cmd::Result result, DeviceProperties props) -> void
1044  {
1045  std::lock_guard<std::mutex> lk(fMtx);
1046  if (cmd::Result::Ok != result) {
1047  fResult.failed.insert(deviceId);
1048  } else {
1049  fResult.devices.insert({deviceId, {std::move(props)}});
1050  }
1051  ++fCount;
1052  TryCompletion();
1053  }
1054 
1055  bool IsCompleted() { return fOp.IsCompleted(); }
1056 
1057  private:
1058  Id const fId;
1059  AsioAsyncOp<Executor, Allocator, GetPropertiesCompletionSignature> fOp;
1060  asio::steady_timer fTimer;
1061  GetCount fCount;
1062  GetCount const fExpectedCount;
1063  GetPropertiesResult fResult;
1064  std::mutex& fMtx;
1065 
1067  auto TryCompletion() -> void
1068  {
1069  if (!fOp.IsCompleted() && fCount == fExpectedCount) {
1070  fTimer.cancel();
1071  if (fResult.failed.size() > 0) {
1072  fOp.Complete(MakeErrorCode(ErrorCode::DeviceGetPropertiesFailed), std::move(fResult));
1073  } else {
1074  fOp.Complete(std::move(fResult));
1075  }
1076  }
1077  }
1078  };
1079 
1080  public:
// Initiates a GetProperties request to the tasks selected by `path` and
// completes `token` with (std::error_code, GetPropertiesResult).
//   query   - property query sent to the devices
//   path    - passed to fDDSTopo.GetTasks() and to SendCommand() to select the tasks
//   timeout - handed to the GetPropertiesOp, which only starts a timer for values > 0
//   token   - asio completion token
1088  template<typename CompletionToken>
1089  auto AsyncGetProperties(DevicePropertyQuery const& query,
1090  const std::string& path,
1091  Duration timeout,
1092  CompletionToken&& token)
1093  {
1094  return asio::async_initiate<CompletionToken, GetPropertiesCompletionSignature>(
1095  [&](auto handler) {
1096  typename GetPropertiesOp::Id const id(tools::UuidHash());
1097 
1098  std::lock_guard<std::mutex> lk(*fMtx);
1099 
// Drop operations that have completed before registering the new one.
1100  for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) {
1101  if (it->second.IsCompleted()) {
1102  it = fGetPropertiesOps.erase(it);
1103  } else {
1104  ++it;
1105  }
1106  }
1107 
// The number of selected tasks is the number of replies the operation waits for.
// NOTE(review): the GetPropertiesOp constructor takes an executor and an allocator
// between the mutex and the handler; those two arguments are not visible in this
// listing (numbering jumps from 1114 to 1117) - confirm against the original file.
1108  fGetPropertiesOps.emplace(
1109  std::piecewise_construct,
1110  std::forward_as_tuple(id),
1111  std::forward_as_tuple(id,
1112  fDDSTopo.GetTasks(path).size(),
1113  timeout,
1114  *fMtx,
1117  std::move(handler)));
1118 
// The request carries the operation id; replies are matched to the operation by it.
1119  cmd::Cmds const cmds(cmd::make<cmd::GetProperties>(id, query));
1120  fDDSSession.SendCommand(cmds.Serialize(), path);
1121  },
1122  token);
1123  }
1124 
1130  template<typename CompletionToken>
1131  auto AsyncGetProperties(DevicePropertyQuery const& query, CompletionToken&& token)
1132  {
1133  return AsyncGetProperties(query, "", Duration(0), std::move(token));
1134  }
1135 
1141  auto GetProperties(DevicePropertyQuery const& query, const std::string& path = "", Duration timeout = Duration(0))
1142  -> std::pair<std::error_code, GetPropertiesResult>
1143  {
1144  tools::SharedSemaphore blocker;
1145  std::error_code ec;
1146  GetPropertiesResult result;
1147  AsyncGetProperties(query, path, timeout, [&, blocker](std::error_code _ec, GetPropertiesResult _result) mutable {
1148  ec = _ec;
1149  result = _result;
1150  blocker.Signal();
1151  });
1152  blocker.Wait();
1153  return {ec, result};
1154  }
1155 
/// Signature of SetProperties completion handlers: an error code followed by a FailedDevices value.
1156  using SetPropertiesCompletionSignature = void(std::error_code, FailedDevices);
1157 
1158  private:
1159  struct SetPropertiesOp
1160  {
1161  using Id = std::size_t;
1162  using SetCount = unsigned int;
1163 
1164  template<typename Handler>
1165  SetPropertiesOp(Id id,
1166  SetCount expectedCount,
1167  Duration timeout,
1168  std::mutex& mutex,
1169  Executor const & ex,
1170  Allocator const & alloc,
1171  Handler&& handler)
1172  : fId(id)
1173  , fOp(ex, alloc, std::move(handler))
1174  , fTimer(ex)
1175  , fCount(0)
1176  , fExpectedCount(expectedCount)
1177  , fFailedDevices()
1178  , fMtx(mutex)
1179  {
1180  if (timeout > std::chrono::milliseconds(0)) {
1181  fTimer.expires_after(timeout);
1182  fTimer.async_wait([&](std::error_code ec) {
1183  if (!ec) {
1184  std::lock_guard<std::mutex> lk(fMtx);
1185  fOp.Timeout(fFailedDevices);
1186  }
1187  });
1188  }
1189  if (expectedCount == 0) {
1190  FAIR_LOG(warn) << "SetProperties initiated on an empty set of tasks, check the path argument.";
1191  }
1192  // FAIR_LOG(debug) << "SetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
1193  }
1194  SetPropertiesOp() = delete;
1195  SetPropertiesOp(const SetPropertiesOp&) = delete;
1196  SetPropertiesOp& operator=(const SetPropertiesOp&) = delete;
1197  SetPropertiesOp(SetPropertiesOp&&) = default;
1198  SetPropertiesOp& operator=(SetPropertiesOp&&) = default;
1199  ~SetPropertiesOp() = default;
1200 
1201  auto Update(const std::string& deviceId, cmd::Result result) -> void
1202  {
1203  std::lock_guard<std::mutex> lk(fMtx);
1204  if (cmd::Result::Ok != result) {
1205  fFailedDevices.insert(deviceId);
1206  }
1207  ++fCount;
1208  TryCompletion();
1209  }
1210 
1211  bool IsCompleted() { return fOp.IsCompleted(); }
1212 
1213  private:
1214  Id const fId;
1215  AsioAsyncOp<Executor, Allocator, SetPropertiesCompletionSignature> fOp;
1216  asio::steady_timer fTimer;
1217  SetCount fCount;
1218  SetCount const fExpectedCount;
1219  FailedDevices fFailedDevices;
1220  std::mutex& fMtx;
1221 
1223  auto TryCompletion() -> void
1224  {
1225  if (!fOp.IsCompleted() && fCount == fExpectedCount) {
1226  fTimer.cancel();
1227  if (fFailedDevices.size() > 0) {
1228  fOp.Complete(MakeErrorCode(ErrorCode::DeviceSetPropertiesFailed), fFailedDevices);
1229  } else {
1230  fOp.Complete(fFailedDevices);
1231  }
1232  }
1233  }
1234  };
1235 
1236  public:
1244  template<typename CompletionToken>
1245  auto AsyncSetProperties(const DeviceProperties& props,
1246  const std::string& path,
1247  Duration timeout,
1248  CompletionToken&& token)
1249  {
1250  return asio::async_initiate<CompletionToken, SetPropertiesCompletionSignature>(
1251  [&](auto handler) {
1252  typename SetPropertiesOp::Id const id(tools::UuidHash());
1253 
1254  std::lock_guard<std::mutex> lk(*fMtx);
1255 
1256  for (auto it = begin(fGetPropertiesOps); it != end(fGetPropertiesOps);) {
1257  if (it->second.IsCompleted()) {
1258  it = fGetPropertiesOps.erase(it);
1259  } else {
1260  ++it;
1261  }
1262  }
1263 
1264  fSetPropertiesOps.emplace(
1265  std::piecewise_construct,
1266  std::forward_as_tuple(id),
1267  std::forward_as_tuple(id,
1268  fDDSTopo.GetTasks(path).size(),
1269  timeout,
1270  *fMtx,
1273  std::move(handler)));
1274 
1275  cmd::Cmds const cmds(cmd::make<cmd::SetProperties>(id, props));
1276  fDDSSession.SendCommand(cmds.Serialize(), path);
1277  },
1278  token);
1279  }
1280 
1286  template<typename CompletionToken>
1287  auto AsyncSetProperties(DeviceProperties const & props, CompletionToken&& token)
1288  {
1289  return AsyncSetProperties(props, "", Duration(0), std::move(token));
1290  }
1291 
1297  auto SetProperties(DeviceProperties const& properties, const std::string& path = "", Duration timeout = Duration(0))
1298  -> std::pair<std::error_code, FailedDevices>
1299  {
1300  tools::SharedSemaphore blocker;
1301  std::error_code ec;
1302  FailedDevices failed;
1303  AsyncSetProperties(properties, path, timeout, [&, blocker](std::error_code _ec, FailedDevices _failed) mutable {
1304  ec = _ec;
1305  failed = _failed;
1306  blocker.Signal();
1307  });
1308  blocker.Wait();
1309  return {ec, failed};
1310  }
1311 
1312  Duration GetHeartbeatInterval() const { return fHeartbeatInterval; }
1313  void SetHeartbeatInterval(Duration duration) { fHeartbeatInterval = duration; }
1314 
1315  private:
/// Unsigned counter type.
/// NOTE(review): not referenced in this part of the file - confirm it is still used elsewhere.
1316  using TransitionedCount = unsigned int;
1317 
// DDS session; the property commands are sent through fDDSSession.SendCommand().
1318  DDSSession fDDSSession;
// DDS topology; GetTasks() supplies the task list and the expected reply counts.
1319  DDSTopology fDDSTopo;
// One DeviceStatus entry per task, appended by makeTopologyState().
1320  TopologyState fStateData;
// Task id -> running index, filled by makeTopologyState().
1321  TopologyStateIndex fStateIndex;
1322 
// Locked while the Get/SetProperties operation maps are modified; every
// GetPropertiesOp / SetPropertiesOp additionally keeps a reference to it.
// NOTE(review): presumably heap-allocated so that the class stays movable - confirm.
1323  mutable std::unique_ptr<std::mutex> fMtx;
1324 
// NOTE(review): the next three members are not used in this part of the file; judging by
// their names they serve state-change subscription and heartbeat handling - confirm.
1325  std::unique_ptr<std::condition_variable> fStateChangeSubscriptionsCV;
1326  unsigned int fNumStateChangePublishers;
1327  asio::steady_timer fHeartbeatsTimer;
// Read by GetHeartbeatInterval() and written by SetHeartbeatInterval().
1328  Duration fHeartbeatInterval;
1329 
// Registries of asynchronous operations, keyed by operation id.
// fSetPropertiesOps and fGetPropertiesOps are filled by AsyncSetProperties() and
// AsyncGetProperties() respectively; the code filling the other two is outside this section.
1330  std::unordered_map<typename ChangeStateOp::Id, ChangeStateOp> fChangeStateOps;
1331  std::unordered_map<typename WaitForStateOp::Id, WaitForStateOp> fWaitForStateOps;
1332  std::unordered_map<typename SetPropertiesOp::Id, SetPropertiesOp> fSetPropertiesOps;
1333  std::unordered_map<typename GetPropertiesOp::Id, GetPropertiesOp> fGetPropertiesOps;
1334 
1335  auto makeTopologyState() -> void
1336  {
1337  fStateData.reserve(fDDSTopo.GetTasks().size());
1338 
1339  int index = 0;
1340 
1341  for (const auto& task : fDDSTopo.GetTasks()) {
1342  fStateData.push_back(DeviceStatus{false, DeviceState::Undefined, DeviceState::Undefined, task.GetId(), task.GetCollectionId()});
1343  fStateIndex.emplace(task.GetId(), index);
1344  index++;
1345  }
1346  }
1347 
1349  auto GetCurrentStateUnsafe() const -> TopologyState
1350  {
1351  return fStateData;
1352  }
1353 };
1354 
/// BasicTopology instantiated with DefaultExecutor and DefaultAllocator.
1355 using Topology = BasicTopology<DefaultExecutor, DefaultAllocator>;
/// Short alias for Topology.
1356 using Topo = Topology;
1357 
/// @brief Creates a Topology from native DDS API objects (declaration only; defined elsewhere)
/// @param nativeTopo Native DDS topology object
/// @param nativeSession Shared pointer to the native DDS session object
/// @param env DDS environment, defaults to a value-initialized DDSEnv
/// @param blockUntilConnected Defaults to false. NOTE(review): presumably forwarded to the
///        BasicTopology constructor parameter of the same name - confirm in the definition.
/// @return The constructed Topology
1363 auto MakeTopology(dds::topology_api::CTopology nativeTopo,
1364  std::shared_ptr<dds::tools_api::CSession> nativeSession,
1365  DDSEnv env = {},
1366  bool blockUntilConnected = false) -> Topology;
1367 
1368 } // namespace fair::mq::sdk
1369 
1370 #endif /* FAIR_MQ_SDK_TOPOLOGY_H */
fair::mq::sdk::BasicTopology::WaitForState
auto WaitForState(const DeviceState targetLastState, const DeviceState targetCurrentState, const std::string &path="", Duration timeout=Duration(0)) -> std::error_code
Wait for selected FairMQ devices to reach given last & current state in this topology.
Definition: Topology.h:975
fair::mq::sdk::DDSTopology::GetName
auto GetName() const -> std::string
Get the name of the topology.
Definition: DDSTopology.cxx:111
fair::mq::sdk::BasicTopology::BasicTopology
BasicTopology(DDSTopology topo, DDSSession session, bool blockUntilConnected=false)
(Re)Construct a FairMQ topology from an existing DDS topology
Definition: Topology.h:219
fair::mq::sdk::GetPropertiesResult::Device
Definition: Topology.h:148
fair::mq::sdk::BasicTopology::BasicTopology
BasicTopology(BasicTopology &&)=default
movable
fair::mq::sdk::BasicTopology::ChangeState
auto ChangeState(const TopologyTransition transition, Duration timeout) -> std::pair< std::error_code, TopologyState >
Perform state transition on all FairMQ devices in this topology with a timeout.
Definition: Topology.h:779
fair::mq::sdk::BasicTopology::SetProperties
auto SetProperties(DeviceProperties const &properties, const std::string &path="", Duration timeout=Duration(0)) -> std::pair< std::error_code, FailedDevices >
Set properties on selected FairMQ devices in this topology.
Definition: Topology.h:1297
fair::mq::tools::SharedSemaphore
A simple copyable blocking semaphore.
Definition: Semaphore.h:51
fair::mq::sdk::RuntimeError
Definition: Error.h:35
fair::mq::sdk::BasicTopology::BasicTopology
BasicTopology(const Executor &ex, DDSTopology topo, DDSSession session, bool blockUntilConnected=false, Allocator alloc=DefaultAllocator())
(Re)Construct a FairMQ topology from an existing DDS topology
Definition: Topology.h:229
fair::mq::sdk::BasicTopology::GetCurrentState
auto GetCurrentState() const -> TopologyState
Returns the current state of the topology.
Definition: Topology.h:787
fair::mq::sdk::BasicTopology::GetProperties
auto GetProperties(DevicePropertyQuery const &query, const std::string &path="", Duration timeout=Duration(0)) -> std::pair< std::error_code, GetPropertiesResult >
Query properties on selected FairMQ devices in this topology.
Definition: Topology.h:1141
fair::mq::sdk::DeviceStatus
Definition: Topology.h:132
fair::mq::sdk::BasicTopology::WaitForState
auto WaitForState(const DeviceState targetCurrentState, const std::string &path="", Duration timeout=Duration(0)) -> std::error_code
Wait for selected FairMQ devices to reach given current state in this topology.
Definition: Topology.h:993
fair::mq::sdk::AsioBase
Base for creating Asio-enabled I/O objects.
Definition: AsioBase.h:41
fair::mq::sdk::BasicTopology::BasicTopology
BasicTopology(const BasicTopology &)=delete
not copyable
fair::mq::sdk::DDSSession
Represents a DDS session.
Definition: DDSSession.h:62
fair::mq::sdk::BasicTopology::AsyncSetProperties
auto AsyncSetProperties(DeviceProperties const &props, CompletionToken &&token)
Initiate property update on selected FairMQ devices in this topology.
Definition: Topology.h:1287
fair::mq::sdk::BasicTopology::AsyncWaitForState
auto AsyncWaitForState(const DeviceState targetLastState, const DeviceState targetCurrentState, const std::string &path, Duration timeout, CompletionToken &&token)
Initiate waiting for selected FairMQ devices to reach given last & current state in this topology.
Definition: Topology.h:908
fair::mq::sdk::BasicTopology::AsyncChangeState
auto AsyncChangeState(const TopologyTransition transition, const std::string &path, CompletionToken &&token)
Initiate state transition on FairMQ devices in this topology for a specified topology path.
Definition: Topology.h:750
fair::mq::sdk::BasicTopology::AsyncChangeState
auto AsyncChangeState(const TopologyTransition transition, CompletionToken &&token)
Initiate state transition on all FairMQ devices in this topology.
Definition: Topology.h:726
fair::mq::sdk::BasicTopology::AsyncGetProperties
auto AsyncGetProperties(DevicePropertyQuery const &query, CompletionToken &&token)
Initiate property query on selected FairMQ devices in this topology.
Definition: Topology.h:1131
fair::mq::sdk::BasicTopology::AsyncWaitForState
auto AsyncWaitForState(const DeviceState targetLastState, const DeviceState targetCurrentState, CompletionToken &&token)
Initiate waiting for selected FairMQ devices to reach given last & current state in this topology.
Definition: Topology.h:953
fair::mq::sdk::GetPropertiesResult
Definition: Topology.h:146
fair::mq::sdk::BasicTopology::AsyncWaitForState
auto AsyncWaitForState(const DeviceState targetCurrentState, CompletionToken &&token)
Initiate waiting for selected FairMQ devices to reach given current state in this topology.
Definition: Topology.h:964
fair::mq::sdk::cmd::Cmds
Definition: Commands.h:360
fair::mq::sdk::BasicTopology::AsyncChangeState
auto AsyncChangeState(const TopologyTransition transition, const std::string &path, Duration timeout, CompletionToken &&token)
Initiate state transition on FairMQ devices in this topology for a specified topology path, with a timeout.
Definition: Topology.h:678
fair::mq::sdk::BasicTopology
Represents a FairMQ topology.
Definition: Topology.h:213
fair::mq::sdk::BasicTopology::ChangeState
auto ChangeState(const TopologyTransition transition, const std::string &path="", Duration timeout=Duration(0)) -> std::pair< std::error_code, TopologyState >
Perform state transition on FairMQ devices in this topology for a specified topology path.
Definition: Topology.h:760
fair::mq::sdk::BasicTopology::AsyncGetProperties
auto AsyncGetProperties(DevicePropertyQuery const &query, const std::string &path, Duration timeout, CompletionToken &&token)
Initiate property query on selected FairMQ devices in this topology.
Definition: Topology.h:1089
fair::mq::sdk::BasicTopology::AsyncSetProperties
auto AsyncSetProperties(const DeviceProperties &props, const std::string &path, Duration timeout, CompletionToken &&token)
Initiate property update on selected FairMQ devices in this topology.
Definition: Topology.h:1245
fair::mq::sdk::DDSTopology
Represents a DDS topology.
Definition: DDSTopology.h:35
fair::mq::sdk::BasicTopology::AsyncChangeState
auto AsyncChangeState(const TopologyTransition transition, Duration timeout, CompletionToken &&token)
Initiate state transition on all FairMQ devices in this topology with a timeout.
Definition: Topology.h:738

privacy