9 #ifndef FAIR_MQ_SDK_ASIOASYNCOP_H
10 #define FAIR_MQ_SDK_ASIOASYNCOP_H
12 #include <asio/associated_allocator.hpp>
13 #include <asio/associated_executor.hpp>
14 #include <asio/executor_work_guard.hpp>
15 #include <asio/dispatch.hpp>
16 #include <asio/system_executor.hpp>
19 #include <fairmq/sdk/Error.h>
20 #include <fairmq/sdk/Traits.h>
23 #include <system_error>
24 #include <type_traits>
27 #include <fairlogger/Logger.h>
32 namespace fair::mq::sdk
35 template<
typename... SignatureArgTypes>
38 virtual auto Complete(std::error_code, SignatureArgTypes...) ->
void = 0;
39 virtual auto IsCompleted()
const ->
bool = 0;
46 template<
typename Executor1,
typename Allocator1,
typename Handler,
typename... SignatureArgTypes>
50 using Allocator2 =
typename asio::associated_allocator<Handler, Allocator1>::type;
53 using Executor2 =
typename asio::associated_executor<Handler, Executor1>::type;
58 , fWork2(asio::get_associated_executor(handler, ex1))
59 , fHandler(std::move(handler))
60 , fAlloc1(std::move(alloc1))
63 auto GetAlloc2() const ->
Allocator2 {
return asio::get_associated_allocator(fHandler, fAlloc1); }
64 auto GetEx2() const ->
Executor2 {
return asio::get_associated_executor(fWork2); }
66 auto Complete(std::error_code ec, SignatureArgTypes... args) ->
void override
69 throw RuntimeError(
"Async operation already completed");
72 asio::dispatch(GetEx2(),
73 [=, handler = std::move(fHandler)]()
mutable {
76 }
catch (
const std::exception& e) {
77 FAIR_LOG(error) <<
"Uncaught exception in AsioAsyncOp completion handler: " << e.what();
79 FAIR_LOG(error) <<
"Unknown uncaught exception in AsioAsyncOp completion handler.";
87 auto IsCompleted() const ->
bool override
89 return !fWork1.owns_work() && !fWork2.owns_work();
94 asio::executor_work_guard<Executor1> fWork1;
95 asio::executor_work_guard<Executor2> fWork2;
113 template<
typename Executor,
typename Allocator,
typename CompletionSignature>
127 template<
typename Executor,
129 typename SignatureReturnType,
130 typename SignatureFirstArgType,
131 typename... SignatureArgTypes>
134 SignatureReturnType(SignatureFirstArgType, SignatureArgTypes...)>
136 static_assert(std::is_void<SignatureReturnType>::value,
137 "return value of CompletionSignature must be void");
138 static_assert(std::is_same<SignatureFirstArgType, std::error_code>::value,
139 "first argument of CompletionSignature must be std::error_code");
140 using Duration = std::chrono::milliseconds;
144 using ImplPtr = std::unique_ptr<
Impl, std::function<void(
Impl*)>>;
154 template<
typename Handler>
159 using Op =
AsioAsyncOpImpl<Executor, Allocator, Handler, SignatureArgTypes...>;
164 typename std::allocator_traits<typename Op::Allocator2>::template rebind_alloc<Op>;
168 auto mem(std::allocator_traits<OpAllocator>::allocate(opAlloc, 1));
171 auto ptr(
new (mem) Op(std::move(ex1),
173 std::forward<Handler>(handler)));
176 fImpl = ImplPtr(ptr, [opAlloc](
Impl* p)
mutable {
177 std::allocator_traits<OpAllocator>::deallocate(opAlloc,
static_cast<Op*
>(p), 1);
182 template<
typename Handler>
184 :
AsioAsyncOp(std::move(ex1), Allocator(), std::forward<Handler>(handler))
188 template<
typename Handler>
190 :
AsioAsyncOp(asio::system_executor(), std::forward<Handler>(handler))
193 auto IsCompleted() ->
bool {
return (fImpl ==
nullptr) || fImpl->IsCompleted(); }
195 auto Complete(std::error_code ec, SignatureArgTypes... args) ->
void
198 throw RuntimeError(
"Async operation already completed");
201 fImpl->Complete(ec, args...);
202 fImpl.reset(
nullptr);
205 auto Complete(SignatureArgTypes... args) ->
void
207 Complete(std::error_code(), args...);
210 auto Cancel(SignatureArgTypes... args) ->
void
212 Complete(MakeErrorCode(ErrorCode::OperationCanceled), args...);
215 auto Timeout(SignatureArgTypes... args) ->
void
217 Complete(MakeErrorCode(ErrorCode::OperationTimeout), args...);