Compare commits

..

46 Commits

Author SHA1 Message Date
Dennis Klein
3f5374820a CMake: Add config for --graphviz 2019-09-04 21:17:35 +02:00
Dennis Klein
8a2c7fb601 DDS plugin: Wait for IDLE->EXITING state-change to be acknowledged
Sometimes devices shut down too fast when entering the EXITING state so
that the publication of that state-change will never be sent. The plugin
now waits for an acknowledgement by the external controller with a
configurable timeout.
2019-09-04 21:17:35 +02:00
Dennis Klein
c1a17c97b8 SDK: Add getter for agent path 2019-09-04 21:17:35 +02:00
Dennis Klein
ac8825c8de SDK: Update convenience header 2019-09-04 21:17:35 +02:00
Dennis Klein
1c49dde668 SDK: Fix data races on the local semaphores 2019-09-04 21:17:35 +02:00
Dennis Klein
5d6184cd1a Tools: Add a copyable SharedSemaphore 2019-09-04 21:17:35 +02:00
Dennis Klein
0e5f648d2b SDK: Require DDS 2.5.46
This DDS version fixed a critical deadlock
2019-09-04 21:17:35 +02:00
Dennis Klein
8057b8ae33 Tests.SDK: Fix formatting 2019-09-04 21:17:35 +02:00
Dennis Klein
da28b85497 Tests.SDK: Add optional DDS Tools API stability tests 2019-09-04 21:17:35 +02:00
Dennis Klein
33b5a2a342 SDK: Require DDS 2.5.42 and adapt to most recent API change 2019-09-04 21:17:35 +02:00
Dennis Klein
5b47df3014 SDK: Fix race condition which lead to frequent segfaults on destruction 2019-09-04 21:17:35 +02:00
Dennis Klein
fd77f2b729 SDK: Add usage examples (and tests) 2019-09-04 21:17:35 +02:00
Dennis Klein
6275f4d267 fairmq: Remove obsolete functions 2019-09-04 21:17:35 +02:00
Dennis Klein
d09be4ab79 Docs: Fix link 2019-09-04 21:17:35 +02:00
Dennis Klein
246e99a577 SDK: Fix exception specification 2019-09-04 21:17:35 +02:00
Dennis Klein
0d182dc18f fairmq: Fix -Wdefaulted-function-deleted 2019-09-04 21:17:35 +02:00
Dennis Klein
46e0796e77 CMake: Make the SDK depend on DDS_PLUGIN 2019-09-04 21:17:35 +02:00
Dennis Klein
1055f035ff CMake: Issue a warning if build option requirements are not met 2019-09-04 21:17:35 +02:00
Dennis Klein
7a0d348bd4 SDK: Implement Topology with asio-compliant async interface 2019-09-04 21:17:35 +02:00
Dennis Klein
3cd6d8cfca SDK: Refactor out DDSTask 2019-09-04 21:17:35 +02:00
Dennis Klein
0f50abf3d9 SDK: Fix completion signature and catch completion exceptions 2019-09-04 21:17:35 +02:00
Dennis Klein
75a3a80ac1 CMake: Fix submodule update command 2019-09-04 21:17:35 +02:00
Dennis Klein
25539e99f2 SDK: Introduce fairmq error category 2019-09-04 21:17:35 +02:00
Dennis Klein
f73a6d71ed CMake: Do not use git version in install tree 2019-09-04 21:17:35 +02:00
Dennis Klein
73af0ed78b SDK: Implement asio-compliant asynchronous operation helpers 2019-09-04 21:17:35 +02:00
Dennis Klein
1dec059104 SDK: Require C++14, CMake 3.11 and bundle asio 1.13.0
Decouple from Boost distro to be able to use
newest releases and rely on std::error_code only.
2019-09-04 21:17:35 +02:00
Dennis Klein
88ff5d8fc0 CMake: Bundle GTest 2019-09-04 21:17:35 +02:00
Dennis Klein
d6d9312e53 CMake: Add find module for asio 2019-09-04 21:17:35 +02:00
Dennis Klein
2208fe91e8 README: Update instructions 2019-09-04 21:17:35 +02:00
Dennis Klein
8d12b908b6 SDK: Implement helper to find most recent running DDS session 2019-09-04 21:17:35 +02:00
Alexey Rybalchenko
02b20c320c Add support for fairlogger dependencies 2019-08-22 15:52:45 +02:00
Alexey Rybalchenko
be06a5629e Print install prefix in cmake summary 2019-08-14 15:03:12 +02:00
Alexey Rybalchenko
eaa8f5cbdd SDK: Require DDS 2.5.36 and support new Tools API 2019-08-13 20:04:05 +02:00
Alexey Rybalchenko
7f0237d97d Example.DDS: Support DDS 2.5.25+ CLI 2019-08-13 20:04:05 +02:00
Alexey Rybalchenko
2fc93994d1 Execute test for DDS example only if DDS was found 2019-08-12 16:22:39 +02:00
Alexey Rybalchenko
8feffe70ba Topology: Use DDSTask type, remove unused member 2019-08-12 16:22:39 +02:00
Dennis Klein
31edf948de FairMQ: Require Boost 1.66 2019-08-07 18:37:36 +02:00
Dennis Klein
7cacf471b9 CI: Disable sdk until DDS 2.6 2019-07-29 09:22:02 +02:00
Dennis Klein
7316b0e7f2 Example.DDS: Run example as unit test
Part of #185
2019-07-29 09:22:02 +02:00
Dennis Klein
1fa82f5f22 Example.DDS: Make example topologies pass xml validation 2019-07-29 09:22:02 +02:00
Dennis Klein
1bb77bf47b DDS plugin: Automatically set session and device id if not provided
Resolves #187
2019-07-29 09:22:02 +02:00
Dennis Klein
07fe02a0a0 Tests.SDK: Add another test 2019-07-29 09:22:02 +02:00
Dennis Klein
9cbccface7 DDS plugin: Synchronize FillChannelContainers and DDSKeyValue updates
This was a regression after introducing external control mode in f7cdf5e.
2019-07-29 09:22:02 +02:00
Dennis Klein
7b773cde51 SDK: Improve error handling in case state-change fails on a device
Replace the log message with

1. Nothing, if the device is already in the target state
2. Abort and call the completion callback with error otherwise
2019-07-29 09:22:02 +02:00
Dennis Klein
fd282fa950 SDK: Track channel to task id association 2019-07-29 09:22:02 +02:00
Dennis Klein
008be36125 PluginServices: Do not throw if device control cannot be released 2019-07-29 09:22:02 +02:00
55 changed files with 2100 additions and 671 deletions

View File

@@ -1,3 +1,3 @@
---
Checks: '*,-google-*,-fuchsia-*,-cert-*,-llvm-header-guard,-readability-named-parameter,-misc-non-private-member-variables-in-classes,-*-magic-numbers,-llvm-include-order,-hicpp-no-array-decay,-performance-unnecessary-value-param,-cppcoreguidelines-pro-bounds-array-to-pointer-decay'
HeaderFilterRegex: '/(fairmq/|FairMQ)'
HeaderFilterRegex: '/(fairmq/)'

6
.gitmodules vendored Normal file
View File

@@ -0,0 +1,6 @@
[submodule "extern/googletest"]
path = extern/googletest
url = https://github.com/google/googletest
[submodule "extern/asio"]
path = extern/asio
url = https://github.com/chriskohlhoff/asio

View File

@@ -0,0 +1,14 @@
set(GRAPHVIZ_GRAPH_TYPE digraph)
set(GRAPHVIZ_GRAPH_NAME FairMQ)
set(GRAPHVIZ_EXECUTABLES ON)
set(GRAPHVIZ_STATIC_LIBS OFF)
set(GRAPHVIZ_SHARED_LIBS ON)
set(GRAPHVIZ_MODULE_LIBS OFF)
set(GRAPHVIZ_GENERATE_PER_TARGET OFF)
set(GRAPHVIZ_GENERATE_DEPENDERS OFF)
set(GRAPHVIZ_IGNORE_TARGETS
"fairmq-ex.*"
"testsuite_.*"
"testhelper_.*"
"FairMQPlugin_test_dummy"
)

View File

@@ -6,8 +6,8 @@
# copied verbatim in the file "LICENSE" #
################################################################################
cmake_minimum_required(VERSION 3.10 FATAL_ERROR)
cmake_policy(VERSION 3.10...3.14)
cmake_minimum_required(VERSION 3.11 FATAL_ERROR)
cmake_policy(VERSION 3.11...3.15)
# Project ######################################################################
set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)
@@ -18,7 +18,7 @@ get_git_version()
project(FairMQ VERSION ${PROJECT_VERSION} LANGUAGES CXX)
message(STATUS "${BWhite}${PROJECT_NAME}${CR} ${PROJECT_GIT_VERSION} from ${PROJECT_DATE}")
if(BUILD_OFI_TRANSPORT)
if(BUILD_OFI_TRANSPORT OR BUILD_SDK)
set(PROJECT_MIN_CXX_STANDARD 14)
else()
set(PROJECT_MIN_CXX_STANDARD 11)
@@ -31,17 +31,26 @@ include(CTest)
# Build options ################################################################
include(CMakeDependentOption)
option(BUILD_FAIRMQ "Build FairMQ library and devices." ON)
cmake_dependent_option(BUILD_TESTING "Build tests." OFF "BUILD_FAIRMQ" OFF)
cmake_dependent_option(BUILD_NANOMSG_TRANSPORT "Build nanomsg transport." OFF "BUILD_FAIRMQ" OFF)
cmake_dependent_option(BUILD_OFI_TRANSPORT "Build experimental OFI transport." OFF "BUILD_FAIRMQ" OFF)
cmake_dependent_option(BUILD_DDS_PLUGIN "Build DDS plugin." OFF "BUILD_FAIRMQ" OFF)
cmake_dependent_option(BUILD_PMIX_PLUGIN "Build PMIx plugin." OFF "BUILD_FAIRMQ" OFF)
cmake_dependent_option(BUILD_EXAMPLES "Build FairMQ examples." ON "BUILD_FAIRMQ" OFF)
option(BUILD_SDK "Build the FairMQ controller SDK." OFF)
option(BUILD_DOCS "Build FairMQ documentation." OFF)
option(FAST_BUILD "Fast production build. Not recommended for development." OFF)
fairmq_build_option(BUILD_FAIRMQ "Build FairMQ library and devices."
DEFAULT ON)
fairmq_build_option(BUILD_TESTING "Build tests."
DEFAULT OFF REQUIRES "BUILD_FAIRMQ")
fairmq_build_option(BUILD_NANOMSG_TRANSPORT "Build nanomsg transport."
DEFAULT OFF REQUIRES "BUILD_FAIRMQ")
fairmq_build_option(BUILD_OFI_TRANSPORT "Build experimental OFI transport."
DEFAULT OFF REQUIRES "BUILD_FAIRMQ")
fairmq_build_option(BUILD_DDS_PLUGIN "Build DDS plugin."
DEFAULT OFF REQUIRES "BUILD_FAIRMQ")
fairmq_build_option(BUILD_PMIX_PLUGIN "Build PMIx plugin."
DEFAULT OFF REQUIRES "BUILD_FAIRMQ")
fairmq_build_option(BUILD_EXAMPLES "Build FairMQ examples."
DEFAULT ON REQUIRES "BUILD_FAIRMQ")
fairmq_build_option(BUILD_SDK "Build the FairMQ controller SDK."
DEFAULT OFF REQUIRES "BUILD_DDS_PLUGIN")
fairmq_build_option(BUILD_DOCS "Build FairMQ documentation."
DEFAULT OFF)
fairmq_build_option(FAST_BUILD "Fast production build. Not recommended for development."
DEFAULT OFF)
################################################################################
@@ -75,10 +84,11 @@ if(BUILD_NANOMSG_TRANSPORT)
endif()
if(BUILD_SDK)
set(required_dds_version 2.5.22)
set(required_dds_version 2.5.46)
else()
set(required_dds_version 2.4)
endif()
if(BUILD_DDS_PLUGIN OR BUILD_SDK)
find_package2(PRIVATE DDS REQUIRED
VERSION ${required_dds_version}
@@ -98,6 +108,13 @@ if(BUILD_FAIRMQ OR BUILD_SDK)
VERSION 1.2.0
)
foreach(dep IN LISTS FairLogger_PACKAGE_DEPENDENCIES)
if(NOT dep STREQUAL "Boost")
find_package2(PUBLIC ${dep} REQUIRED VERSION ${FairLogger_${dep}_VERSION})
set(PROJECT_${dep}_VERSION ${FairLogger_${dep}_VERSION})
endif()
endforeach()
if(NOT DEFINED Boost_NO_BOOST_CMAKE AND CMAKE_VERSION VERSION_LESS 3.15)
# Since Boost 1.70 a CMake package is shipped by default. Unfortunately, it has a number
# of problems that are only fixed in Boost 1.71 or CMake 3.15. By default we skip the
@@ -105,7 +122,7 @@ if(BUILD_FAIRMQ OR BUILD_SDK)
set(Boost_NO_BOOST_CMAKE ON)
endif()
find_package2(PUBLIC Boost REQUIRED
VERSION 1.64
VERSION 1.66
COMPONENTS
container
@@ -119,6 +136,21 @@ if(BUILD_FAIRMQ OR BUILD_SDK)
DDS
FairLogger
)
# Normalize Boost version
if(CMAKE_VERSION VERSION_LESS 3.15)
set(Boost_VERSION "${Boost_MAJOR_VERSION}.${Boost_MINOR_VERSION}.${Boost_SUBMINOR_VERSION}")
endif()
endif()
if(BUILD_SDK)
find_package2(BUNDLED asio
VERSION 1.13.0
)
if(NOT asio_FOUND)
build_bundled(asio extern/asio)
find_package2(PRIVATE asio REQUIRED)
endif()
endif()
if(BUILD_FAIRMQ)
@@ -128,9 +160,11 @@ if(BUILD_FAIRMQ)
endif()
if(BUILD_TESTING)
find_package2(PRIVATE GTest REQUIRED
VERSION 1.7.0
)
find_package2(PRIVATE GTest VERSION 1.7.0)
if(NOT GTest_FOUND)
build_bundled(GTest extern/googletest)
find_package2(PRIVATE GTest REQUIRED)
endif()
endif()
if(BUILD_DOCS)
@@ -210,6 +244,22 @@ if(BUILD_DOCS)
DESTINATION ${PROJECT_INSTALL_DATADIR}/docs
)
endif()
if(BUILD_SDK)
install(FILES cmake/Findasio.cmake
DESTINATION ${PROJECT_INSTALL_CMAKEMODDIR}
)
if(asio_BUNDLED)
install(TARGETS bundled_asio_headers EXPORT ${PROJECT_EXPORT_SET})
install(DIRECTORY "${asio_BUILD_INCLUDE_DIR}/asio"
DESTINATION ${asio_INSTALL_INCLUDE_DIR}
PATTERN "Makefile.am" EXCLUDE
PATTERN ".gitignore" EXCLUDE
)
install(FILES "${asio_BUILD_INCLUDE_DIR}/asio.hpp"
DESTINATION ${asio_INSTALL_INCLUDE_DIR}
)
endif()
endif()
install_cmake_package()
################################################################################
@@ -246,15 +296,7 @@ if(PROJECT_PACKAGE_DEPENDENCIES)
message(STATUS " ${Cyan}DEPENDENCY FOUND VERSION PREFIX${CR}")
foreach(dep IN LISTS PROJECT_PACKAGE_DEPENDENCIES)
if(${dep}_VERSION)
if(${dep} STREQUAL Boost)
if(Boost_VERSION_MAJOR)
set(version_str "${BGreen}${${dep}_VERSION_MAJOR}.${${dep}_VERSION_MINOR}${CR}")
else()
set(version_str "${BGreen}${${dep}_MAJOR_VERSION}.${${dep}_MINOR_VERSION}${CR}")
endif()
else()
set(version_str "${BGreen}${${dep}_VERSION}${CR}")
endif()
set(version_str "${BGreen}${${dep}_VERSION}${CR}")
else()
set(version_str "${BYellow}unknown${CR}")
endif()
@@ -298,10 +340,15 @@ if(PROJECT_PACKAGE_DEPENDENCIES)
get_target_property(doxygen_bin Doxygen::doxygen INTERFACE_LOCATION)
get_filename_component(prefix ${doxygen_bin} DIRECTORY)
get_filename_component(prefix ${prefix}/.. ABSOLUTE)
elseif(${dep} STREQUAL fmt)
get_target_property(fmt_include fmt::fmt INTERFACE_INCLUDE_DIRECTORIES)
get_filename_component(prefix ${fmt_include}/.. ABSOLUTE)
else()
get_filename_component(prefix ${${dep}_INCLUDE_DIR}/.. ABSOLUTE)
endif()
message(STATUS " ${BWhite}${dep_padded}${CR}${version_padded}${prefix}")
if(NOT ${dep}_BUNDLED)
message(STATUS " ${BWhite}${dep_padded}${CR}${version_padded}${prefix}")
endif()
unset(version_str)
unset(version_padded)
unset(version_req_str)
@@ -358,9 +405,9 @@ else()
endif()
message(STATUS " ${BWhite}docs${CR} ${docs_summary}")
if(BUILD_SDK)
set(sdk_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_SDK=OFF${CR})")
set(sdk_summary "${BGreen}YES${CR} EXPERIMENTAL (required C++14) (disable with ${BMagenta}-DBUILD_SDK=OFF${CR})")
else()
set(sdk_summary "${BRed} NO${CR} EXPERIMENTAL (default, enable with ${BMagenta}-DBUILD_SDK=ON${CR})")
set(sdk_summary "${BRed} NO${CR} EXPERIMENTAL (required C++14) (default, enable with ${BMagenta}-DBUILD_SDK=ON${CR})")
endif()
message(STATUS " ${BWhite}sdk${CR} ${sdk_summary}")
message(STATUS " ")
@@ -384,6 +431,8 @@ if(RUN_STATIC_ANALYSIS)
else()
set(static_ana_summary "${BRed}OFF${CR} (default, enable with ${BMagenta}-DRUN_STATIC_ANALYSIS=ON${CR})")
endif()
message(STATUS " ${Cyan}INSTALL PREFIX${CR} ${BGreen}${CMAKE_INSTALL_PREFIX}${CR} (change with ${BMagenta}-DCMAKE_INSTALL_PREFIX=...${CR})")
message(STATUS " ")
message(STATUS " ${Cyan}RUN STATIC ANALYSIS ${static_ana_summary}")
message(STATUS " ")
################################################################################

View File

@@ -15,6 +15,14 @@ Files: cmake/cotire.cmake
Copyright: 2012-2018 Sascha Kratky
License: COTIRE
Files: extern/googletest
Copyright: 2008, Google Inc.
License: GOOGLE
Files: extern/asio
Copyright: 2003-2019, Christopher M. Kohlhoff (chris at kohlhoff dot com)
License: BSL-1.0
License: LGPL-3.0-only
[see LICENSE file]
@@ -39,3 +47,58 @@ License: COTIRE
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
License: GOOGLE
Copyright 2008, Google Inc.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
License: BSL-1.0
Boost Software License - Version 1.0 - August 17th, 2003
Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:
The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@@ -29,7 +29,7 @@ Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}")
Set(configure_options "${configure_options};-DBUILD_NANOMSG_TRANSPORT=ON")
# Set(configure_options "${configure_options};-DBUILD_OFI_TRANSPORT=ON")
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
Set(configure_options "${configure_options};-DBUILD_SDK=ON")
Set(configure_options "${configure_options};-DBUILD_SDK=OFF")
Set(configure_options "${configure_options};-DFAST_BUILD=ON")
Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}")

View File

@@ -33,25 +33,21 @@ configuration and control services.
FairMQ has been developed in the context of its mother project [FairRoot](https://github.com/FairRootGroup/FairRoot) -
a simulation, reconstruction and analysis framework.
## Dependencies
* PUBLIC: [**Boost**](https://www.boost.org/), [**FairLogger**](https://github.com/FairRootGroup/FairLogger)
* BUILD: [CMake](https://cmake.org/), [GTest](https://github.com/google/googletest), [Doxygen](http://www.doxygen.org/)
* PRIVATE: [ZeroMQ](http://zeromq.org/), [Msgpack](https://msgpack.org/index.html), [nanomsg](http://nanomsg.org/),
[asiofi](https://github.com/FairRootGroup/asiofi), [DDS](http://dds.gsi.de), [PMIx](https://pmix.org/)
Supported platforms: Linux and MacOS.
## Installation from Source
Recommended:
```bash
git clone https://github.com/FairRootGroup/FairMQ fairmq
mkdir fairmq_build && cd fairmq_build
cmake -DCMAKE_INSTALL_PREFIX=./fairmq_install ../fairmq
cmake --build . --target install
git clone https://github.com/FairRootGroup/FairMQ fairmq_source
cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=fairmq_install
cmake --build fairmq_build
cd fairmq_build; ctest -j4; cd ..
cmake --build fairmq_build --target install
```
If dependencies are not installed in standard system directories, you can hint the installation location via `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...`. `{DEPENDENCY}` can be `GTEST`, `BOOST`, `FAIRLOGGER`, `ZEROMQ`, `MSGPACK`, `NANOMSG`, `OFI`, `PMIX`, `ASIOFI` or `DDS` (`*_ROOT` variables can also be environment variables).
Please consult the [manpages of your CMake version](https://cmake.org/cmake/help/latest/manual/cmake.1.html) for more options.
If dependencies are not installed in standard system directories, you can hint the installation location via `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...`. `{DEPENDENCY}` can be `GTEST`, `BOOST`, `FAIRLOGGER`, `ZEROMQ`, `MSGPACK`, `NANOMSG`, `OFI`, `PMIX`, `ASIO`, `ASIOFI` or `DDS` (`*_ROOT` variables can also be environment variables).
## Usage
@@ -70,30 +66,50 @@ find_package(FairMQ)
`find_package(FairMQ)` will define an imported target `FairMQ::FairMQ`.
In order to succesfully compile and link against the `FairMQ::FairMQ` target, you need to discover its public package dependencies, too.
In order to succesfully compile and link against the `FairMQ::FairMQ` target, you need to discover its public package dependencies:
```cmake
find_package(FairMQ)
if(FairMQ_FOUND)
find_package(FairLogger ${FairMQ_FairLogger_VERSION})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_Boost_COMPONENTS})
foreach(dep IN LISTS FairMQ_PACKAGE_DEPENDENCIES)
if(FairMQ_${dep}_COMPONENTS)
find_package(${dep} ${FairMQ_${dep}_VERSION} COMPONENTS ${FairMQ_${dep}_COMPONENTS})
else()
find_package(${dep} ${FairMQ_${dep}_VERSION})
endif()
endforeach()
endif()
```
Of course, feel free to customize the above commands to your needs.
If your project shares a dependency with FairMQ or if you want to omit a certain dependency, you may want to customize the above example code to your needs.
Optionally, you can require certain FairMQ package components and a minimum version:
```cmake
find_package(FairMQ 1.1.0 COMPONENTS nanomsg_transport dds_plugin)
if(FairMQ_FOUND)
find_package(FairLogger ${FairMQ_FairLogger_VERSION})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_Boost_COMPONENTS})
endif()
```
When building FairMQ, CMake will print a summary table of all available package components.
## Dependencies
* [asio](https://github.com/chriskohlhoff/asio) (optionally bundled)
* [asiofi](https://github.com/FairRootGroup/asiofi)
* [Boost](https://www.boost.org/)
* [CMake](https://cmake.org/)
* [DDS](http://dds.gsi.de)
* [Doxygen](http://www.doxygen.org/)
* [FairLogger](https://github.com/FairRootGroup/FairLogger)
* [GTest](https://github.com/google/googletest) (optionally bundled)
* [Msgpack](https://msgpack.org/index.html)
* [nanomsg](http://nanomsg.org/)
* [PMIx](https://pmix.org/)
* [ZeroMQ](http://zeromq.org/)
Which dependencies are required depends on which components are built.
Supported platforms: Linux and MacOS.
## CMake options
On command line:
@@ -106,7 +122,7 @@ On command line:
* `-DBUILD_DDS_PLUGIN=ON` enables building of the DDS plugin.
* `-DBUILD_PMIX_PLUGIN=ON` enables building of the PMIx plugin.
* `-DBUILD_DOCS=ON` enables building of API docs.
* You can hint non-system installations for dependent packages, see the #Installation section above
* You can hint non-system installations for dependent packages, see the #installation-from-source section above
After the `find_package(FairMQ)` call the following CMake variables are defined:

View File

@@ -36,4 +36,6 @@ set(CMAKE_MODULE_PATH ${@PROJECT_NAME@_CMAKEMODDIR} ${CMAKE_MODULE_PATH})
### Import targets
include(@PACKAGE_CMAKE_INSTALL_PREFIX@/@PACKAGE_INSTALL_DESTINATION@/@PROJECT_EXPORT_SET@.cmake)
@BUNDLED_PACKAGES@
@PACKAGE_COMPONENTS@

View File

@@ -266,13 +266,30 @@ check_required_components(${PROJECT_NAME})
set(PACKAGE_COMPONENTS ${PACKAGE_COMPONENTS} PARENT_SCOPE)
endfunction()
function(generate_bundled_packages)
if(asio_BUNDLED)
set(BUNDLED_PACKAGES "\
####### Expanded from @BUNDLED_PACKAGES@ by configure_package_config_file() #########
if(\"\${CMAKE_MAJOR_VERSION}.\${CMAKE_MINOR_VERSION}\" VERSION_LESS 3.11)
message(FATAL_ERROR \"CMake >= 3.11 required\")
endif()
set_target_properties(${PROJECT_NAME}::bundled_asio_headers PROPERTIES IMPORTED_GLOBAL TRUE)
add_library(asio::headers ALIAS ${PROJECT_NAME}::bundled_asio_headers)
set(asio_VERSION ${asio_VERSION})
")
endif()
set(BUNDLED_PACKAGES ${BUNDLED_PACKAGES} PARENT_SCOPE)
endfunction()
# Configure/Install CMake package
macro(install_cmake_package)
list(SORT PROJECT_PACKAGE_DEPENDENCIES)
list(SORT PROJECT_INTERFACE_PACKAGE_DEPENDENCIES)
include(CMakePackageConfigHelpers)
set(PACKAGE_INSTALL_DESTINATION
${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}-${PROJECT_GIT_VERSION}
${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}-${PROJECT_VERSION}
)
if(BUILD_FAIRMQ)
install(EXPORT ${PROJECT_EXPORT_SET}
@@ -288,6 +305,7 @@ macro(install_cmake_package)
)
generate_package_dependencies() # fills ${PACKAGE_DEPENDENCIES}
generate_package_components() # fills ${PACKAGE_COMPONENTS}
generate_bundled_packages() # fills ${BUNDLED_PACKAGES}
string(TOUPPER ${CMAKE_BUILD_TYPE} PROJECT_BUILD_TYPE_UPPER)
set(PROJECT_CXX_FLAGS ${CMAKE_CXX_FLAGS} ${CMAKE_CXX_FLAGS_${PROJECT_BUILD_TYPE_UPPER}})
configure_package_config_file(
@@ -312,10 +330,11 @@ endmacro()
#
# Wrapper around CMake's native find_package command to add some features and bookkeeping.
#
# The qualifier (PRIVATE|PUBLIC|INTERFACE) to the package to populate
# The qualifier (PRIVATE|PUBLIC|INTERFACE|BUNDLED) to the package to populate
# the variables PROJECT_[INTERFACE]_<pkgname>_([VERSION]|[COMPONENTS]|PACKAGE_DEPENDENCIES)
# accordingly. This bookkeeping information is used to print our dependency found summary
# table and to generate a part of our CMake package.
# table and to generate a part of our CMake package. BUNDLED decays to PUBLIC if the variable
# <pkgname>_BUNDLED is false and to PRIVATE otherwise.
#
# When a dependending package is listed with ADD_REQUIREMENTS_OF the variables
# <dep_pkgname>_<pkgname>_VERSION|COMPONENTS are looked up to and added to the native
@@ -372,28 +391,130 @@ macro(find_package2 qualifier pkgname)
find_package(${pkgname} ${__version__} QUIET ${ARGS_UNPARSED_ARGUMENTS})
endif()
if(${qualifier} STREQUAL BUNDLED)
if(${pkgname}_BUNDLED)
set(__qualifier__ PRIVATE)
else()
set(__qualifier__ PUBLIC)
endif()
else()
set(__qualifier__ ${qualifier})
endif()
if(${pkgname}_FOUND)
if(${qualifier} STREQUAL PRIVATE)
if(${__qualifier__} STREQUAL PRIVATE)
set(PROJECT_${pkgname}_VERSION ${__version__})
set(PROJECT_${pkgname}_COMPONENTS ${__components__})
set(PROJECT_PACKAGE_DEPENDENCIES ${PROJECT_PACKAGE_DEPENDENCIES} ${pkgname})
elseif(${qualifier} STREQUAL PUBLIC)
elseif(${__qualifier__} STREQUAL PUBLIC)
set(PROJECT_${pkgname}_VERSION ${__version__})
set(PROJECT_${pkgname}_COMPONENTS ${__components__})
set(PROJECT_PACKAGE_DEPENDENCIES ${PROJECT_PACKAGE_DEPENDENCIES} ${pkgname})
set(PROJECT_INTERFACE_${pkgname}_VERSION ${__version__})
set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${__components__})
set(PROJECT_INTERFACE_PACKAGE_DEPENDENCIES ${PROJECT_INTERFACE_PACKAGE_DEPENDENCIES} ${pkgname})
elseif(${qualifier} STREQUAL INTERFACE)
elseif(${__qualifier__} STREQUAL INTERFACE)
set(PROJECT_INTERFACE_${pkgname}_VERSION ${__version__})
set(PROJECT_INTERFACE_${pkgname}_COMPONENTS ${__components__})
set(PROJECT_INTERFACE_PACKAGE_DEPENDENCIES ${PROJECT_INTERFACE_PACKAGE_DEPENDENCIES} ${pkgname})
endif()
endif()
unset(__qualifier__)
unset(__version__)
unset(__components__)
unset(__required_versions__)
set(CMAKE_PREFIX_PATH ${__old_cpp__})
unset(__old_cpp__)
endmacro()
macro(exec cmd)
join("${ARGN}" " " args)
file(APPEND ${${package}_BUILD_LOGFILE} ">>> ${cmd} ${args}\n")
execute_process(COMMAND ${cmd} ${ARGN}
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
OUTPUT_VARIABLE log
ERROR_VARIABLE log
RESULT_VARIABLE res
)
file(APPEND ${${package}_BUILD_LOGFILE} ${log})
if(res)
message(FATAL_ERROR "${res} \nSee also \"${${package}_BUILD_LOGFILE}\"")
endif()
endmacro()
function(build_bundled package bundle)
message(STATUS "Building bundled ${package}")
set(${package}_SOURCE_DIR ${CMAKE_SOURCE_DIR}/${bundle})
set(${package}_BINARY_DIR ${CMAKE_BINARY_DIR}/${bundle})
file(MAKE_DIRECTORY ${${package}_BINARY_DIR})
set(${package}_BUILD_LOGFILE ${${package}_BINARY_DIR}/build.log)
file(REMOVE ${${package}_BUILD_LOGFILE})
if(Git_FOUND)
exec(${GIT_EXECUTABLE} submodule update --init --recursive -- ${${package}_SOURCE_DIR})
endif()
if(${package} STREQUAL GTest)
set(${package}_INSTALL_DIR ${CMAKE_BINARY_DIR}/${bundle}_install)
file(MAKE_DIRECTORY ${${package}_INSTALL_DIR})
set(${package}_ROOT ${${package}_INSTALL_DIR})
exec(${CMAKE_COMMAND} -S ${${package}_SOURCE_DIR} -B ${${package}_BINARY_DIR} -G ${CMAKE_GENERATOR}
-DCMAKE_INSTALL_PREFIX=${${package}_INSTALL_DIR} -DBUILD_GMOCK=OFF
)
exec(${CMAKE_COMMAND} --build ${${package}_BINARY_DIR})
exec(${CMAKE_COMMAND} --build ${${package}_BINARY_DIR} --target install)
elseif(${package} STREQUAL asio)
set(${package}_BUILD_INCLUDE_DIR ${${package}_SOURCE_DIR}/asio/include CACHE PATH "Bundled ${package} build-interface include dir")
set(${package}_INSTALL_INCLUDE_DIR ${PROJECT_INSTALL_INCDIR}/bundled CACHE PATH "Bundled ${package} install-interface include dir")
set(${package}_ROOT ${${package}_SOURCE_DIR}/asio)
endif()
string(TOUPPER ${package} package_upper)
set(${package_upper}_ROOT "${${package}_ROOT}" CACHE PATH "Bundled ${package} install prefix search hint")
set(${package}_BUNDLED ON CACHE BOOL "Whether bundled ${package} was used")
message(STATUS "Building bundled ${package} - done")
endfunction()
macro(fairmq_build_option option description)
cmake_parse_arguments(ARGS "" "DEFAULT;REQUIRES" "" ${ARGN})
if(ARGS_DEFAULT)
set(__default__ ON)
else()
set(__default__ OFF)
endif()
if(ARGS_REQUIRES)
include(CMakeDependentOption)
set(__requires__ ${ARGS_REQUIRES})
string(REGEX REPLACE " +" ";" __requires_condition__ "${__requires__}")
if(${__requires_condition__})
else()
if(${option})
message(WARNING "Cannot enable build option ${option}, depending options are not set: ${__requires_condition__}.")
endif()
endif()
else()
set(__requires__)
endif()
if(__requires__)
cmake_dependent_option(${option} ${description} ${__default__} ${__requires__} OFF)
else()
option(${option} ${description} ${__default__})
endif()
set(__default__)
set(__requires__)
set(__requires_condition__)
set(ARGS_DEFAULT)
set(ARGS_REQUIRES)
set(option)
set(description)
endmacro()

53
cmake/Findasio.cmake Normal file
View File

@@ -0,0 +1,53 @@
################################################################################
# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
find_path(asio_INCLUDE_DIR
NAMES asio.hpp
PATH_SUFFIXES include
)
if(asio_INCLUDE_DIR)
find_file(asio_VERSION_HEADER "asio/version.hpp"
${asio_INCLUDE_DIR}
NO_DEFAULT_PATH
)
endif()
if(asio_VERSION_HEADER)
file(READ "${asio_VERSION_HEADER}" _asio_VERSION_HEADER_CONTENT)
string(REGEX MATCH "#define ASIO_VERSION ([0-9]+)" _MATCH "${_asio_VERSION_HEADER_CONTENT}")
set(asio_VERSION_MACRO ${CMAKE_MATCH_1})
math(EXPR asio_VERSION_MAJOR "${asio_VERSION_MACRO} / 100000")
math(EXPR asio_VERSION_MINOR "${asio_VERSION_MACRO} / 100 % 1000")
math(EXPR asio_VERSION_PATCH "${asio_VERSION_MACRO} % 100")
set(asio_VERSION "${asio_VERSION_MAJOR}.${asio_VERSION_MINOR}.${asio_VERSION_PATCH}")
endif()
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(asio
REQUIRED_VARS asio_INCLUDE_DIR
VERSION_VAR asio_VERSION
HANDLE_COMPONENTS
)
if(asio_FOUND AND asio_BUNDLED)
add_library(bundled_asio_headers INTERFACE)
target_include_directories(bundled_asio_headers INTERFACE
$<BUILD_INTERFACE:${asio_BUILD_INCLUDE_DIR}>
$<INSTALL_INTERFACE:${asio_INSTALL_INCLUDE_DIR}>
)
add_library(asio::headers ALIAS bundled_asio_headers)
endif()
if(asio_FOUND AND NOT TARGET asio::headers)
add_library(asio::headers INTERFACE IMPORTED)
set_target_properties(asio::headers PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES "${asio_INCLUDE_DIR}"
)
endif()

View File

@@ -28,6 +28,12 @@ target_link_libraries(fairmq-ex-dds-sink PRIVATE ExampleDDSLib)
add_custom_target(ExampleDDS DEPENDS fairmq-ex-dds-sampler fairmq-ex-dds-processor fairmq-ex-dds-sink)
if(DDS_VERSION VERSION_LESS 2.5.25)
set(WAIT_COMMAND "dds-info --wait-for-idle-agents")
else()
set(WAIT_COMMAND "dds-info --idle-count --wait")
endif()
set(BIN_DIR ${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/fairmq/plugins/DDS)
set(DATA_DIR ${CMAKE_CURRENT_BINARY_DIR})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-dds-topology.xml ${CMAKE_CURRENT_BINARY_DIR}/ex-dds-topology.xml @ONLY)
@@ -37,6 +43,14 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-ex-dds-env.sh ${CMAKE_CURRENT_
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh @ONLY)
# test
if(DDS_FOUND)
add_test(NAME Example.DDS.localhost COMMAND ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh localhost)
set_tests_properties(Example.DDS.localhost PROPERTIES
TIMEOUT 15
RUN_SERIAL true
PASS_REGULAR_EXPRESSION "Example successful"
)
endif()
# install

View File

@@ -8,8 +8,8 @@
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
<decltask name="Sampler">
<exe>fairmq-ex-dds-sampler --color false --channel-config name=data1,type=push,method=bind --rate 100 -P dds</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-sampler --id sampler --color false --channel-config name=data1,type=push,method=bind --rate 100 -P dds</exe>
<requirements>
<name>SamplerWorker</name>
</requirements>
@@ -19,10 +19,10 @@
</decltask>
<decltask name="Processor">
<exe>fairmq-ex-dds-processor --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-processor --id processor_%taskIndex% --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
<requirements>
<id>ProcessorWorker</id>
<name>ProcessorWorker</name>
</requirements>
<properties>
<name access="read">data1</name>
@@ -31,8 +31,8 @@
</decltask>
<decltask name="Sink">
<exe>fairmq-ex-dds-sink --color false --channel-config name=data2,type=pull,method=bind -P dds</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-sink --id sink --color false --channel-config name=data2,type=pull,method=bind -P dds</exe>
<requirements>
<name>SinkWorker</name>
</requirements>

View File

@@ -8,8 +8,8 @@
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
<decltask name="Sampler">
<exe>fairmq-ex-dds-sampler --color false --channel-config name=data1,type=push,method=bind -P dds --iterations 10</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-sampler --id sampler --color false --channel-config name=data1,type=push,method=bind -P dds --iterations 10</exe>
<requirements>
<name>SamplerWorker</name>
</requirements>
@@ -19,10 +19,10 @@
</decltask>
<decltask name="Processor">
<exe>fairmq-ex-dds-processor --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-processor --id processor_%taskIndex% --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
<requirements>
<id>ProcessorWorker</id>
<name>ProcessorWorker</name>
</requirements>
<properties>
<name access="read">data1</name>
@@ -31,8 +31,8 @@
</decltask>
<decltask name="Sink">
<exe>fairmq-ex-dds-sink --color false --channel-config name=data2,type=pull,method=bind -P dds --iterations 10</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-sink --id sink --color false --channel-config name=data2,type=pull,method=bind -P dds --iterations 10</exe>
<requirements>
<name>SinkWorker</name>
</requirements>

View File

@@ -37,7 +37,7 @@ else
dds-submit -r ${plugin} -n ${requiredNofAgents}
fi
echo "...waiting for ${requiredNofAgents} idle agents..."
dds-info --wait-for-idle-agents ${requiredNofAgents}
@WAIT_COMMAND@ ${requiredNofAgents}
topologyFile=@DATA_DIR@/ex-dds-topology.xml
echo "TOPOLOGY FILE: ${topologyFile}"
@@ -46,7 +46,7 @@ echo "TOPOLOGY FILE: ${topologyFile}"
# TODO Uncomment once DDS 2.6 is released
# dds-info --active-topology
dds-topology --disable-validation --activate ${topologyFile}
dds-topology --activate ${topologyFile}
# dds-info --active-topology
# dds-info --wait-for-executing-agents ${requiredNofAgents}
sleep 1
@@ -67,9 +67,9 @@ echo "...$sampler_and_sink are READY, sending shutdown..."
fairmq-dds-command-ui -c s -w "RUNNING->READY" -n ${requiredNofAgents}
fairmq-dds-command-ui -c t -w "DEVICE READY" -n ${requiredNofAgents}
fairmq-dds-command-ui -c d -w "IDLE" -n ${requiredNofAgents}
fairmq-dds-command-ui -c q
fairmq-dds-command-ui -c q -w "EXITING" -n ${requiredNofAgents}
echo "...waiting for ${requiredNofAgents} idle agents..."
dds-info --wait-for-idle-agents ${requiredNofAgents}
@WAIT_COMMAND@ ${requiredNofAgents}
echo "------------------------"
# TODO Uncomment once DDS 2.6 is released
@@ -82,4 +82,7 @@ logDir="${wrkDir}/logs"
for file in $(find "${logDir}" -name "*.tar.gz"); do tar -xf ${file} -C "${logDir}" ; done
echo "AGENT LOG FILES IN: ${logDir}"
# This string is used by ctest to detect success
echo "Example successful :)"
# Cleanup function is called by EXIT trap

1
extern/asio vendored Submodule

Submodule extern/asio added at 90f32660cd

17
extern/bundled_asio.cmake vendored Normal file
View File

@@ -0,0 +1,17 @@
################################################################################
# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
if(NOT TARGET asio::headers)
if(Git_FOUND)
execute_process(
COMMAND ${GIT_EXECUTABLE} submodule update --init --recursive --depth 1 -- extern/asio
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
)
endif()
endif()

1
extern/googletest vendored Submodule

Submodule extern/googletest added at 90a443f9c2

View File

@@ -70,7 +70,7 @@ if(BUILD_FAIRMQ OR BUILD_SDK)
Boost::boost
)
set_target_properties(${target} PROPERTIES
VERSION ${PROJECT_GIT_VERSION}
VERSION ${PROJECT_VERSION}
SOVERSION "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}"
OUTPUT_NAME FairMQ${target}
)
@@ -124,7 +124,7 @@ if(BUILD_FAIRMQ OR BUILD_SDK)
Tools
)
set_target_properties(${target} PROPERTIES
VERSION ${PROJECT_GIT_VERSION}
VERSION ${PROJECT_VERSION}
SOVERSION "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}"
OUTPUT_NAME FairMQ${target}
)
@@ -371,7 +371,7 @@ if(BUILD_FAIRMQ)
${OFI_DEPS}
)
set_target_properties(${_target} PROPERTIES
VERSION ${PROJECT_GIT_VERSION}
VERSION ${PROJECT_VERSION}
SOVERSION "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}"
)

View File

@@ -63,8 +63,6 @@ bool DeviceRunner::HandleGeneralOptions(const fair::mq::ProgOptions& config, boo
<< " / __/ / /_/ / / / _ / / / / /_/ / " << FAIRMQ_REPO_URL << endl
<< " /_/ \\__,_/_/_/ /_/ /_/ \\___\\_\\ " << FAIRMQ_LICENSE << " © " << FAIRMQ_COPYRIGHT << endl;
}
config.PrintOptions();
}
return true;
@@ -169,6 +167,9 @@ auto DeviceRunner::Run() -> int
// Instantiate and run plugins
fPluginManager.InstantiatePlugins();
// Log IDLE configuration
fConfig.PrintOptions();
// Run the device
fDevice->RunStateMachine();

View File

@@ -68,13 +68,13 @@ class FairMQChannel
FairMQChannel(const FairMQChannel&, const std::string& name);
/// Move constructor
FairMQChannel(FairMQChannel&&) = default;
FairMQChannel(FairMQChannel&&) = delete;
/// Assignment operator
FairMQChannel& operator=(const FairMQChannel&);
/// Move assignment operator
FairMQChannel& operator=(FairMQChannel&&) = default;
FairMQChannel& operator=(FairMQChannel&&) = delete;
/// Destructor
virtual ~FairMQChannel()

View File

@@ -482,32 +482,6 @@ void FairMQDevice::InitTaskWrapper()
ChangeState(Transition::Auto);
}
bool FairMQDevice::SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs)
{
return lhs.fAddress < rhs.fAddress;
}
void FairMQDevice::SortChannel(const string& name, const bool reindex)
{
if (fChannels.find(name) != fChannels.end())
{
sort(fChannels.at(name).begin(), fChannels.at(name).end(), SortSocketsByAddress);
if (reindex)
{
for (auto vi = fChannels.at(name).begin(); vi != fChannels.at(name).end(); ++vi)
{
// set channel name: name + vector index
vi->fName = tools::ToString(name, "[", vi - fChannels.at(name).begin(), "]");
}
}
}
else
{
LOG(error) << "Sorting failed: no channel with the name \"" << name << "\".";
}
}
void FairMQDevice::RunWrapper()
{
LOG(info) << "DEVICE: Running...";

View File

@@ -115,11 +115,6 @@ class FairMQDevice
/// Outputs the socket transfer rates
virtual void LogSocketRates();
/// Sorts a channel by address, with optional reindexing of the sorted values
/// @param name Channel name
/// @param reindex Should reindexing be done
void SortChannel(const std::string& name, const bool reindex = true);
template<typename Serializer, typename DataType, typename... Args>
void Serialize(FairMQMessage& msg, DataType&& data, Args&&... args) const
{
@@ -312,11 +307,6 @@ class FairMQDevice
return fConfig;
}
/// Implements the sort algorithm used in SortChannel()
/// @param lhs Right hand side value for comparison
/// @param rhs Left hand side value for comparison
static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs);
// overload to easily bind member functions
template<typename T>
void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQMessagePtr& msg, int index))

View File

@@ -74,7 +74,8 @@ auto PluginServices::ReleaseDeviceControl(const string& controller) -> void
if (fDeviceController == controller) {
fDeviceController = boost::none;
} else {
throw DeviceControlError{tools::ToString("Plugin '", controller, "' cannot release control because it has not taken over control.")};
LOG(debug) << "Plugin '" << controller << "' cannot release control "
<< "because it has no control.";
}
}

View File

@@ -10,11 +10,17 @@
#define FAIR_MQ_SDK_H
// IWYU pragma: begin_exports
#include <fairmq/sdk/DDSInfo.h>
#include <fairmq/sdk/AsioAsyncOp.h>
#include <fairmq/sdk/AsioBase.h>
#include <fairmq/sdk/DDSAgent.h>
#include <fairmq/sdk/DDSEnvironment.h>
#include <fairmq/sdk/DDSInfo.h>
#include <fairmq/sdk/DDSSession.h>
#include <fairmq/sdk/DDSTask.h>
#include <fairmq/sdk/DDSTopology.h>
#include <fairmq/sdk/Error.h>
#include <fairmq/sdk/Topology.h>
#include <fairmq/sdk/Traits.h>
// IWYU pragma: end_exports
#endif // FAIR_MQ_SDK_H

View File

@@ -12,7 +12,7 @@ target_link_libraries(${plugin} FairMQ DDS::dds_intercom_lib DDS::dds_protocol_l
target_include_directories(${plugin} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
set_target_properties(${plugin} PROPERTIES CXX_VISIBILITY_PRESET hidden)
set_target_properties(${plugin} PROPERTIES
VERSION ${PROJECT_GIT_VERSION}
VERSION ${PROJECT_VERSION}
SOVERSION "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}"
LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/fairmq
)

View File

@@ -50,22 +50,25 @@ DDS::DDS(const string& name,
, fCurrentState(DeviceState::Idle)
, fLastState(DeviceState::Idle)
, fDeviceTerminationRequested(false)
, fLastExternalController(0)
, fExitingAckedByLastExternalController(false)
, fHeartbeatInterval(100)
, fUpdatesAllowed(false)
{
try {
TakeDeviceControl();
fControllerThread = thread(&DDS::HandleControl, this);
fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
} catch (PluginServices::DeviceControlError& e) {
LOG(debug) << e.what();
} catch (exception& e) {
LOG(error) << "Error in plugin initialization: " << e.what();
}
}
auto DDS::HandleControl() -> void
{
try {
fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
std::string deviceId(GetProperty<std::string>("id"));
if (deviceId.empty()) {
SetProperty<std::string>("id", dds::env_prop<dds::task_path>());
}
std::string sessionId(GetProperty<std::string>("session"));
if (sessionId == "default") {
SetProperty<std::string>("session", dds::env_prop<dds::dds_session_id>());
}
auto control = GetProperty<string>("control");
bool staticMode(false);
if (control == "static") {
@@ -85,22 +88,28 @@ auto DDS::HandleControl() -> void
// subscribe to device state changes, pushing new state changes into the event queue
SubscribeToDeviceStateChange([&](DeviceState newState) {
fStateQueue.Push(newState);
switch(newState) {
case DeviceState::Bound:
// Receive addresses of connecting channels from DDS
// and propagate addresses of bound channels to DDS.
FillChannelContainers();
switch (newState) {
case DeviceState::Bound:
// Receive addresses of connecting channels from DDS
// and propagate addresses of bound channels to DDS.
FillChannelContainers();
// publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i]
PublishBoundChannels();
break;
case DeviceState::Exiting:
fDeviceTerminationRequested = true;
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
break;
default:
break;
// publish bound addresses via DDS at keys corresponding to the channel
// prefixes, e.g. 'data' in data[i]
PublishBoundChannels();
break;
case DeviceState::ResettingDevice: {
std::lock_guard<std::mutex> lk(fUpdateMutex);
fUpdatesAllowed = false;
break;
}
case DeviceState::Exiting:
fDeviceTerminationRequested = true;
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
break;
default:
break;
}
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
@@ -114,15 +123,35 @@ auto DDS::HandleControl() -> void
});
if (staticMode) {
TransitionDeviceStateTo(DeviceState::Running);
// wait until stop signal
unique_lock<mutex> lock(fStopMutex);
while (!fDeviceTerminationRequested) {
fStopCondition.wait_for(lock, chrono::seconds(1));
}
LOG(debug) << "Stopping DDS control plugin";
fControllerThread = thread(&DDS::StaticControl, this);
}
} catch (PluginServices::DeviceControlError& e) {
LOG(debug) << e.what();
} catch (exception& e) {
LOG(error) << "Error in plugin initialization: " << e.what();
}
}
auto DDS::WaitForExitingAck() -> void
{
unique_lock<mutex> lock(fStateChangeSubscriberMutex);
fExitingAcked.wait_for(
lock,
chrono::milliseconds(GetProperty<unsigned int>("wait-for-exiting-ack-timeout")),
[this]() { return fExitingAckedByLastExternalController; });
}
auto DDS::StaticControl() -> void
{
try {
TransitionDeviceStateTo(DeviceState::Running);
// wait until stop signal
unique_lock<mutex> lock(fStopMutex);
while (!fDeviceTerminationRequested) {
fStopCondition.wait_for(lock, chrono::seconds(1));
}
LOG(debug) << "Stopping DDS plugin static controller";
} catch (DeviceErrorState&) {
ReleaseDeviceControl();
} catch (exception& e) {
@@ -197,6 +226,11 @@ auto DDS::FillChannelContainers() -> void
LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n;
fIofN.insert(make_pair(chanName, IofN(i, n)));
}
{
std::lock_guard<std::mutex> lk(fUpdateMutex);
fUpdatesAllowed = true;
}
fUpdateCondition.notify_one();
} catch (const exception& e) {
LOG(error) << "Error filling channel containers: " << e.what();
}
@@ -209,6 +243,10 @@ auto DDS::SubscribeForConnectingChannels() -> void
fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
try {
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID;
std::unique_lock<std::mutex> lk(fUpdateMutex);
fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; });
string val = value;
// check if it is to handle as one out of multiple values
auto it = fIofN.find(propertyId);
@@ -306,6 +344,10 @@ auto DDS::SubscribeForCustomCommands() -> void
unique_lock<mutex> lock(fStopMutex);
fStopCondition.notify_one();
}
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fLastExternalController = senderId;
}
} else if (cmd == "dump-config") {
stringstream ss;
for (const auto pKey: GetPropertyKeys()) {
@@ -325,11 +367,22 @@ auto DDS::SubscribeForCustomCommands() -> void
fHeartbeatSubscribers.erase(senderId);
}
fDDS.Send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId));
} else if (cmd == "state-change-exiting-received") {
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
if (fLastExternalController == senderId) {
fExitingAckedByLastExternalController = true;
}
}
fExitingAcked.notify_one();
} else if (cmd == "subscribe-to-state-changes") {
{
// auto size = fStateChangeSubscribers.size();
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.insert(senderId);
if (!fControllerThread.joinable()) {
fControllerThread = thread(&DDS::WaitForExitingAck, this);
}
}
fDDS.Send("state-changes-subscription: " + id + ",OK", to_string(senderId));
{

View File

@@ -63,6 +63,9 @@ struct DDSSubscription
LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg;
});
// fDDSCustomCmd.subscribe([](const std::string& cmd, const std::string& cond, uint64_t senderId) {
// LOG(debug) << "cmd: " << cmd << ", cond: " << cond << ", senderId: " << senderId;
// });
assert(!dds_session_id.empty());
}
@@ -125,7 +128,8 @@ class DDS : public Plugin
~DDS();
private:
auto HandleControl() -> void;
auto StaticControl() -> void;
auto WaitForExitingAck() -> void;
auto FillChannelContainers() -> void;
auto SubscribeForConnectingChannels() -> void;
@@ -155,11 +159,19 @@ class DDS : public Plugin
std::set<uint64_t> fHeartbeatSubscribers;
std::mutex fHeartbeatSubscriberMutex;
std::set<uint64_t> fStateChangeSubscribers;
uint64_t fLastExternalController;
bool fExitingAckedByLastExternalController;
std::condition_variable fExitingAcked;
std::mutex fStateChangeSubscriberMutex;
std::thread fHeartbeatThread;
std::chrono::milliseconds fHeartbeatInterval;
bool fUpdatesAllowed;
std::mutex fUpdateMutex;
std::condition_variable fUpdateCondition;
};
Plugin::ProgOptions DDSProgramOptions()
@@ -167,7 +179,8 @@ Plugin::ProgOptions DDSProgramOptions()
boost::program_options::options_description options{"DDS Plugin"};
options.add_options()
("dds-i", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(), "Task index for chosing connection target (single channel n to m). When all values come via same update.")
("dds-i-n", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(), "Task index for chosing connection target (one out of n values to take). When values come as independent updates.");
("dds-i-n", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(), "Task index for chosing connection target (one out of n values to take). When values come as independent updates.")
("wait-for-exiting-ack-timeout", boost::program_options::value<unsigned int>()->default_value(1000), "Wait timeout for EXITING state-change acknowledgement by external controller in milliseconds.");
return options;
}

View File

@@ -264,6 +264,9 @@ int main(int argc, char* argv[])
// cerr << "Received: " << msg << endl;
boost::trim(parts[2]);
waitMode.AddNewStateEntry(senderId, parts[3]);
if(parts[3] == "IDLE->EXITING") {
ddsCustomCmd.send("state-change-exiting-received", std::to_string(senderId));
}
} else if (parts[0] == "state-changes-subscription") {
if (parts[2] != "OK") {
cerr << "state-changes-subscription failed with return code: " << parts[2];
@@ -273,7 +276,7 @@ int main(int argc, char* argv[])
cerr << "state-changes-unsubscription failed with return code: " << parts[2];
}
} else {
// cout << "Received: " << msg << endl;
cout << "Received: " << msg << endl;
}
});

View File

@@ -16,7 +16,7 @@ target_link_libraries(${plugin} FairMQ PMIx::libpmix)
target_include_directories(${plugin} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
set_target_properties(${plugin} PROPERTIES
CXX_VISIBILITY_PRESET hidden
VERSION ${PROJECT_GIT_VERSION}
VERSION ${PROJECT_VERSION}
SOVERSION "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}"
)

223
fairmq/sdk/AsioAsyncOp.h Normal file
View File

@@ -0,0 +1,223 @@
/********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_SDK_ASIOASYNCOP_H
#define FAIR_MQ_SDK_ASIOASYNCOP_H
#include <asio/associated_allocator.hpp>
#include <asio/associated_executor.hpp>
#include <asio/executor_work_guard.hpp>
#include <asio/system_executor.hpp>
#include <chrono>
#include <exception>
#include <fairlogger/Logger.h>
#include <fairmq/sdk/Error.h>
#include <fairmq/sdk/Traits.h>
#include <functional>
#include <memory>
#include <system_error>
#include <type_traits>
#include <utility>
namespace fair {
namespace mq {
namespace sdk {
template<typename... SignatureArgTypes>
struct AsioAsyncOpImplBase
{
virtual auto Complete(std::error_code, SignatureArgTypes...) -> void = 0;
virtual auto IsCompleted() const -> bool = 0;
};
/**
* @tparam Executor1 Associated I/O executor, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.associated_i_o_executor
* @tparam Allocator1 Default allocation strategy, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.allocation_of_intermediate_storage
*/
template<typename Executor1, typename Allocator1, typename Handler, typename... SignatureArgTypes>
struct AsioAsyncOpImpl : AsioAsyncOpImplBase<SignatureArgTypes...>
{
/// See https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.allocation_of_intermediate_storage
using Allocator2 = typename asio::associated_allocator<Handler, Allocator1>::type;
/// See https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.associated_completion_handler_executor
using Executor2 = typename asio::associated_executor<Handler, Executor1>::type;
/// Ctor
AsioAsyncOpImpl(const Executor1& ex1, Allocator1&& alloc1, Handler&& handler)
: fWork1(ex1)
, fWork2(asio::get_associated_executor(handler, ex1))
, fHandler(std::move(handler))
, fAlloc1(std::move(alloc1))
{}
auto GetAlloc2() const -> Allocator2 { return asio::get_associated_allocator(fHandler, fAlloc1); }
auto GetEx2() const -> Executor2 { return asio::get_associated_executor(fWork2); }
auto Complete(std::error_code ec, SignatureArgTypes... args) -> void override
{
if (IsCompleted()) {
throw RuntimeError("Async operation already completed");
}
GetEx2().dispatch(
[=, handler = std::move(fHandler)]() mutable {
try {
handler(ec, args...);
} catch (const std::exception& e) {
LOG(error) << "Uncaught exception in AsioAsyncOp completion handler: " << e.what();
} catch (...) {
LOG(error) << "Unknown uncaught exception in AsioAsyncOp completion handler.";
}
},
GetAlloc2());
fWork1.reset();
fWork2.reset();
}
auto IsCompleted() const -> bool override
{
return !fWork1.owns_work() && !fWork2.owns_work();
}
private:
/// See https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.outstanding_work
asio::executor_work_guard<Executor1> fWork1;
asio::executor_work_guard<Executor2> fWork2;
Handler fHandler;
Allocator1 fAlloc1;
};
/**
* @class AsioAsyncOp AsioAsyncOp.h <fairmq/sdk/AsioAsyncOp.h>
* @tparam Executor Associated I/O executor, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.associated_i_o_executor
* @tparam Allocator Default allocation strategy, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.allocation_of_intermediate_storage
* @tparam CompletionSignature
* @brief Interface for Asio-compliant asynchronous operation, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html
*
* @par Thread Safety
* @e Distinct @e objects: Safe.@n
* @e Shared @e objects: Unsafe.
*
* primary template
*/
template<typename Executor, typename Allocator, typename CompletionSignature>
struct AsioAsyncOp
{
};
/**
* @tparam Executor See primary template
* @tparam Allocator See primary template
* @tparam SignatureReturnType Return type of CompletionSignature, see primary template
* @tparam SignatureFirstArgType Type of first argument of CompletionSignature, see primary template
* @tparam SignatureArgTypes Types of the rest of arguments of CompletionSignature
*
* partial specialization to deconstruct CompletionSignature
*/
template<typename Executor,
typename Allocator,
typename SignatureReturnType,
typename SignatureFirstArgType,
typename... SignatureArgTypes>
struct AsioAsyncOp<Executor,
Allocator,
SignatureReturnType(SignatureFirstArgType, SignatureArgTypes...)>
{
static_assert(std::is_void<SignatureReturnType>::value,
"return value of CompletionSignature must be void");
static_assert(std::is_same<SignatureFirstArgType, std::error_code>::value,
"first argument of CompletionSignature must be std::error_code");
using Duration = std::chrono::milliseconds;
private:
using Impl = AsioAsyncOpImplBase<SignatureArgTypes...>;
using ImplPtr = std::unique_ptr<Impl, std::function<void(Impl*)>>;
ImplPtr fImpl;
public:
/// Default Ctor
AsioAsyncOp()
: fImpl(nullptr)
{}
/// Ctor with handler
template<typename Handler>
AsioAsyncOp(Executor&& ex1, Allocator&& alloc1, Handler&& handler)
: AsioAsyncOp()
{
// Async operation type to be allocated and constructed
using Op = AsioAsyncOpImpl<Executor, Allocator, Handler, SignatureArgTypes...>;
// Create allocator for concrete op type
// Allocator2, see https://www.boost.org/doc/libs/1_70_0/doc/html/boost_asio/reference/asynchronous_operations.html#boost_asio.reference.asynchronous_operations.allocation_of_intermediate_storage
using OpAllocator =
typename std::allocator_traits<typename Op::Allocator2>::template rebind_alloc<Op>;
OpAllocator opAlloc;
// Allocate memory
auto mem(std::allocator_traits<OpAllocator>::allocate(opAlloc, 1));
// Construct object
auto ptr(new (mem) Op(std::forward<Executor>(ex1),
std::forward<Allocator>(alloc1),
std::forward<Handler>(handler)));
// Assign ownership to this object
fImpl = ImplPtr(ptr, [opAlloc](Impl* p) mutable {
std::allocator_traits<OpAllocator>::deallocate(opAlloc, static_cast<Op*>(p), 1);
});
}
/// Ctor with handler #2
template<typename Handler>
AsioAsyncOp(Executor&& ex1, Handler&& handler)
: AsioAsyncOp(std::forward<Executor>(ex1), Allocator(), std::forward<Handler>(handler))
{}
/// Ctor with handler #3
template<typename Handler>
explicit AsioAsyncOp(Handler&& handler)
: AsioAsyncOp(asio::system_executor(), std::forward<Handler>(handler))
{}
auto IsCompleted() -> bool { return (fImpl == nullptr) || fImpl->IsCompleted(); }
auto Complete(std::error_code ec, SignatureArgTypes... args) -> void
{
if(IsCompleted()) {
throw RuntimeError("Async operation already completed");
}
fImpl->Complete(ec, args...);
fImpl.reset(nullptr);
}
auto Complete(SignatureArgTypes... args) -> void
{
Complete(std::error_code(), args...);
}
auto Cancel(SignatureArgTypes... args) -> void
{
Complete(MakeErrorCode(ErrorCode::OperationCanceled), args...);
}
auto Timeout(SignatureArgTypes... args) -> void
{
Complete(MakeErrorCode(ErrorCode::OperationTimeout), args...);
}
};
} /* namespace sdk */
} /* namespace mq */
} /* namespace fair */
#endif /* FAIR_MQ_SDK_ASIOASYNCOP_H */

76
fairmq/sdk/AsioBase.h Normal file
View File

@@ -0,0 +1,76 @@
/********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_SDK_ASIOBASE_H
#define FAIR_MQ_SDK_ASIOBASE_H
#include <asio/executor.hpp>
#include <fairmq/sdk/Traits.h>
#include <memory>
#include <utility>
namespace fair {
namespace mq {
namespace sdk {
using DefaultExecutor = asio::executor;
using DefaultAllocator = std::allocator<int>;
/**
* @class AsioBase AsioBase.h <fairmq/sdk/AsioBase.h>
* @tparam Executor Associated I/O executor
* @tparam Allocator Associated default allocator
* @brief Base for creating Asio-enabled I/O objects
*
* @par Thread Safety
* @e Distinct @e objects: Safe.@n
* @e Shared @e objects: Unsafe.
*/
template<typename Executor, typename Allocator>
class AsioBase
{
public:
/// Member type of associated I/O executor
using ExecutorType = Executor;
/// Get associated I/O executor
auto GetExecutor() const noexcept -> ExecutorType { return fExecutor; }
/// Member type of associated default allocator
using AllocatorType = Allocator;
/// Get associated default allocator
auto GetAllocator() const noexcept -> AllocatorType { return fAllocator; }
/// NO default ctor
AsioBase() = delete;
/// Construct with associated I/O executor
explicit AsioBase(Executor ex, Allocator alloc)
: fExecutor(std::move(ex))
, fAllocator(std::move(alloc))
{}
/// NOT copyable
AsioBase(const AsioBase&) = delete;
AsioBase& operator=(const AsioBase&) = delete;
/// movable
AsioBase(AsioBase&&) noexcept = default;
AsioBase& operator=(AsioBase&&) noexcept = default;
~AsioBase() = default;
private:
ExecutorType fExecutor;
AllocatorType fAllocator;
};
} /* namespace sdk */
} /* namespace mq */
} /* namespace fair */
#endif /* FAIR_MQ_SDK_ASIOBASE_H */

View File

@@ -15,10 +15,16 @@ set(target SDK)
set(SDK_PUBLIC_HEADER_FILES
../SDK.h
AsioAsyncOp.h
AsioBase.h
DDSAgent.h
DDSEnvironment.h
DDSSession.h
DDSTask.h
DDSTopology.h
Error.h
Topology.h
Traits.h
)
set(SDK_PRIVATE_HEADER_FILES
@@ -29,6 +35,7 @@ set(SDK_SOURCE_FILES
DDSEnvironment.cxx
DDSSession.cxx
DDSTopology.cxx
Error.cxx
Topology.cxx
)
@@ -38,28 +45,29 @@ add_library(${target}
${SDK_PRIVATE_HEADER_FILES} # for IDE integration
)
set_target_properties(${target} PROPERTIES LABELS coverage)
target_compile_definitions(${target} PUBLIC BOOST_ERROR_CODE_HEADER_ONLY)
target_include_directories(${target}
PUBLIC
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}>
$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}>
$<INSTALL_INTERFACE:include>
$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
)
target_link_libraries(${target}
PUBLIC
asio::headers
Boost::boost
Boost::filesystem
FairLogger::FairLogger
Threads::Threads
Tools
StateMachine
PRIVATE
Boost::boost
DDS::dds_intercom_lib
DDS::dds_tools_lib
DDS::dds_topology_lib
Tools
)
set_target_properties(${target} PROPERTIES
VERSION ${PROJECT_GIT_VERSION}
VERSION ${PROJECT_VERSION}
SOVERSION "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}"
OUTPUT_NAME FairMQ_${target}
)

95
fairmq/sdk/DDSAgent.h Normal file
View File

@@ -0,0 +1,95 @@
/********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_SDK_DDSSAGENT_H
#define FAIR_MQ_SDK_DDSSAGENT_H
#include <fairmq/sdk/DDSSession.h>
#include <ostream>
#include <string>
#include <chrono>
#include <cstdint>
namespace fair {
namespace mq {
namespace sdk {
/**
* @class DDSAgent <fairmq/sdk/DDSAgent.h>
* @brief Represents a DDS agent
*/
class DDSAgent
{
public:
using Id = uint64_t;
using Pid = uint32_t;
explicit DDSAgent(DDSSession session,
Id id,
Pid pid,
std::string state,
std::string path,
std::string host,
bool lobbyLeader,
std::chrono::milliseconds startupTime,
Id taskId,
std::string username)
: fSession(std::move(session))
, fId(id)
, fPid(pid)
, fState(std::move(state))
, fDDSPath(std::move(path))
, fHost(std::move(host))
, fLobbyLeader(lobbyLeader)
, fStartupTime(startupTime)
, fTaskId(taskId)
, fUsername(std::move(username))
{}
DDSSession GetSession() const { return fSession; }
Id GetId() const { return fId; }
Pid GetPid() const { return fPid; }
std::string GetState() const { return fState; }
std::string GetHost() const { return fHost; }
std::string GetDDSPath() const { return fDDSPath; }
bool IsLobbyLeader() const { return fLobbyLeader; }
std::chrono::milliseconds GetStartupTime() const { return fStartupTime; }
std::string GetUsername() const { return fUsername; }
friend auto operator<<(std::ostream& os, const DDSAgent& agent) -> std::ostream&
{
return os << "DDSAgent id: " << agent.fId
<< ", pid: " << agent.fPid
<< ", state: " << agent.fState
<< ", path: " << agent.fDDSPath
<< ", host: " << agent.fHost
<< ", lobbyLeader: " << agent.fLobbyLeader
<< ", startupTime: " << agent.fStartupTime.count()
<< ", taskId: " << agent.fTaskId
<< ", username: " << agent.fUsername;
}
private:
DDSSession fSession;
Id fId;
Pid fPid;
std::string fState;
std::string fDDSPath;
std::string fHost;
bool fLobbyLeader;
std::chrono::milliseconds fStartupTime;
Id fTaskId;
std::string fUsername;
};
} // namespace sdk
} // namespace mq
} // namespace fair
#endif /* FAIR_MQ_SDK_DDSSAGENT_H */

View File

@@ -8,20 +8,21 @@
#include "DDSSession.h"
#include <fairmq/sdk/DDSEnvironment.h>
#include <fairmq/sdk/DDSTopology.h>
#include <fairmq/Tools.h>
#include <fairlogger/Logger.h>
#include <DDS/Tools.h>
#include <boost/process.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <cassert>
#include <cstdlib>
#include <fairlogger/Logger.h>
#include <fairmq/Tools.h>
#include <fairmq/sdk/DDSAgent.h>
#include <fairmq/sdk/DDSEnvironment.h>
#include <fairmq/sdk/DDSTopology.h>
#include <mutex>
#include <sstream>
#include <unordered_map>
#include <utility>
#include <vector>
namespace fair {
namespace mq {
@@ -134,6 +135,8 @@ struct DDSSession::Impl
dds::intercom_api::CCustomCmd fDDSCustomCmd;
Id fId;
bool fStopOnDestruction;
mutable std::mutex fMtx;
std::unordered_map<DDSChannel::Id, DDSTask::Id> fTaskIdByChannelIdMap;
};
DDSSession::DDSSession(DDSEnvironment env)
@@ -176,76 +179,110 @@ auto DDSSession::SubmitAgents(Quantity agents) -> void
// Requesting to submit 0 agents is not meaningful
assert(agents > 0);
dds::tools_api::SSubmitRequestData submitInfo;
using namespace dds::tools_api;
SSubmitRequestData submitInfo;
submitInfo.m_rms = tools::ToString(GetRMSPlugin());
submitInfo.m_instances = agents;
submitInfo.m_config = GetRMSConfig().string();
tools::Semaphore blocker;
auto submitRequest = dds::tools_api::SSubmitRequest::makeRequest(submitInfo);
submitRequest->setMessageCallback(
[](const dds::tools_api::SMessageResponseData& message) { LOG(debug) << message; });
submitRequest->setDoneCallback([&]() {
tools::SharedSemaphore blocker;
auto submitRequest = SSubmitRequest::makeRequest(submitInfo);
submitRequest->setMessageCallback([](const SMessageResponseData& message){
LOG(debug) << message.m_msg;
});
submitRequest->setDoneCallback([agents, blocker]() mutable {
LOG(debug) << agents << " Agents submitted";
blocker.Signal();
});
fImpl->fSession->sendRequest<dds::tools_api::SSubmitRequest>(submitRequest);
fImpl->fSession->sendRequest<SSubmitRequest>(submitRequest);
blocker.Wait();
// perfect
WaitForIdleAgents(agents);
}
auto DDSSession::RequestAgentInfo() -> AgentInfo
auto DDSSession::RequestAgentCount() -> AgentCount
{
dds::tools_api::SAgentInfoRequestData agentInfoInfo;
tools::Semaphore blocker;
AgentInfo info;
auto agentInfoRequest = dds::tools_api::SAgentInfoRequest::makeRequest(agentInfoInfo);
agentInfoRequest->setResponseCallback(
[this, &info](const dds::tools_api::SAgentInfoResponseData& _response) {
if (_response.m_index == 0) {
info.activeAgentsCount = _response.m_activeAgentsCount;
info.idleAgentsCount = _response.m_idleAgentsCount;
info.executingAgentsCount = _response.m_executingAgentsCount;
info.agents.reserve(_response.m_activeAgentsCount);
}
info.agents.emplace_back(*this, _response.m_agentInfo);
});
agentInfoRequest->setMessageCallback(
[](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; });
agentInfoRequest->setDoneCallback([&]() { blocker.Signal(); });
fImpl->fSession->sendRequest<dds::tools_api::SAgentInfoRequest>(agentInfoRequest);
blocker.Wait();
using namespace dds::tools_api;
return info;
SAgentCountRequest::response_t res;
fImpl->fSession->syncSendRequest<SAgentCountRequest>(SAgentCountRequest::request_t(), res);
AgentCount count;
count.active = res.m_activeAgentsCount;
count.idle = res.m_idleAgentsCount;
count.executing = res.m_executingAgentsCount;
return count;
}
auto DDSSession::RequestAgentInfo() -> std::vector<DDSAgent>
{
using namespace dds::tools_api;
SAgentInfoRequest::responseVector_t res;
fImpl->fSession->syncSendRequest<SAgentInfoRequest>(SAgentInfoRequest::request_t(), res);
std::vector<DDSAgent> agentInfo;
agentInfo.reserve(res.size());
for (const auto& a : res) {
agentInfo.emplace_back(
*this,
a.m_agentID,
a.m_agentPid,
a.m_agentState,
a.m_DDSPath,
a.m_host,
a.m_lobbyLeader,
a.m_startUpTime,
a.m_taskID,
a.m_username
);
}
return agentInfo;
}
auto DDSSession::RequestTaskInfo() -> std::vector<DDSTask>
{
using namespace dds::tools_api;
SAgentInfoRequest::responseVector_t res;
fImpl->fSession->syncSendRequest<SAgentInfoRequest>(SAgentInfoRequest::request_t(), res);
std::vector<DDSTask> taskInfo;
taskInfo.reserve(res.size());
for (auto& a : res) {
taskInfo.emplace_back(a.m_taskID);
}
return taskInfo;
}
auto DDSSession::RequestCommanderInfo() -> CommanderInfo
{
dds::tools_api::SCommanderInfoRequestData commanderInfoInfo;
tools::Semaphore blocker;
using namespace dds::tools_api;
SCommanderInfoRequestData commanderInfo;
tools::SharedSemaphore blocker;
std::string error;
auto commanderInfoRequest =
dds::tools_api::SCommanderInfoRequest::makeRequest(commanderInfoInfo);
auto commanderInfoRequest = SCommanderInfoRequest::makeRequest(commanderInfo);
CommanderInfo info;
commanderInfoRequest->setResponseCallback(
[&info](const dds::tools_api::SCommanderInfoResponseData& _response) {
info.pid = _response.m_pid;
info.activeTopologyName = _response.m_activeTopologyName;
});
commanderInfoRequest->setMessageCallback(
[&](const dds::tools_api::SMessageResponseData& _message) {
if (_message.m_severity == dds::intercom_api::EMsgSeverity::error) {
error = _message.m_msg;
blocker.Signal();
} else {
LOG(debug) << _message;
}
});
commanderInfoRequest->setDoneCallback([&]() { blocker.Signal(); });
fImpl->fSession->sendRequest<dds::tools_api::SCommanderInfoRequest>(commanderInfoRequest);
commanderInfoRequest->setResponseCallback([&info](const SCommanderInfoResponseData& _response) {
info.pid = _response.m_pid;
info.activeTopologyName = _response.m_activeTopologyName;
});
commanderInfoRequest->setMessageCallback([&](const SMessageResponseData& _message) {
if (_message.m_severity == dds::intercom_api::EMsgSeverity::error) {
error = _message.m_msg;
blocker.Signal();
} else {
LOG(debug) << _message.m_msg;
}
});
commanderInfoRequest->setDoneCallback([blocker]() mutable { blocker.Signal(); });
fImpl->fSession->sendRequest<SCommanderInfoRequest>(commanderInfoRequest);
blocker.Wait();
if (!error.empty()) {
@@ -257,38 +294,41 @@ auto DDSSession::RequestCommanderInfo() -> CommanderInfo
auto DDSSession::WaitForExecutingAgents(Quantity minCount) -> void
{
auto info(RequestAgentInfo());
auto count(RequestAgentCount());
int interval(8);
while (info.executingAgentsCount < minCount) {
while (count.executing < minCount) {
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
interval = std::min(256, interval * 2);
info = RequestAgentInfo();
count = RequestAgentCount();
}
}
auto DDSSession::WaitForIdleAgents(Quantity minCount) -> void
{
auto info(RequestAgentInfo());
auto count(RequestAgentCount());
int interval(8);
while (info.idleAgentsCount < minCount) {
while (count.idle < minCount) {
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
interval = std::min(256, interval * 2);
info = RequestAgentInfo();
count = RequestAgentCount();
}
}
auto DDSSession::ActivateTopology(DDSTopology topo) -> void
{
dds::tools_api::STopologyRequestData topologyInfo;
topologyInfo.m_updateType = dds::tools_api::STopologyRequestData::EUpdateType::ACTIVATE;
using namespace dds::tools_api;
STopologyRequestData topologyInfo;
topologyInfo.m_updateType = STopologyRequestData::EUpdateType::ACTIVATE;
topologyInfo.m_topologyFile = topo.GetTopoFile().string();
tools::Semaphore blocker;
auto topologyRequest = dds::tools_api::STopologyRequest::makeRequest(topologyInfo);
topologyRequest->setMessageCallback(
[](const dds::tools_api::SMessageResponseData& _message) { LOG(debug) << _message; });
topologyRequest->setDoneCallback([&]() { blocker.Signal(); });
fImpl->fSession->sendRequest<dds::tools_api::STopologyRequest>(topologyRequest);
tools::SharedSemaphore blocker;
auto topologyRequest = STopologyRequest::makeRequest(topologyInfo);
topologyRequest->setMessageCallback([](const SMessageResponseData& _message) {
LOG(debug) << _message.m_msg;
});
topologyRequest->setDoneCallback([blocker]() mutable { blocker.Signal(); });
fImpl->fSession->sendRequest<STopologyRequest>(topologyRequest);
blocker.Wait();
WaitForExecutingAgents(topo.GetNumRequiredAgents());
@@ -316,24 +356,59 @@ void DDSSession::UnsubscribeFromCommands()
void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); }
void DDSSession::SendCommand(const std::string& cmd, DDSChannel::Id recipient)
{
fImpl->fDDSCustomCmd.send(cmd, std::to_string(recipient));
}
auto DDSSession::UpdateChannelToTaskAssociation(DDSChannel::Id channelId, DDSTask::Id taskId) -> void
{
std::lock_guard<std::mutex> lk(fImpl->fMtx);
fImpl->fTaskIdByChannelIdMap[channelId] = taskId;
}
auto DDSSession::GetTaskId(DDSChannel::Id channelId) const -> DDSTask::Id
{
std::lock_guard<std::mutex> lk(fImpl->fMtx);
return fImpl->fTaskIdByChannelIdMap.at(channelId);
}
auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&
{
return os << "$DDS_SESSION_ID: " << session.GetId();
}
auto DDSAgent::GetSession() const -> DDSSession
auto getMostRecentRunningDDSSession(DDSEnv env) -> DDSSession
{
return fSession;
}
boost::process::ipstream pipeStream;
boost::process::child c("dds-session list all", boost::process::std_out > pipeStream);
std::string lastLine;
std::string currentLine;
auto DDSAgent::GetInfoStr() const -> std::string
{
return fInfoStr;
}
while (pipeStream && std::getline(pipeStream, currentLine) && !currentLine.empty()) {
lastLine = currentLine;
}
c.wait();
std::string sessionId;
auto operator<<(std::ostream& os, const DDSAgent& agent) -> std::ostream&
{
return os << agent.GetInfoStr();
if (!lastLine.empty()) {
std::vector<std::string> words;
std::istringstream iss(lastLine);
for (std::string s; iss >> s;) {
if (s != "*") {
words.push_back(s);
}
}
if (words.back() == "RUNNING") {
sessionId = words.front();
}
}
if (sessionId.empty()) {
throw std::runtime_error("could not find most recent DDS session");
}
return DDSSession(DDSSession::Id(sessionId), std::move(env));
}
} // namespace sdk

View File

@@ -11,6 +11,7 @@
#include <fairmq/sdk/DDSEnvironment.h>
#include <fairmq/sdk/DDSInfo.h>
#include <fairmq/sdk/DDSTask.h>
#include <boost/filesystem.hpp>
@@ -21,13 +22,12 @@
#include <stdexcept>
#include <string>
#include <functional>
#include <vector>
namespace fair {
namespace mq {
namespace sdk {
class DDSEnvironment;
/**
* @enum DDSRMSPlugin DDSSession.h <fairmq/sdk/DDSSession.h>
* @brief Supported DDS resource management system plugins
@@ -43,6 +43,12 @@ auto operator>>(std::istream& is, DDSRMSPlugin& plugin) -> std::istream&;
class DDSTopology;
class DDSAgent;
class DDSChannel
{
public:
using Id = std::uint64_t;
};
/**
* @class DDSSession DDSSession.h <fairmq/sdk/DDSSession.h>
* @brief Represents a DDS session
@@ -72,13 +78,14 @@ class DDSSession
auto StopOnDestruction(bool stop = true) -> void;
auto IsRunning() const -> bool;
auto SubmitAgents(Quantity agents) -> void;
struct AgentInfo {
Quantity idleAgentsCount = 0;
Quantity activeAgentsCount = 0;
Quantity executingAgentsCount = 0;
std::vector<DDSAgent> agents;
struct AgentCount {
Quantity idle = 0;
Quantity active = 0;
Quantity executing = 0;
};
auto RequestAgentInfo() -> AgentInfo;
auto RequestAgentCount() -> AgentCount;
auto RequestAgentInfo() -> std::vector<DDSAgent>;
auto RequestTaskInfo() -> std::vector<DDSTask>;
struct CommanderInfo {
int pid = -1;
std::string activeTopologyName;
@@ -95,6 +102,9 @@ class DDSSession
void SubscribeToCommands(std::function<void(const std::string& msg, const std::string& condition, uint64_t senderId)>);
void UnsubscribeFromCommands();
void SendCommand(const std::string&);
void SendCommand(const std::string&, DDSChannel::Id);
auto UpdateChannelToTaskAssociation(DDSChannel::Id, DDSTask::Id) -> void;
auto GetTaskId(DDSChannel::Id) const -> DDSTask::Id;
friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&;
@@ -103,27 +113,8 @@ class DDSSession
std::shared_ptr<Impl> fImpl;
};
/**
* @class DDSAgent DDSSession.h <fairmq/sdk/DDSSession.h>
* @brief Represents a DDS agent
*/
class DDSAgent
{
public:
explicit DDSAgent(DDSSession session, std::string infostr)
: fInfoStr(std::move(infostr))
, fSession(std::move(session))
{}
auto getMostRecentRunningDDSSession(DDSEnv env = {}) -> DDSSession;
auto GetSession() const -> DDSSession;
auto GetInfoStr() const -> std::string;
friend auto operator<<(std::ostream& os, const DDSAgent& plugin) -> std::ostream&;
private:
std::string fInfoStr;
DDSSession fSession;
};
} // namespace sdk
} // namespace mq
} // namespace fair

49
fairmq/sdk/DDSTask.h Normal file
View File

@@ -0,0 +1,49 @@
/********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_SDK_DDSTASK_H
#define FAIR_MQ_SDK_DDSTASK_H
// #include <fairmq/sdk/DDSAgent.h>
#include <ostream>
#include <cstdint>
namespace fair {
namespace mq {
namespace sdk {
/**
* @class DDSTask <fairmq/sdk/DDSTask.h>
* @brief Represents a DDS task
*/
class DDSTask
{
public:
using Id = std::uint64_t;
explicit DDSTask(Id id)
: fId(id)
{}
Id GetId() const { return fId; }
friend auto operator<<(std::ostream& os, const DDSTask& task) -> std::ostream&
{
return os << "DDSTask id: " << task.fId;
}
private:
Id fId;
};
} // namespace sdk
} // namespace mq
} // namespace fair
#endif /* FAIR_MQ_SDK_DDSTASK_H */

View File

@@ -63,29 +63,30 @@ auto DDSTopology::GetTopoFile() const -> Path
return file;
}
int DDSTopology::GetNumRequiredAgents()
auto DDSTopology::GetNumRequiredAgents() const -> int
{
return fImpl->fTopo.getRequiredNofAgents();
}
std::vector<uint64_t> DDSTopology::GetDeviceList()
auto DDSTopology::GetTasks() const -> std::vector<DDSTask>
{
std::vector<uint64_t> taskIDs;
taskIDs.reserve(GetNumRequiredAgents());
std::vector<DDSTask> list;
list.reserve(GetNumRequiredAgents());
// TODO make sure returned tasks are actually devices
auto itPair = fImpl->fTopo.getRuntimeTaskIterator(
[](const dds::topology_api::STopoRuntimeTask::FilterIterator_t::value_type& /*value*/) -> bool { return true; });
[](const dds::topology_api::STopoRuntimeTask::FilterIterator_t::value_type&) -> bool {
return true;
});
auto tasks = boost::make_iterator_range(itPair.first, itPair.second);
for (auto task : tasks) {
for (const auto& task : tasks) {
LOG(debug) << "Found task " << task.first << ": "
<< "Path: " << task.second.m_taskPath << ", "
<< "Name: " << task.second.m_task->getName() << "_" << task.second.m_taskIndex;
taskIDs.push_back(task.first);
list.emplace_back(task.first);
}
return taskIDs;
return list;
}
auto DDSTopology::GetName() const -> std::string { return fImpl->fTopo.getName(); }

View File

@@ -10,8 +10,9 @@
#define FAIR_MQ_SDK_DDSTOPOLOGY_H
#include <boost/filesystem.hpp>
#include <fairmq/sdk/DDSInfo.h>
#include <fairmq/sdk/DDSEnvironment.h>
#include <fairmq/sdk/DDSInfo.h>
#include <fairmq/sdk/DDSTask.h>
#include <memory>
#include <string>
@@ -48,10 +49,10 @@ class DDSTopology
auto GetTopoFile() const -> Path;
/// @brief Get number of required agents for this topology
int GetNumRequiredAgents();
auto GetNumRequiredAgents() const -> int;
/// @brief Get list of devices
std::vector<uint64_t> GetDeviceList();
/// @brief Get list of tasks in this topology
auto GetTasks() const -> std::vector<DDSTask>;
/// @brief Get the name of the topology
auto GetName() const -> std::string;

40
fairmq/sdk/Error.cxx Normal file
View File

@@ -0,0 +1,40 @@
/********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Error.h"
namespace fair {
namespace mq {
const char* ErrorCategory::name() const noexcept
{
return "fairmq";
}
std::string ErrorCategory::message(int ev) const
{
switch (static_cast<ErrorCode>(ev)) {
case ErrorCode::OperationInProgress:
return "async operation already in progress";
case ErrorCode::OperationTimeout:
return "async operation timed out";
case ErrorCode::OperationCanceled:
return "async operation canceled";
case ErrorCode::DeviceChangeStateFailed:
return "failed to change state of a fairmq device";
default:
return "(unrecognized error)";
}
}
const ErrorCategory errorCategory{};
std::error_code MakeErrorCode(ErrorCode e) { return {static_cast<int>(e), errorCategory}; }
} // namespace mq
} // namespace fair

62
fairmq/sdk/Error.h Normal file
View File

@@ -0,0 +1,62 @@
/********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_SDK_ERROR_H
#define FAIR_MQ_SDK_ERROR_H
#include <fairmq/Tools.h>
#include <stdexcept>
#include <system_error>
namespace fair {
namespace mq {
namespace sdk {
struct RuntimeError : ::std::runtime_error
{
template<typename... T>
explicit RuntimeError(T&&... t)
: ::std::runtime_error::runtime_error(tools::ToString(std::forward<T>(t)...))
{}
};
struct MixedStateError : RuntimeError
{
using RuntimeError::RuntimeError;
};
} /* namespace sdk */
enum class ErrorCode
{
OperationInProgress = 10,
OperationTimeout,
OperationCanceled,
DeviceChangeStateFailed
};
std::error_code MakeErrorCode(ErrorCode);
struct ErrorCategory : std::error_category
{
const char* name() const noexcept override;
std::string message(int ev) const override;
};
} /* namespace mq */
} /* namespace fair */
namespace std {
template<>
struct is_error_code_enum<fair::mq::ErrorCode> : true_type
{};
} // namespace std
#endif /* FAIR_MQ_SDK_ERROR_H */

View File

@@ -10,244 +10,20 @@
#include <DDS/Tools.h>
#include <DDS/Topology.h>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <condition_variable>
#include <fairlogger/Logger.h>
#include <future>
#include <mutex>
#include <thread>
#include <utility>
namespace fair {
namespace mq {
auto operator<<(std::ostream& os, AsyncOpResultCode v) -> std::ostream&
{
switch (v) {
case AsyncOpResultCode::Aborted:
return os << "Aborted";
case AsyncOpResultCode::Timeout:
return os << "Timeout";
case AsyncOpResultCode::Error:
return os << "Error";
case AsyncOpResultCode::Ok:
default:
return os << "Ok";
}
}
auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream&
{
os << "[" << v.code << "]";
if (!v.msg.empty()) {
os << " " << v.msg;
}
return os;
}
namespace sdk {
const std::unordered_map<DeviceTransition, DeviceState, tools::HashEnum<DeviceTransition>> expectedState =
/// @brief Helper to (Re)Construct a FairMQ topology based on already existing native DDS API objects
/// @param nativeSession Existing and initialized CSession (either via create() or attach())
/// @param nativeTopo Existing CTopology that is activated on the given nativeSession
/// @param env Optional DDSEnv (needed primarily for unit testing)
auto MakeTopology(dds::topology_api::CTopology nativeTopo,
std::shared_ptr<dds::tools_api::CSession> nativeSession,
DDSEnv env) -> Topology
{
{ Transition::InitDevice, DeviceState::InitializingDevice },
{ Transition::CompleteInit, DeviceState::Initialized },
{ Transition::Bind, DeviceState::Bound },
{ Transition::Connect, DeviceState::DeviceReady },
{ Transition::InitTask, DeviceState::Ready },
{ Transition::Run, DeviceState::Running },
{ Transition::Stop, DeviceState::Ready },
{ Transition::ResetTask, DeviceState::DeviceReady },
{ Transition::ResetDevice, DeviceState::Idle },
{ Transition::End, DeviceState::Exiting }
};
Topology::Topology(DDSTopology topo, DDSSession session)
: fDDSSession(std::move(session))
, fDDSTopo(std::move(topo))
, fStateChangeOngoing(false)
, fTargetState(DeviceState::Idle)
, fStateChangeTimeout(0)
, fShutdown(false)
{
std::vector<uint64_t> deviceList = fDDSTopo.GetDeviceList();
for (const auto& d : deviceList) {
// LOG(debug) << "Adding device " << d;
fState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
}
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) {
// LOG(debug) << "Received from " << senderId << ": " << msg;
std::vector<std::string> parts;
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
for (unsigned int i = 0; i < parts.size(); ++i) {
boost::trim(parts.at(i));
}
if (parts[0] == "state-change") {
AddNewStateEntry(std::stoull(parts[2]), parts[3]);
} else if (parts[0] == "state-changes-subscription") {
LOG(debug) << "Received from " << senderId << ": " << msg;
if (parts[2] != "OK") {
LOG(error) << "state-changes-subscription failed with return code: " << parts[2];
}
} else if (parts[0] == "state-changes-unsubscription") {
if (parts[2] != "OK") {
LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2];
}
} else if (parts[1] == "could not queue") {
LOG(warn) << "Could not queue " << parts[2] << " transition on " << senderId;
}
});
fDDSSession.StartDDSService();
LOG(debug) << "subscribe-to-state-changes";
fDDSSession.SendCommand("subscribe-to-state-changes");
fExecutionThread = std::thread(&Topology::WaitForState, this);
}
Topology::Topology(dds::topology_api::CTopology nativeTopo,
std::shared_ptr<dds::tools_api::CSession> nativeSession,
DDSEnv env)
: Topology(DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env))
{
if (fDDSSession.RequestCommanderInfo().activeTopologyName != fDDSTopo.GetName()) {
throw std::runtime_error("Given topology must be activated");
}
}
auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb, Duration timeout) -> void
{
{
std::unique_lock<std::mutex> lock(fMtx);
if (fStateChangeOngoing) {
throw std::runtime_error("A state change request is already in progress, concurrent requests are currently not supported");
lock.unlock();
}
LOG(debug) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition);
fStateChangeOngoing = true;
fChangeStateCallback = cb;
fStateChangeTimeout = timeout;
fTargetState = expectedState.at(transition);
fDDSSession.SendCommand(GetTransitionName(transition));
}
fExecutionCV.notify_one();
}
auto Topology::ChangeState(TopologyTransition t, Duration timeout) -> ChangeStateResult
{
fair::mq::tools::Semaphore blocker;
ChangeStateResult res;
ChangeState(
t,
[&blocker, &res](Topology::ChangeStateResult _res) {
res = _res;
blocker.Signal();
},
timeout);
blocker.Wait();
return res;
}
void Topology::WaitForState()
{
while (!fShutdown) {
if (fStateChangeOngoing) {
try {
auto condition = [&] {
// LOG(info) << "checking condition";
// LOG(info) << "fShutdown: " << fShutdown;
// LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(),
// [&](TopologyState::value_type i) { return i.second.state == fTargetState; });
return fShutdown
|| std::all_of(
fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
// TODO Check, if we can make sure that EXITING state change event are not missed
return (fTargetState == DeviceState::Exiting)
|| ((i.second.state == fTargetState)
&& i.second.initialized);
});
};
std::unique_lock<std::mutex> lock(fMtx);
if (fStateChangeTimeout > std::chrono::milliseconds(0)) {
if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) {
// LOG(debug) << "timeout";
fStateChangeOngoing = false;
TopologyState state = fState;
lock.unlock();
fChangeStateCallback(
{{AsyncOpResultCode::Timeout, "timeout"}, std::move(state)});
break;
}
} else {
fCV.wait(lock, condition);
}
fStateChangeOngoing = false;
if (fShutdown) {
LOG(debug) << "Aborting because a shutdown was requested";
TopologyState state = fState;
lock.unlock();
fChangeStateCallback(
{{AsyncOpResultCode::Aborted, "Aborted because a shutdown was requested"},
std::move(state)});
break;
}
} catch (std::exception& e) {
fStateChangeOngoing = false;
LOG(error) << "Error while processing state request: " << e.what();
fChangeStateCallback(
{{AsyncOpResultCode::Error, tools::ToString("Exception thrown: ", e.what())},
fState});
}
fChangeStateCallback({{AsyncOpResultCode::Ok, "success"}, fState});
} else {
std::unique_lock<std::mutex> lock(fExecutionMtx);
fExecutionCV.wait(lock);
}
}
LOG(debug) << "WaitForState shutting down";
};
void Topology::AddNewStateEntry(uint64_t senderId, const std::string& state)
{
std::size_t pos = state.find("->");
std::string endState = state.substr(pos + 2);
// LOG(debug) << "Adding new state entry: " << senderId << ", " << state << ", end state: " << endState;
{
try {
std::unique_lock<std::mutex> lock(fMtx);
fState[senderId] = DeviceStatus{ true, fair::mq::GetState(endState) };
} catch (const std::exception& e) {
LOG(error) << "Exception in AddNewStateEntry: " << e.what();
}
// LOG(info) << "fState after update: ";
// for (auto& e : fState) {
// LOG(info) << e.first << ": " << e.second.state;
// }
}
fCV.notify_one();
}
Topology::~Topology()
{
fDDSSession.UnsubscribeFromCommands();
{
std::lock_guard<std::mutex> guard(fExecutionMtx);
fShutdown = true;
}
fExecutionCV.notify_one();
fExecutionThread.join();
}
auto operator<<(std::ostream& os, Topology::ChangeStateResult v) -> std::ostream&
{
return os << v.rc;
return {DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env)};
}
} // namespace sdk

View File

@@ -9,58 +9,63 @@
#ifndef FAIR_MQ_SDK_TOPOLOGY_H
#define FAIR_MQ_SDK_TOPOLOGY_H
#include <asio/async_result.hpp>
#include <asio/associated_executor.hpp>
#include <asio/steady_timer.hpp>
#include <asio/system_executor.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <chrono>
#include <fairlogger/Logger.h>
#include <fairmq/States.h>
#include <fairmq/Tools.h>
#include <fairmq/sdk/AsioAsyncOp.h>
#include <fairmq/sdk/AsioBase.h>
#include <fairmq/sdk/DDSInfo.h>
#include <fairmq/sdk/DDSSession.h>
#include <fairmq/sdk/DDSTopology.h>
#include <fairmq/States.h>
#include <fairmq/Tools.h>
#include <fairmq/sdk/Error.h>
#include <functional>
#include <unordered_map>
#include <map>
#include <memory>
#include <chrono>
#include <mutex>
#include <ostream>
#include <string>
#include <vector>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
namespace fair {
namespace mq {
enum class AsyncOpResultCode
{
Ok,
Timeout,
Error,
Aborted
};
auto operator<<(std::ostream& os, AsyncOpResultCode v) -> std::ostream&;
using AsyncOpResultMessage = std::string;
struct AsyncOpResult {
AsyncOpResultCode code;
AsyncOpResultMessage msg;
operator AsyncOpResultCode() const { return code; }
};
auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream&;
namespace sdk {
using DeviceState = fair::mq::State;
using DeviceTransition = fair::mq::Transition;
const std::map<DeviceTransition, DeviceState> expectedState =
{
{ DeviceTransition::InitDevice, DeviceState::InitializingDevice },
{ DeviceTransition::CompleteInit, DeviceState::Initialized },
{ DeviceTransition::Bind, DeviceState::Bound },
{ DeviceTransition::Connect, DeviceState::DeviceReady },
{ DeviceTransition::InitTask, DeviceState::Ready },
{ DeviceTransition::Run, DeviceState::Running },
{ DeviceTransition::Stop, DeviceState::Ready },
{ DeviceTransition::ResetTask, DeviceState::DeviceReady },
{ DeviceTransition::ResetDevice, DeviceState::Idle },
{ DeviceTransition::End, DeviceState::Exiting }
};
struct DeviceStatus
{
bool initialized;
DeviceState state;
};
using TopologyState = std::unordered_map<uint64_t, DeviceStatus>;
using TopologyState = std::unordered_map<DDSTask::Id, DeviceStatus>;
using TopologyTransition = fair::mq::Transition;
struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; };
inline DeviceState AggregateState(const TopologyState& topologyState)
{
DeviceState first = topologyState.begin()->second.state;
@@ -71,7 +76,7 @@ inline DeviceState AggregateState(const TopologyState& topologyState)
return first;
}
throw MixedState("State is not uniform");
throw MixedStateError("State is not uniform");
}
@@ -81,82 +86,353 @@ inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state)
}
/**
* @class Topology Topology.h <fairmq/sdk/Topology.h>
* @class BasicTopology Topology.h <fairmq/sdk/Topology.h>
* @tparam Executor Associated I/O executor
* @tparam Allocator Associated default allocator
* @brief Represents a FairMQ topology
*
* @par Thread Safety
* @e Distinct @e objects: Safe.@n
* @e Shared @e objects: Safe.
*/
class Topology
template <typename Executor, typename Allocator>
class BasicTopology : public AsioBase<Executor, Allocator>
{
public:
/// @brief (Re)Construct a FairMQ topology from an existing DDS topology
/// @param topo DDSTopology
/// @param session DDSSession
explicit Topology(DDSTopology topo, DDSSession session = DDSSession());
BasicTopology(DDSTopology topo, DDSSession session)
: BasicTopology<Executor, Allocator>(asio::system_executor(),
std::move(topo),
std::move(session))
{}
/// @brief (Re)Construct a FairMQ topology based on already existing native DDS API objects
/// @param nativeSession Existing and initialized CSession (either via create() or attach())
/// @param nativeTopo Existing CTopology that is activated on the given nativeSession
/// @param env Optional DDSEnv (needed primarily for unit testing)
explicit Topology(dds::topology_api::CTopology nativeTopo,
std::shared_ptr<dds::tools_api::CSession> nativeSession,
DDSEnv env = {});
/// @brief (Re)Construct a FairMQ topology from an existing DDS topology
/// @param ex I/O executor to be associated
/// @param topo DDSTopology
/// @param session DDSSession
/// @throws RuntimeError
BasicTopology(const Executor& ex,
DDSTopology topo,
DDSSession session,
Allocator alloc = DefaultAllocator())
: AsioBase<Executor, Allocator>(ex, std::move(alloc))
, fDDSSession(std::move(session))
, fDDSTopo(std::move(topo))
, fState(makeTopologyState(fDDSTopo))
, fChangeStateOp()
, fChangeStateOpTimer(ex)
, fChangeStateTarget(DeviceState::Idle)
{
std::string activeTopo(fDDSSession.RequestCommanderInfo().activeTopologyName);
std::string givenTopo(fDDSTopo.GetName());
if (activeTopo != givenTopo) {
throw RuntimeError("Given topology ", givenTopo,
" is not activated (active: ", activeTopo, ")");
}
explicit Topology(const Topology&) = delete;
Topology& operator=(const Topology&) = delete;
explicit Topology(Topology&&) = delete;
Topology& operator=(Topology&&) = delete;
fDDSSession.SubscribeToCommands([&](const std::string& msg,
const std::string& /* condition */,
DDSChannel::Id senderId) {
// LOG(debug) << "Received from " << senderId << ": " << msg;
std::vector<std::string> parts;
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
~Topology();
for (unsigned int i = 0; i < parts.size(); ++i) {
boost::trim(parts.at(i));
}
if (parts[0] == "state-change") {
DDSTask::Id taskId(std::stoull(parts[2]));
fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId);
if(parts[3] == "IDLE->EXITING") {
fDDSSession.SendCommand("state-change-exiting-received", senderId);
}
UpdateStateEntry(taskId, parts[3]);
} else if (parts[0] == "state-changes-subscription") {
LOG(debug) << "Received from " << senderId << ": " << msg;
if (parts[2] != "OK") {
LOG(error) << "state-changes-subscription failed with return code: "
<< parts[2];
}
} else if (parts[0] == "state-changes-unsubscription") {
if (parts[2] != "OK") {
LOG(error) << "state-changes-unsubscription failed with return code: "
<< parts[2];
}
} else if (parts[1] == "could not queue") {
std::lock_guard<std::mutex> lk(fMtx);
if (!fChangeStateOp.IsCompleted()
&& fState.at(fDDSSession.GetTaskId(senderId)).state != fChangeStateTarget) {
fChangeStateOpTimer.cancel();
fChangeStateOp.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed),
fState);
}
}
});
fDDSSession.StartDDSService();
LOG(debug) << "subscribe-to-state-changes";
fDDSSession.SendCommand("subscribe-to-state-changes");
}
/// not copyable
BasicTopology(const BasicTopology&) = delete;
BasicTopology& operator=(const BasicTopology&) = delete;
/// movable
BasicTopology(BasicTopology&&) = default;
BasicTopology& operator=(BasicTopology&&) = default;
~BasicTopology()
{
std::lock_guard<std::mutex> lk(fMtx);
fDDSSession.UnsubscribeFromCommands();
try {
fChangeStateOp.Cancel(fState);
} catch (...) {}
}
struct ChangeStateResult {
AsyncOpResult rc;
TopologyState state;
friend auto operator<<(std::ostream& os, ChangeStateResult v) -> std::ostream&;
};
using ChangeStateCallback = std::function<void(ChangeStateResult)>;
using Duration = std::chrono::milliseconds;
using ChangeStateCompletionSignature = void(std::error_code, TopologyState);
/// @brief Initiate state transition on all FairMQ devices in this topology
/// @param t FairMQ device state machine transition
/// @param cb Completion callback
/// @param transition FairMQ device state machine transition
/// @param timeout Timeout in milliseconds, 0 means no timeout
auto ChangeState(TopologyTransition t, ChangeStateCallback cb, Duration timeout = std::chrono::milliseconds(0)) -> void;
/// @param token Asio completion token
/// @tparam CompletionToken Asio completion token type
/// @throws std::system_error
///
/// @par Usage examples
/// With lambda:
/// @code
/// topo.AsyncChangeState(
/// fair::mq::sdk::TopologyTransition::InitDevice,
/// std::chrono::milliseconds(500),
/// [](std::error_code ec, TopologyState state) {
/// if (!ec) {
/// // success
/// } else if (ec.category().name() == "fairmq") {
/// switch (static_cast<fair::mq::ErrorCode>(ec.value())) {
/// case fair::mq::ErrorCode::OperationTimeout:
/// // async operation timed out
/// case fair::mq::ErrorCode::OperationCanceled:
/// // async operation canceled
/// case fair::mq::ErrorCode::DeviceChangeStateFailed:
/// // failed to change state of a fairmq device
/// case fair::mq::ErrorCode::OperationInProgress:
/// // async operation already in progress
/// default:
/// }
/// }
/// }
/// );
/// @endcode
/// With future:
/// @code
/// auto fut = topo.AsyncChangeState(fair::mq::sdk::TopologyTransition::InitDevice,
/// std::chrono::milliseconds(500),
/// asio::use_future);
/// try {
/// fair::mq::sdk::TopologyState state = fut.get();
/// // success
/// } catch (const std::system_error& ex) {
/// auto ec(ex.code());
/// if (ec.category().name() == "fairmq") {
/// switch (static_cast<fair::mq::ErrorCode>(ec.value())) {
/// case fair::mq::ErrorCode::OperationTimeout:
/// // async operation timed out
/// case fair::mq::ErrorCode::OperationCanceled:
/// // async operation canceled
/// case fair::mq::ErrorCode::DeviceChangeStateFailed:
/// // failed to change state of a fairmq device
/// case fair::mq::ErrorCode::OperationInProgress:
/// // async operation already in progress
/// default:
/// }
/// }
/// }
/// @endcode
/// With coroutine (C++20, see https://en.cppreference.com/w/cpp/language/coroutines):
/// @code
/// try {
/// fair::mq::sdk::TopologyState state = co_await
/// topo.AsyncChangeState(fair::mq::sdk::TopologyTransition::InitDevice,
/// std::chrono::milliseconds(500),
/// asio::use_awaitable);
/// // success
/// } catch (const std::system_error& ex) {
/// auto ec(ex.code());
/// if (ec.category().name() == "fairmq") {
/// switch (static_cast<fair::mq::ErrorCode>(ec.value())) {
/// case fair::mq::ErrorCode::OperationTimeout:
/// // async operation timed out
/// case fair::mq::ErrorCode::OperationCanceled:
/// // async operation canceled
/// case fair::mq::ErrorCode::DeviceChangeStateFailed:
/// // failed to change state of a fairmq device
/// case fair::mq::ErrorCode::OperationInProgress:
/// // async operation already in progress
/// default:
/// }
/// }
/// }
/// @endcode
template<typename CompletionToken>
auto AsyncChangeState(TopologyTransition transition,
Duration timeout,
CompletionToken&& token)
{
return asio::async_initiate<CompletionToken, ChangeStateCompletionSignature>(
[&](auto handler) {
std::lock_guard<std::mutex> lk(fMtx);
/// @brief Perform a state transition on all FairMQ devices in this topology
/// @param t FairMQ device state machine transition
if (fChangeStateOp.IsCompleted()) {
fChangeStateOp = ChangeStateOp(AsioBase<Executor, Allocator>::GetExecutor(),
AsioBase<Executor, Allocator>::GetAllocator(),
std::move(handler));
fChangeStateTarget = expectedState.at(transition);
fDDSSession.SendCommand(GetTransitionName(transition));
if (timeout > std::chrono::milliseconds(0)) {
fChangeStateOpTimer.expires_after(timeout);
fChangeStateOpTimer.async_wait([&](std::error_code ec) {
if (!ec) {
std::lock_guard<std::mutex> lk2(fMtx);
fChangeStateOp.Timeout(fState);
}
});
}
} else {
// TODO refactor to hide boiler plate
auto ex2(asio::get_associated_executor(
handler, AsioBase<Executor, Allocator>::GetExecutor()));
auto alloc2(asio::get_associated_allocator(
handler, AsioBase<Executor, Allocator>::GetAllocator()));
auto state(GetCurrentStateUnsafe());
ex2.post(
[h = std::move(handler), s = std::move(state)]() mutable {
try {
h(MakeErrorCode(ErrorCode::OperationInProgress), s);
} catch (const std::exception& e) {
LOG(error)
<< "Uncaught exception in completion handler: " << e.what();
} catch (...) {
LOG(error) << "Unknown uncaught exception in completion handler.";
}
},
alloc2);
}
},
token);
}
template<typename CompletionToken>
auto AsyncChangeState(TopologyTransition transition, CompletionToken&& token)
{
return AsyncChangeState(transition, Duration(0), std::move(token));
}
/// @brief Perform state transition on all FairMQ devices in this topology
/// @param transition FairMQ device state machine transition
/// @param timeout Timeout in milliseconds, 0 means no timeout
/// @return The result of the state transition
auto ChangeState(TopologyTransition t, Duration timeout = std::chrono::milliseconds(0)) -> ChangeStateResult;
/// @tparam CompletionToken Asio completion token type
/// @throws std::system_error
auto ChangeState(TopologyTransition transition, Duration timeout = Duration(0))
-> std::pair<std::error_code, TopologyState>
{
tools::SharedSemaphore blocker;
std::error_code ec;
TopologyState state;
AsyncChangeState(
transition, timeout, [&, blocker](std::error_code _ec, TopologyState _state) mutable {
ec = _ec;
state = _state;
blocker.Signal();
});
blocker.Wait();
return {ec, state};
}
/// @brief Returns the current state of the topology
/// @return map of id : DeviceStatus (initialized, state)
TopologyState GetCurrentState() const { std::lock_guard<std::mutex> guard(fMtx); return fState; }
auto GetCurrentState() const -> TopologyState
{
std::lock_guard<std::mutex> lk(fMtx);
return fState;
}
DeviceState AggregateState() { return sdk::AggregateState(fState); }
auto AggregateState() const -> DeviceState { return sdk::AggregateState(GetCurrentState()); }
bool StateEqualsTo(DeviceState state) { return sdk::StateEqualsTo(fState, state); }
auto StateEqualsTo(DeviceState state) const -> bool { return sdk::StateEqualsTo(GetCurrentState(), state); }
private:
DDSSession fDDSSession;
DDSTopology fDDSTopo;
TopologyState fState;
std::unordered_map<uint64_t, DeviceStatus> fStateChangesSubscriptions;
bool fStateChangeOngoing;
DeviceState fTargetState;
mutable std::mutex fMtx;
mutable std::mutex fExecutionMtx;
std::condition_variable fCV;
std::condition_variable fExecutionCV;
std::thread fExecutionThread;
ChangeStateCallback fChangeStateCallback;
std::chrono::milliseconds fStateChangeTimeout;
bool fShutdown;
void WaitForState();
void AddNewStateEntry(uint64_t senderId, const std::string& state);
using ChangeStateOp = AsioAsyncOp<Executor, Allocator, ChangeStateCompletionSignature>;
ChangeStateOp fChangeStateOp;
asio::steady_timer fChangeStateOpTimer;
DeviceState fChangeStateTarget;
static auto makeTopologyState(const DDSTopo& topo) -> TopologyState
{
TopologyState state;
for (const auto& task : topo.GetTasks()) {
state.emplace(task.GetId(), DeviceStatus{false, DeviceState::Ok});
}
return state;
}
auto UpdateStateEntry(DDSTask::Id taskId, const std::string& state) -> void
{
std::size_t pos = state.find("->");
std::string endState = state.substr(pos + 2);
try {
std::lock_guard<std::mutex> lk(fMtx);
fState[taskId] = DeviceStatus{true, fair::mq::GetState(endState)};
LOG(debug) << "Updated state entry: taskId=" << taskId << ",state=" << state;
TryChangeStateCompletion();
} catch (const std::exception& e) {
LOG(error) << "Exception in UpdateStateEntry: " << e.what();
}
}
/// call only under locked fMtx!
auto TryChangeStateCompletion() -> void
{
bool targetStateReached(
std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
return (i.second.state == fChangeStateTarget) && i.second.initialized;
}));
if (!fChangeStateOp.IsCompleted() && targetStateReached) {
fChangeStateOpTimer.cancel();
fChangeStateOp.Complete(fState);
}
}
/// call only under locked fMtx!
auto GetCurrentStateUnsafe() const -> TopologyState
{
return fState;
}
};
using Topology = BasicTopology<DefaultExecutor, DefaultAllocator>;
using Topo = Topology;
/// @brief Helper to (Re)Construct a FairMQ topology based on already existing native DDS API objects
/// @param nativeSession Existing and initialized CSession (either via create() or attach())
/// @param nativeTopo Existing CTopology that is activated on the given nativeSession
/// @param env Optional DDSEnv (needed primarily for unit testing)
auto MakeTopology(dds::topology_api::CTopology nativeTopo,
std::shared_ptr<dds::tools_api::CSession> nativeSession,
DDSEnv env = {}) -> Topology;
} // namespace sdk
} // namespace mq
} // namespace fair

50
fairmq/sdk/Traits.h Normal file
View File

@@ -0,0 +1,50 @@
/********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_SDK_TRAITS_H
#define FAIR_MQ_SDK_TRAITS_H
#include <asio/associated_allocator.hpp>
#include <asio/associated_executor.hpp>
#include <type_traits>
namespace asio {
namespace detail {
/// Specialize to match our coding conventions
template<typename T, typename Executor>
struct associated_executor_impl<T,
Executor,
std::enable_if_t<is_executor<typename T::ExecutorType>::value>>
{
using type = typename T::ExecutorType;
static auto get(const T& obj, const Executor& /*ex = Executor()*/) noexcept -> type
{
return obj.GetExecutor();
}
};
/// Specialize to match our coding conventions
template<typename T, typename Allocator>
struct associated_allocator_impl<T,
Allocator,
std::enable_if_t<T::AllocatorType>>
{
using type = typename T::AllocatorType;
static auto get(const T& obj, const Allocator& /*alloc = Allocator()*/) noexcept -> type
{
return obj.GetAllocator();
}
};
} /* namespace detail */
} /* namespace asio */
#endif /* FAIR_MQ_SDK_TRAITS_H */

View File

@@ -45,8 +45,8 @@ struct Region
Region() = delete;
Region(const Region&) = default;
Region(Region&&) = default;
Region(const Region&) = delete;
Region(Region&&) = delete;
void InitializeQueues();

View File

@@ -39,12 +39,35 @@ auto Semaphore::Signal() -> void
fCv.notify_one();
}
auto Semaphore::GetCount() -> std::size_t
auto Semaphore::GetCount() const -> std::size_t
{
std::unique_lock<std::mutex> lk(fMutex);
return fCount;
}
SharedSemaphore::SharedSemaphore()
: fSemaphore(std::make_shared<Semaphore>())
{}
SharedSemaphore::SharedSemaphore(std::size_t initial_count)
: fSemaphore(std::make_shared<Semaphore>(initial_count))
{}
auto SharedSemaphore::Wait() -> void
{
fSemaphore->Wait();
}
auto SharedSemaphore::Signal() -> void
{
fSemaphore->Signal();
}
auto SharedSemaphore::GetCount() const -> std::size_t
{
return fSemaphore->GetCount();
}
} /* namespace tools */
} /* namespace mq */
} /* namespace fair */

View File

@@ -12,6 +12,7 @@
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
namespace fair {
@@ -24,19 +25,36 @@ namespace tools {
*/
struct Semaphore
{
explicit Semaphore();
Semaphore();
explicit Semaphore(std::size_t initial_count);
auto Wait() -> void;
auto Signal() -> void;
auto GetCount() -> std::size_t;
auto GetCount() const -> std::size_t;
private:
std::size_t fCount;
std::mutex fMutex;
mutable std::mutex fMutex;
std::condition_variable fCv;
};
/**
* @struct SharedSemaphore Semaphore.h <fairmq/tools/Semaphore.h>
* @brief A simple copyable blocking semaphore.
*/
struct SharedSemaphore
{
SharedSemaphore();
explicit SharedSemaphore(std::size_t initial_count);
auto Wait() -> void;
auto Signal() -> void;
auto GetCount() const -> std::size_t;
private:
std::shared_ptr<Semaphore> fSemaphore;
};
} /* namespace tools */
} /* namespace mq */
} /* namespace fair */

View File

@@ -287,6 +287,7 @@ if(BUILD_SDK)
add_testsuite(SDK
SOURCES
${CMAKE_CURRENT_BINARY_DIR}/runner.cxx
sdk/_async_op.cxx
sdk/_dds.cxx
sdk/_topology.cxx
sdk/Fixtures.h
@@ -298,8 +299,20 @@ if(BUILD_SDK)
DDS::dds_tools_lib
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 15
RUN_SERIAL ON
TIMEOUT 30
${definitions}
)
if(DDS_TESTS)
foreach(i RANGE 1 ${DDS_TESTS})
add_test(NAME DDSToolsAPIStabilityTest_${i}
COMMAND ${CMAKE_CURRENT_BINARY_DIR}/testsuite_SDK --gtest_filter=TopologyHelper.MakeTopology --gtest_also_run_disabled_tests
)
set_tests_properties(DDSToolsAPIStabilityTest_${i} PROPERTIES TIMEOUT 10)
endforeach()
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/DDSToolsAPIStabilityTest.cmake.in
${CMAKE_BINARY_DIR}/DDSToolsAPIStabilityTest.cmake
@ONLY
)
endif()
endif()

View File

@@ -0,0 +1,24 @@
################################################################################
# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
include(@CMAKE_SOURCE_DIR@/CTestConfig.cmake)
cmake_host_system_information(RESULT fqdn QUERY FQDN)
set(CTEST_SITE ${fqdn})
set(CTEST_BUILD_NAME "@CMAKE_SYSTEM@ - @CMAKE_CXX_COMPILER_ID@ @CMAKE_CXX_COMPILER_VERSION@ - DDS Stability Test (@DDS_TESTS@ iterations, DDS: @DDS_VERSION@, FairMQ: @PROJECT_GIT_VERSION@, Boost: @Boost_VERSION@)")
set(CTEST_SOURCE_DIRECTORY @CMAKE_SOURCE_DIR@)
set(CTEST_BINARY_DIRECTORY @CMAKE_BINARY_DIR@)
file(REMOVE_RECURSE ${CTEST_BINARY_DIRECTORY}/test/.DDS)
ctest_start(Experimental)
ctest_test(INCLUDE "DDSToolsAPIStabilityTest")
ctest_submit()
set(dds_logs @CMAKE_BINARY_DIR@/dds_logs.tar.gz)
execute_process(COMMAND ${CMAKE_COMMAND} -E tar "cfvz" "${dds_logs}" "@CMAKE_BINARY_DIR@/test/.DDS" OUTPUT_QUIET)
message("DDS logs packed: ${dds_logs}")

View File

@@ -26,10 +26,7 @@ TEST_F(PluginServices, OnlySingleController)
mServices.ChangeDeviceState("bar", DeviceStateTransition::InitDevice),
fair::mq::PluginServices::DeviceControlError
);
ASSERT_THROW( // no control for bar
mServices.ReleaseDeviceControl("bar"),
fair::mq::PluginServices::DeviceControlError
);
ASSERT_NO_THROW(mServices.ReleaseDeviceControl("bar"));
ASSERT_NO_THROW(mServices.ReleaseDeviceControl("foo"));
ASSERT_FALSE(mServices.GetDeviceController());

View File

@@ -33,9 +33,9 @@ auto RunSingleThreadedMultipart(string transport, string address) -> void {
config.SetProperty<string>("session", std::to_string(session));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQTransportFactory* factoryptr = factory.get();
auto push = FairMQChannel{"Push", "push", factory};
FairMQChannel push("Push", "push", factory);
ASSERT_TRUE(push.Bind(address));
auto pull = FairMQChannel{"Pull", "pull", factory};
FairMQChannel pull("Pull", "pull", factory);
pull.Connect(address);
// TODO validate that fTransportFactory is not nullptr
@@ -72,9 +72,9 @@ auto RunMultiThreadedMultipart(string transport, string address) -> void
config.SetProperty<int>("io-threads", 1);
config.SetProperty<size_t>("shm-segment-size", 20000000);
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
auto push = FairMQChannel{"Push", "push", factory};
FairMQChannel push("Push", "push", factory);
ASSERT_TRUE(push.Bind(address));
auto pull = FairMQChannel{"Pull", "pull", factory};
FairMQChannel pull("Pull", "pull", factory);
pull.Connect(address);
auto pusher = thread{[&push](){

View File

@@ -10,12 +10,13 @@
#define FAIR_MQ_TEST_FIXTURES
#include "TestEnvironment.h"
#include <fairmq/SDK.h>
#include <fairmq/Tools.h>
#include <asio/io_context.hpp>
#include <chrono>
#include <cstdlib>
#include <fairlogger/Logger.h>
#include <fairmq/SDK.h>
#include <fairmq/Tools.h>
#include <gtest/gtest.h>
#include <thread>
@@ -46,7 +47,7 @@ struct TopologyFixture : ::testing::Test
: mDDSTopoFile(tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml"))
, mDDSEnv(CMAKE_CURRENT_BINARY_DIR)
, mDDSSession(mDDSEnv)
, mDDSTopo(mDDSTopoFile, mDDSEnv)
, mDDSTopo(sdk::DDSTopology::Path(mDDSTopoFile), mDDSEnv)
{
mDDSSession.StopOnDestruction();
}
@@ -58,6 +59,14 @@ struct TopologyFixture : ::testing::Test
auto n(mDDSTopo.GetNumRequiredAgents());
mDDSSession.SubmitAgents(n);
mDDSSession.ActivateTopology(mDDSTopo);
std::vector<sdk::DDSAgent> agents = mDDSSession.RequestAgentInfo();
for (const auto& a : agents) {
LOG(debug) << a;
}
std::vector<sdk::DDSTask> tasks = mDDSSession.RequestTaskInfo();
for (const auto& t : tasks) {
LOG(debug) << t;
}
}
auto TearDown() -> void override {
@@ -68,6 +77,19 @@ struct TopologyFixture : ::testing::Test
sdk::DDSEnvironment mDDSEnv;
sdk::DDSSession mDDSSession;
sdk::DDSTopology mDDSTopo;
asio::io_context mIoContext;
};
struct AsyncOpFixture : ::testing::Test
{
auto SetUp() -> void override {
}
auto TearDown() -> void override {
}
LoggerConfig mLoggerConfig;
asio::io_context mIoContext;
};
} /* namespace test */

118
test/sdk/_async_op.cxx Normal file
View File

@@ -0,0 +1,118 @@
/********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Fixtures.h"
#include <fairmq/sdk/AsioBase.h>
#include <fairmq/sdk/AsioAsyncOp.h>
#include <asio/steady_timer.hpp>
#include <iostream>
#include <thread>
namespace {
using AsyncOp = fair::mq::test::AsyncOpFixture;
// template <typename Executor, typename Allocator>
// class : public AsioBase<Executor, Allocator>
TEST_F(AsyncOp, DefaultConstruction)
{
using namespace fair::mq::sdk;
AsioAsyncOp<DefaultExecutor, DefaultAllocator, void(std::error_code, int)> op;
EXPECT_TRUE(op.IsCompleted());
}
TEST_F(AsyncOp, ConstructionWithHandler)
{
using namespace fair::mq::sdk;
AsioAsyncOp<DefaultExecutor, DefaultAllocator, void(std::error_code, int)> op(
[](std::error_code, int) {});
EXPECT_FALSE(op.IsCompleted());
}
TEST_F(AsyncOp, Complete)
{
using namespace fair::mq::sdk;
AsioAsyncOp<DefaultExecutor, DefaultAllocator, void(std::error_code, int)> op(
[](std::error_code ec, int v) {
EXPECT_FALSE(ec); // success
EXPECT_EQ(v, 42);
});
EXPECT_FALSE(op.IsCompleted());
op.Complete(42);
EXPECT_TRUE(op.IsCompleted());
EXPECT_THROW(op.Complete(6), RuntimeError); // No double completion!
}
TEST_F(AsyncOp, Cancel)
{
using namespace fair::mq;
sdk::AsioAsyncOp<sdk::DefaultExecutor, sdk::DefaultAllocator, void(std::error_code)> op(
[](std::error_code ec) {
EXPECT_TRUE(ec); // error
EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationCanceled));
});
op.Cancel();
}
TEST_F(AsyncOp, Timeout)
{
using namespace fair::mq;
asio::steady_timer timer(mIoContext.get_executor(), std::chrono::milliseconds(50));
sdk::AsioAsyncOp<sdk::DefaultExecutor, sdk::DefaultAllocator, void(std::error_code)> op(
mIoContext.get_executor(),
[&timer](std::error_code ec) {
timer.cancel();
std::cout << "Completion with: " << ec.message() << std::endl;
EXPECT_TRUE(ec); // error
EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationTimeout));
});
timer.async_wait([&op](asio::error_code ec) {
std::cout << "Timer event" << std::endl;
if (ec != asio::error::operation_aborted) {
op.Timeout();
}
});
mIoContext.run();
EXPECT_THROW(op.Complete(), sdk::RuntimeError);
}
TEST_F(AsyncOp, Timeout2)
{
using namespace fair::mq::sdk;
asio::steady_timer timer(mIoContext.get_executor(), std::chrono::milliseconds(50));
AsioAsyncOp<DefaultExecutor, DefaultAllocator, void(std::error_code)> op(
mIoContext.get_executor(),
[&timer](std::error_code ec) {
timer.cancel();
std::cout << "Completion with: " << ec.message() << std::endl;
EXPECT_FALSE(ec); // success
});
op.Complete(); // Complete before timer
timer.async_wait([&op](asio::error_code ec) {
std::cout << "Timer event" << std::endl;
if (ec != asio::error::operation_aborted) {
op.Timeout();
}
});
mIoContext.run();
EXPECT_THROW(op.Complete(), RuntimeError);
}
} // namespace

View File

@@ -16,8 +16,8 @@ namespace {
TEST(DDSEnvironment, Construction)
{
fair::mq::test::LoggerConfig cfg;
fair::mq::sdk::DDSEnvironment env(CMAKE_CURRENT_BINARY_DIR);
LOG(debug) << env;
}

View File

@@ -8,6 +8,7 @@
#include "Fixtures.h"
#include <asio.hpp>
#include <DDS/Topology.h>
#include <DDS/Tools.h>
#include <fairmq/sdk/Topology.h>
@@ -17,19 +18,21 @@ namespace {
using Topology = fair::mq::test::TopologyFixture;
TEST(Topology2, ConstructionWithNativeDdsApiObjects)
TEST(TopologyHelper, MakeTopology)
{
// This is only needed for this unit test
fair::mq::test::LoggerConfig cfg;
fair::mq::sdk::DDSEnv env(CMAKE_CURRENT_BINARY_DIR);
/////////////////////////////////////////
using namespace fair::mq;
// This is only needed for this unit test
test::LoggerConfig cfg;
sdk::DDSEnv env(CMAKE_CURRENT_BINARY_DIR);
/////////////////////////////////////
// Example usage:
dds::topology_api::CTopology nativeTopo(
fair::mq::tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml"));
tools::ToString(SDK_TESTSUITE_SOURCE_DIR, "/test_topo.xml"));
auto nativeSession(std::make_shared<dds::tools_api::CSession>());
nativeSession->create();
EXPECT_THROW(fair::mq::sdk::Topology topo(nativeTopo, nativeSession, env), std::runtime_error);
EXPECT_THROW(sdk::MakeTopology(nativeTopo, nativeSession, env), sdk::RuntimeError);
nativeSession->shutdown();
}
TEST_F(Topology, Construction)
@@ -37,80 +40,143 @@ TEST_F(Topology, Construction)
fair::mq::sdk::Topology topo(mDDSTopo, mDDSSession);
}
TEST_F(Topology, ChangeStateAsync)
TEST_F(Topology, Construction2)
{
using fair::mq::sdk::Topology;
using fair::mq::sdk::TopologyTransition;
fair::mq::sdk::Topology topo(mIoContext.get_executor(), mDDSTopo, mDDSSession);
}
Topology topo(mDDSTopo, mDDSSession);
fair::mq::tools::Semaphore blocker;
topo.ChangeState(
TopologyTransition::InitDevice, [&blocker, &topo](Topology::ChangeStateResult result) {
LOG(info) << result;
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok);
EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state));
EXPECT_EQ(
fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::InitializingDevice),
true);
TEST_F(Topology, AsyncChangeState)
{
using namespace fair::mq;
tools::SharedSemaphore blocker;
sdk::Topology topo(mDDSTopo, mDDSSession);
topo.AsyncChangeState(
sdk::TopologyTransition::InitDevice,
[=](std::error_code ec, sdk::TopologyState) mutable {
LOG(info) << ec;
EXPECT_EQ(ec, std::error_code());
blocker.Signal();
});
blocker.Wait();
}
TEST_F(Topology, ChangeStateSync)
TEST_F(Topology, AsyncChangeStateWithCustomExecutor)
{
using fair::mq::sdk::Topology;
using fair::mq::sdk::TopologyTransition;
using namespace fair::mq;
Topology topo(mDDSTopo, mDDSSession);
auto result(topo.ChangeState(TopologyTransition::InitDevice));
sdk::Topology topo(mIoContext.get_executor(), mDDSTopo, mDDSSession);
topo.AsyncChangeState(
sdk::TopologyTransition::InitDevice,
[](std::error_code ec, sdk::TopologyState) {
LOG(info) << ec;
EXPECT_EQ(ec, std::error_code());
});
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok);
EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state));
EXPECT_EQ(
fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::InitializingDevice),
true);
mIoContext.run();
}
TEST_F(Topology, ChangeStateConcurrent)
TEST_F(Topology, AsyncChangeStateFuture)
{
using fair::mq::sdk::Topology;
using fair::mq::sdk::TopologyTransition;
using namespace fair::mq;
Topology topo(mDDSTopo, mDDSSession);
fair::mq::tools::Semaphore blocker;
topo.ChangeState(TopologyTransition::InitDevice,
[&blocker](Topology::ChangeStateResult result) {
LOG(info) << "result for valid ChangeState: " << result;
blocker.Signal();
});
EXPECT_THROW(topo.ChangeState(TopologyTransition::Stop,
[&blocker](Topology::ChangeStateResult) {}),
std::runtime_error);
sdk::Topology topo(mIoContext.get_executor(), mDDSTopo, mDDSSession);
auto fut(topo.AsyncChangeState(
sdk::TopologyTransition::InitDevice,
asio::use_future));
std::thread t([&]() { mIoContext.run(); });
bool success(false);
try {
sdk::TopologyState state = fut.get();
success = true;
} catch (const std::system_error& ex) {
LOG(error) << ex.what();
}
EXPECT_TRUE(success);
t.join();
}
#if defined(ASIO_HAS_CO_AWAIT)
TEST_F(Topology, AsyncChangeStateCoroutine)
{
using namespace fair::mq;
bool success(false);
asio::co_spawn(
mIoContext.get_executor(),
[&]() mutable -> asio::awaitable<void> {
auto executor = co_await asio::this_coro::executor;
sdk::Topology topo(executor, mDDSTopo, mDDSSession);
try {
sdk::TopologyState state = co_await topo.AsyncChangeState(
sdk::TopologyTransition::InitDevice, asio::use_awaitable);
success = true;
} catch (const std::system_error& ex) {
LOG(error) << ex.what();
}
},
asio::detached);
mIoContext.run();
EXPECT_TRUE(success);
}
#endif
TEST_F(Topology, ChangeState)
{
using namespace fair::mq;
sdk::Topology topo(mDDSTopo, mDDSSession);
auto result(topo.ChangeState(sdk::TopologyTransition::InitDevice));
LOG(info) << result.first;
EXPECT_EQ(result.first, std::error_code());
EXPECT_NO_THROW(sdk::AggregateState(result.second));
EXPECT_EQ(sdk::StateEqualsTo(result.second, sdk::DeviceState::InitializingDevice), true);
}
TEST_F(Topology, AsyncChangeStateConcurrent)
{
using namespace fair::mq;
sdk::Topology topo(mDDSTopo, mDDSSession);
tools::SharedSemaphore blocker;
topo.AsyncChangeState(sdk::TopologyTransition::InitDevice,
[blocker](std::error_code ec, sdk::TopologyState) mutable {
LOG(info) << "result for valid ChangeState: " << ec;
blocker.Signal();
});
topo.AsyncChangeState(sdk::TopologyTransition::Stop,
[](std::error_code ec, sdk::TopologyState) {
LOG(ERROR) << "Expected error: " << ec;
EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationInProgress));
});
blocker.Wait();
}
TEST_F(Topology, ChangeStateTimeout)
TEST_F(Topology, AsyncChangeStateTimeout)
{
using fair::mq::sdk::Topology;
using fair::mq::sdk::TopologyTransition;
using namespace fair::mq;
Topology topo(mDDSTopo, mDDSSession);
fair::mq::tools::Semaphore blocker;
topo.ChangeState(TopologyTransition::InitDevice, [&](Topology::ChangeStateResult result) {
LOG(info) << result;
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Timeout);
blocker.Signal();
}, std::chrono::milliseconds(1));
blocker.Wait();
sdk::Topology topo(mIoContext.get_executor(), mDDSTopo, mDDSSession);
topo.AsyncChangeState(sdk::TopologyTransition::InitDevice,
std::chrono::milliseconds(1),
[](std::error_code ec, sdk::TopologyState) {
LOG(info) << ec;
EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationTimeout));
});
mIoContext.run();
}
TEST_F(Topology, ChangeStateFullDeviceLifetime)
TEST_F(Topology, ChangeStateFullDeviceLifecycle)
{
using fair::mq::sdk::Topology;
using namespace fair::mq;
using fair::mq::sdk::TopologyTransition;
Topology topo(mDDSTopo, mDDSSession);
sdk::Topology topo(mDDSTopo, mDDSSession);
for (auto transition : {TopologyTransition::InitDevice,
TopologyTransition::CompleteInit,
TopologyTransition::Bind,
@@ -121,7 +187,31 @@ TEST_F(Topology, ChangeStateFullDeviceLifetime)
TopologyTransition::ResetTask,
TopologyTransition::ResetDevice,
TopologyTransition::End}) {
ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok);
ASSERT_EQ(topo.ChangeState(transition).first, std::error_code());
}
}
TEST_F(Topology, ChangeStateFullDeviceLifecycle2)
{
using namespace fair::mq;
using fair::mq::sdk::TopologyTransition;
sdk::Topology topo(mDDSTopo, mDDSSession);
for (int i(0); i < 10; ++i) {
for (auto transition : {TopologyTransition::InitDevice,
TopologyTransition::CompleteInit,
TopologyTransition::Bind,
TopologyTransition::Connect,
TopologyTransition::InitTask,
TopologyTransition::Run}) {
ASSERT_EQ(topo.ChangeState(transition).first, std::error_code());
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
for (auto transition : {TopologyTransition::Stop,
TopologyTransition::ResetTask,
TopologyTransition::ResetDevice}) {
ASSERT_EQ(topo.ChangeState(transition).first, std::error_code());
}
}
}

View File

@@ -6,7 +6,7 @@
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
<decltask name="Sampler">
<exe reachable="true">fairmq-bsampler --id sampler --color false --channel-config name=data,type=push,method=bind -P dds --msg-rate 10</exe>
<exe reachable="true">fairmq-bsampler --color false --channel-config name=data,type=push,method=bind -P dds --msg-rate 10</exe>
<requirements>
<name>SamplerWorker</name>
</requirements>
@@ -16,7 +16,7 @@
</decltask>
<decltask name="Sink">
<exe reachable="true">fairmq-sink --id sink_%taskIndex% --color false --channel-config name=data,type=pull,method=connect -P dds</exe>
<exe reachable="true">fairmq-sink --color false --channel-config name=data,type=pull,method=connect -P dds</exe>
<requirements>
<name>SinkWorker</name>
</requirements>