FairMQ  1.4.14
C++ Message Queuing Library and Framework
AsioAsyncOp.h
1 /********************************************************************************
2  * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_SDK_ASIOASYNCOP_H
10 #define FAIR_MQ_SDK_ASIOASYNCOP_H
11 
12 #include <asio/associated_allocator.hpp>
13 #include <asio/associated_executor.hpp>
14 #include <asio/executor_work_guard.hpp>
15 #include <asio/system_executor.hpp>
16 #include <chrono>
17 #include <exception>
18 #include <fairmq/sdk/Error.h>
19 #include <fairmq/sdk/Traits.h>
20 #include <functional>
21 #include <memory>
22 #include <system_error>
23 #include <type_traits>
24 #include <utility>
25 
26 #include <fairlogger/Logger.h>
27 #ifndef FAIR_LOG
28 #define FAIR_LOG LOG
29 #endif /* ifndef FAIR_LOG */
30 
31 namespace fair {
32 namespace mq {
33 namespace sdk {
34 
35 template<typename... SignatureArgTypes>
37 {
38  virtual auto Complete(std::error_code, SignatureArgTypes...) -> void = 0;
39  virtual auto IsCompleted() const -> bool = 0;
40 };
41 
46 template<typename Executor1, typename Allocator1, typename Handler, typename... SignatureArgTypes>
47 struct AsioAsyncOpImpl : AsioAsyncOpImplBase<SignatureArgTypes...>
48 {
50  using Allocator2 = typename asio::associated_allocator<Handler, Allocator1>::type;
51 
53  using Executor2 = typename asio::associated_executor<Handler, Executor1>::type;
54 
56  AsioAsyncOpImpl(const Executor1& ex1, Allocator1 alloc1, Handler&& handler)
57  : fWork1(ex1)
58  , fWork2(asio::get_associated_executor(handler, ex1))
59  , fHandler(std::move(handler))
60  , fAlloc1(std::move(alloc1))
61  {}
62 
63  auto GetAlloc2() const -> Allocator2 { return asio::get_associated_allocator(fHandler, fAlloc1); }
64  auto GetEx2() const -> Executor2 { return asio::get_associated_executor(fWork2); }
65 
66  auto Complete(std::error_code ec, SignatureArgTypes... args) -> void override
67  {
68  if (IsCompleted()) {
69  throw RuntimeError("Async operation already completed");
70  }
71 
72  GetEx2().dispatch(
73  [=, handler = std::move(fHandler)]() mutable {
74  try {
75  handler(ec, args...);
76  } catch (const std::exception& e) {
77  FAIR_LOG(error) << "Uncaught exception in AsioAsyncOp completion handler: " << e.what();
78  } catch (...) {
79  FAIR_LOG(error) << "Unknown uncaught exception in AsioAsyncOp completion handler.";
80  }
81  },
82  GetAlloc2());
83 
84  fWork1.reset();
85  fWork2.reset();
86  }
87 
88  auto IsCompleted() const -> bool override
89  {
90  return !fWork1.owns_work() && !fWork2.owns_work();
91  }
92 
93  private:
95  asio::executor_work_guard<Executor1> fWork1;
96  asio::executor_work_guard<Executor2> fWork2;
97  Handler fHandler;
98  Allocator1 fAlloc1;
99 };
100 
114 template<typename Executor, typename Allocator, typename CompletionSignature>
116 {
117 };
118 
128 template<typename Executor,
129  typename Allocator,
130  typename SignatureReturnType,
131  typename SignatureFirstArgType,
132  typename... SignatureArgTypes>
133 struct AsioAsyncOp<Executor,
134  Allocator,
135  SignatureReturnType(SignatureFirstArgType, SignatureArgTypes...)>
136 {
137  static_assert(std::is_void<SignatureReturnType>::value,
138  "return value of CompletionSignature must be void");
139  static_assert(std::is_same<SignatureFirstArgType, std::error_code>::value,
140  "first argument of CompletionSignature must be std::error_code");
141  using Duration = std::chrono::milliseconds;
142 
143  private:
144  using Impl = AsioAsyncOpImplBase<SignatureArgTypes...>;
145  using ImplPtr = std::unique_ptr<Impl, std::function<void(Impl*)>>;
146  ImplPtr fImpl;
147 
148  public:
151  : fImpl(nullptr)
152  {}
153 
155  template<typename Handler>
156  AsioAsyncOp(Executor ex1, Allocator alloc1, Handler&& handler)
157  : AsioAsyncOp()
158  {
159  // Async operation type to be allocated and constructed
160  using Op = AsioAsyncOpImpl<Executor, Allocator, Handler, SignatureArgTypes...>;
161 
162  // Create allocator for concrete op type
163  // Allocator2, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.allocation_of_intermediate_storage
164  using OpAllocator =
165  typename std::allocator_traits<typename Op::Allocator2>::template rebind_alloc<Op>;
166  OpAllocator opAlloc;
167 
168  // Allocate memory
169  auto mem(std::allocator_traits<OpAllocator>::allocate(opAlloc, 1));
170 
171  // Construct object
172  auto ptr(new (mem) Op(std::move(ex1),
173  std::move(alloc1),
174  std::forward<Handler>(handler)));
175 
176  // Assign ownership to this object
177  fImpl = ImplPtr(ptr, [opAlloc](Impl* p) mutable {
178  std::allocator_traits<OpAllocator>::deallocate(opAlloc, static_cast<Op*>(p), 1);
179  });
180  }
181 
183  template<typename Handler>
184  AsioAsyncOp(Executor ex1, Handler&& handler)
185  : AsioAsyncOp(std::move(ex1), Allocator(), std::forward<Handler>(handler))
186  {}
187 
189  template<typename Handler>
190  explicit AsioAsyncOp(Handler&& handler)
191  : AsioAsyncOp(asio::system_executor(), std::forward<Handler>(handler))
192  {}
193 
194  auto IsCompleted() -> bool { return (fImpl == nullptr) || fImpl->IsCompleted(); }
195 
196  auto Complete(std::error_code ec, SignatureArgTypes... args) -> void
197  {
198  if(IsCompleted()) {
199  throw RuntimeError("Async operation already completed");
200  }
201 
202  fImpl->Complete(ec, args...);
203  fImpl.reset(nullptr);
204  }
205 
206  auto Complete(SignatureArgTypes... args) -> void
207  {
208  Complete(std::error_code(), args...);
209  }
210 
211  auto Cancel(SignatureArgTypes... args) -> void
212  {
213  Complete(MakeErrorCode(ErrorCode::OperationCanceled), args...);
214  }
215 
216  auto Timeout(SignatureArgTypes... args) -> void
217  {
218  Complete(MakeErrorCode(ErrorCode::OperationTimeout), args...);
219  }
220 };
221 
222 } /* namespace sdk */
223 } /* namespace mq */
224 } /* namespace fair */
225 
226 #endif /* FAIR_MQ_SDK_ASIOASYNCOP_H */
227 
Definition: AsioAsyncOp.h:47
Interface for Asio-compliant asynchronous operation, see https://www.boost.org/doc/libs/1_70_0/doc/ht...
Definition: AsioAsyncOp.h:115
Definition: AsioAsyncOp.h:36
Definition: Error.h:56
AsioAsyncOpImpl(const Executor1 &ex1, Allocator1 alloc1, Handler &&handler)
Ctor.
Definition: AsioAsyncOp.h:56
typename asio::associated_executor< Handler, Executor1 >::type Executor2
See https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.associated_completion_handler_executor.
Definition: AsioAsyncOp.h:53
AsioAsyncOp(Executor ex1, Handler &&handler)
Ctor with handler #2.
Definition: AsioAsyncOp.h:184
typename asio::associated_allocator< Handler, Allocator1 >::type Allocator2
See https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.allocation_of_intermediate_storage.
Definition: AsioAsyncOp.h:50
Definition: Error.h:20
Tools for interfacing containers to the transport via polymorphic allocators.
Definition: DeviceRunner.h:23
AsioAsyncOp(Executor ex1, Allocator alloc1, Handler &&handler)
Ctor with handler.
Definition: AsioAsyncOp.h:156
Definition: Traits.h:16

privacy