diff --git a/fairmq/sdk/AsioAsyncOp.h b/fairmq/sdk/AsioAsyncOp.h new file mode 100644 index 00000000..0757a8a3 --- /dev/null +++ b/fairmq/sdk/AsioAsyncOp.h @@ -0,0 +1,211 @@ +/******************************************************************************** + * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_SDK_ASIOASYNCOP_H +#define FAIR_MQ_SDK_ASIOASYNCOP_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace fair { +namespace mq { +namespace sdk { + +template +struct AsioAsyncOpImplBase +{ + virtual auto Complete(std::error_code, SignatureArgTypes&&...) -> void = 0; + virtual auto IsCompleted() const -> bool = 0; +}; + +/** + * @tparam Executor1 Associated I/O executor, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.associated_i_o_executor + * @tparam Allocator1 Default allocation strategy, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.allocation_of_intermediate_storage + */ +template +struct AsioAsyncOpImpl : AsioAsyncOpImplBase +{ + /// See https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.allocation_of_intermediate_storage + using Allocator2 = typename asio::associated_allocator::type; + + /// See https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.associated_completion_handler_executor + using Executor2 = typename asio::associated_executor::type; + + /// Ctor + AsioAsyncOpImpl(const Executor1& ex1, Allocator1&& alloc1, Handler&& handler) + : fWork1(ex1) + , fWork2(asio::get_associated_executor(handler, ex1)) + , fHandler(std::move(handler)) + , fAlloc1(std::move(alloc1)) + {} + + auto GetAlloc2() const -> Allocator2 { return asio::get_associated_allocator(fHandler, fAlloc1); } + auto GetEx2() const -> Executor2 { return asio::get_associated_executor(fWork2); } + + auto Complete(std::error_code ec, SignatureArgTypes&&... args) -> void override + { + if (IsCompleted()) { + throw RuntimeError("Async operation already completed"); + } + + GetEx2().dispatch( + [=, handler = std::move(fHandler)]() mutable { + handler(ec, std::forward(args)...); + }, + GetAlloc2()); + + fWork1.reset(); + fWork2.reset(); + } + + auto IsCompleted() const -> bool override + { + return !fWork1.owns_work() && !fWork2.owns_work(); + } + + private: + /// See https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.outstanding_work + asio::executor_work_guard fWork1; + asio::executor_work_guard fWork2; + Handler fHandler; + Allocator1 fAlloc1; +}; + +/** + * @class AsioAsyncOp AsioAsyncOp.h + * @tparam Executor Associated I/O executor, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.associated_i_o_executor + * @tparam Allocator Default allocation strategy, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.allocation_of_intermediate_storage + * @tparam CompletionSignature + * @brief Interface for Asio-compliant asynchronous operation, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html + * + * @par Thread Safety + * @e Distinct @e objects: Safe.@n + * @e Shared @e objects: Unsafe. + * + * primary template + */ +template +struct AsioAsyncOp +{ +}; + +/** + * @tparam Executor See primary template + * @tparam Allocator See primary template + * @tparam SignatureReturnType Return type of CompletionSignature, see primary template + * @tparam SignatureFirstArgType Type of first argument of CompletionSignature, see primary template + * @tparam SignatureArgTypes Types of the rest of arguments of CompletionSignature + * + * partial specialization to deconstruct CompletionSignature + */ +template +struct AsioAsyncOp +{ + static_assert(std::is_void::value, + "return value of CompletionSignature must be void"); + static_assert(std::is_same::value, + "first argument of CompletionSignature must be std::error_code"); + using Duration = std::chrono::milliseconds; + + private: + using Impl = AsioAsyncOpImplBase; + using ImplPtr = std::unique_ptr>; + ImplPtr fImpl; + + public: + /// Default Ctor + AsioAsyncOp() + : fImpl(nullptr) + {} + + /// Ctor with handler + template + AsioAsyncOp(Executor&& ex1, Allocator&& alloc1, Handler&& handler) + : AsioAsyncOp() + { + // Async operation type to be allocated and constructed + using Op = AsioAsyncOpImpl; + + // Create allocator for concrete op type + // Allocator2, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.allocation_of_intermediate_storage + using OpAllocator = + typename std::allocator_traits::template rebind_alloc; + OpAllocator opAlloc; + + // Allocate memory + auto mem(std::allocator_traits::allocate(opAlloc, 1)); + + // Construct object + auto ptr(new (mem) Op(std::forward(ex1), + std::forward(alloc1), + std::forward(handler))); + + // Assign ownership to this object + fImpl = ImplPtr(ptr, [opAlloc](Impl* p) mutable { + std::allocator_traits::deallocate(opAlloc, static_cast(p), 1); + }); + } + + /// Ctor with handler #2 + template + AsioAsyncOp(Executor&& ex1, Handler&& handler) + : AsioAsyncOp(std::forward(ex1), Allocator(), std::forward(handler)) + {} + + /// Ctor with handler #3 + template + explicit AsioAsyncOp(Handler&& handler) + : AsioAsyncOp(asio::system_executor(), std::forward(handler)) + {} + + auto IsCompleted() -> bool { return (fImpl == nullptr) || fImpl->IsCompleted(); } + + auto Complete(std::error_code ec, SignatureArgTypes&&... args) -> void + { + if(IsCompleted()) { + throw RuntimeError("Async operation already completed"); + } + + fImpl->Complete(ec, std::forward(args)...); + fImpl.reset(nullptr); + } + + auto Complete(SignatureArgTypes&&... args) -> void + { + Complete(std::error_code(), std::forward(args)...); + } + + auto Cancel(SignatureArgTypes&&... args) -> void + { + Complete(std::make_error_code(std::errc::operation_canceled), + std::forward(args)...); + } +}; + +} /* namespace sdk */ +} /* namespace mq */ +} /* namespace fair */ + +#endif /* FAIR_MQ_SDK_ASIOASYNCOP_H */ + diff --git a/fairmq/sdk/AsioBase.h b/fairmq/sdk/AsioBase.h new file mode 100644 index 00000000..73203bf6 --- /dev/null +++ b/fairmq/sdk/AsioBase.h @@ -0,0 +1,76 @@ +/******************************************************************************** + * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_SDK_ASIOBASE_H +#define FAIR_MQ_SDK_ASIOBASE_H + +#include +#include +#include +#include + +namespace fair { +namespace mq { +namespace sdk { + +using DefaultExecutor = asio::executor; +using DefaultAllocator = std::allocator; + +/** + * @class AsioBase AsioBase.h + * @tparam Executor Associated I/O executor + * @tparam Allocator Associated default allocator + * @brief Base for creating Asio-enabled I/O objects + * + * @par Thread Safety + * @e Distinct @e objects: Safe.@n + * @e Shared @e objects: Unsafe. + */ +template +class AsioBase +{ + public: + /// Member type of associated I/O executor + using ExecutorType = Executor; + /// Get associated I/O executor + auto GetExecutor() const noexcept -> ExecutorType { return fExecutor; } + + /// Member type of associated default allocator + using AllocatorType = Allocator; + /// Get associated default allocator + auto GetAllocator() const noexcept -> AllocatorType { return fAllocator; } + + /// NO default ctor + AsioBase() = delete; + + /// Construct with associated I/O executor + explicit AsioBase(Executor ex, Allocator alloc) + : fExecutor(std::move(ex)) + , fAllocator(std::move(alloc)) + {} + + /// NOT copyable + AsioBase(const AsioBase&) = delete; + AsioBase& operator=(const AsioBase&) = delete; + + /// movable + AsioBase(AsioBase&&) noexcept = default; + AsioBase& operator=(AsioBase&&) noexcept = default; + + ~AsioBase() = default; + + private: + ExecutorType fExecutor; + AllocatorType fAllocator; +}; + +} /* namespace sdk */ +} /* namespace mq */ +} /* namespace fair */ + +#endif /* FAIR_MQ_SDK_ASIOBASE_H */ diff --git a/fairmq/sdk/Exceptions.h b/fairmq/sdk/Exceptions.h new file mode 100644 index 00000000..77809244 --- /dev/null +++ b/fairmq/sdk/Exceptions.h @@ -0,0 +1,37 @@ +/******************************************************************************** + * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_SDK_EXCEPTIONS_H +#define FAIR_MQ_SDK_EXCEPTIONS_H + +#include +#include +#include + +namespace fair { +namespace mq { +namespace sdk { + +struct RuntimeError : ::std::runtime_error +{ + template + explicit RuntimeError(T&&... t) + : ::std::runtime_error::runtime_error(tools::ToString(std::forward(t)...)) + {} +}; + +struct MixedStateError : RuntimeError +{ + using RuntimeError::RuntimeError; +}; + +} /* namespace sdk */ +} /* namespace mq */ +} /* namespace fair */ + +#endif /* FAIR_MQ_SDK_EXCEPTIONS_H */ diff --git a/fairmq/sdk/Traits.h b/fairmq/sdk/Traits.h new file mode 100644 index 00000000..be30115e --- /dev/null +++ b/fairmq/sdk/Traits.h @@ -0,0 +1,50 @@ +/******************************************************************************** + * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_SDK_TRAITS_H +#define FAIR_MQ_SDK_TRAITS_H + +#include +#include +#include + +namespace asio { +namespace detail { + +/// Specialize to match our coding conventions +template +struct associated_executor_impl::value>> +{ + using type = typename T::ExecutorType; + + static auto get(const T& obj, const Executor& /*ex = Executor()*/) noexcept -> type + { + return obj.GetExecutor(); + } +}; + +/// Specialize to match our coding conventions +template +struct associated_allocator_impl> +{ + using type = typename T::AllocatorType; + + static auto get(const T& obj, const Allocator& /*alloc = Allocator()*/) noexcept -> type + { + return obj.GetAllocator(); + } +}; + +} /* namespace detail */ +} /* namespace asio */ + +#endif /* FAIR_MQ_SDK_TRAITS_H */ diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index ee229223..1816e0b4 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -287,6 +287,7 @@ if(BUILD_SDK) add_testsuite(SDK SOURCES ${CMAKE_CURRENT_BINARY_DIR}/runner.cxx + sdk/_async_op.cxx sdk/_dds.cxx sdk/_topology.cxx sdk/Fixtures.h diff --git a/test/sdk/Fixtures.h b/test/sdk/Fixtures.h index 70b1a2d4..cf613adf 100644 --- a/test/sdk/Fixtures.h +++ b/test/sdk/Fixtures.h @@ -10,12 +10,13 @@ #define FAIR_MQ_TEST_FIXTURES #include "TestEnvironment.h" -#include -#include +#include #include #include #include +#include +#include #include #include @@ -76,6 +77,19 @@ struct TopologyFixture : ::testing::Test sdk::DDSEnvironment mDDSEnv; sdk::DDSSession mDDSSession; sdk::DDSTopology mDDSTopo; + asio::io_context mIoContext; +}; + +struct AsyncOpFixture : ::testing::Test +{ + auto SetUp() -> void override { + } + + auto TearDown() -> void override { + } + + LoggerConfig mLoggerConfig; + asio::io_context mIoContext; }; } /* namespace test */ diff --git a/test/sdk/_async_op.cxx b/test/sdk/_async_op.cxx new file mode 100644 index 00000000..da790636 --- /dev/null +++ b/test/sdk/_async_op.cxx @@ -0,0 +1,118 @@ +/******************************************************************************** + * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "Fixtures.h" + +#include +#include +#include +#include +#include + +namespace { + +using AsyncOp = fair::mq::test::AsyncOpFixture; + +// template +// class : public AsioBase + +TEST_F(AsyncOp, DefaultConstruction) +{ + using namespace fair::mq::sdk; + + AsioAsyncOp op; + EXPECT_TRUE(op.IsCompleted()); +} + +TEST_F(AsyncOp, ConstructionWithHandler) +{ + using namespace fair::mq::sdk; + + AsioAsyncOp op( + [](std::error_code, int) {}); + EXPECT_FALSE(op.IsCompleted()); +} + +TEST_F(AsyncOp, Complete) +{ + using namespace fair::mq::sdk; + + AsioAsyncOp op( + [](std::error_code ec, int v) { + EXPECT_FALSE(ec); // success + EXPECT_EQ(v, 42); + }); + + EXPECT_FALSE(op.IsCompleted()); + op.Complete(42); + EXPECT_TRUE(op.IsCompleted()); + + EXPECT_THROW(op.Complete(6), RuntimeError); // No double completion! +} + +TEST_F(AsyncOp, Cancel) +{ + using namespace fair::mq::sdk; + + AsioAsyncOp op( + [](std::error_code ec) { + EXPECT_TRUE(ec); // error + EXPECT_EQ(ec, std::make_error_code(std::errc::operation_canceled)); + }); + + op.Cancel(); +} + +TEST_F(AsyncOp, Timeout) +{ + using namespace fair::mq::sdk; + + asio::steady_timer timer(mIoContext.get_executor(), std::chrono::milliseconds(50)); + AsioAsyncOp op( + mIoContext.get_executor(), + [&timer](std::error_code ec) { + timer.cancel(); + std::cout << "Completion with: " << ec.message() << std::endl; + EXPECT_TRUE(ec); // error + EXPECT_EQ(ec, std::make_error_code(std::errc::operation_canceled)); + }); + timer.async_wait([&op](asio::error_code ec) { + std::cout << "Timer event" << std::endl; + if (ec != asio::error::operation_aborted) { + op.Cancel(); + } + }); + + mIoContext.run(); + EXPECT_THROW(op.Complete(), RuntimeError); +} + +TEST_F(AsyncOp, Timeout2) +{ + using namespace fair::mq::sdk; + + asio::steady_timer timer(mIoContext.get_executor(), std::chrono::milliseconds(50)); + AsioAsyncOp op( + mIoContext.get_executor(), + [&timer](std::error_code ec) { + timer.cancel(); + std::cout << "Completion with: " << ec.message() << std::endl; + EXPECT_FALSE(ec); // success + }); + op.Complete(); // Complete before timer + timer.async_wait([&op](asio::error_code ec) { + std::cout << "Timer event" << std::endl; + if (ec != asio::error::operation_aborted) { + op.Cancel(); + } + }); + + mIoContext.run(); + EXPECT_THROW(op.Complete(), RuntimeError); +} +} // namespace