mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
Compare commits
22 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
2c6b2e7f04 | ||
|
6f7ffeef13 | ||
|
2eddde0e5f | ||
|
4aae1ad8d4 | ||
|
b89c309768 | ||
|
c02fbed331 | ||
|
76aeb2c7e6 | ||
|
db5f3d794c | ||
|
b814e40c87 | ||
|
5d37ab2f01 | ||
|
5303e916fb | ||
|
7d5e76dece | ||
|
2498837b8e | ||
|
6545daeda7 | ||
|
e95096eb37 | ||
|
1bb558a457 | ||
|
1c78b8ef0a | ||
|
017c5cdc3f | ||
|
d4daa9c262 | ||
|
9564b13c19 | ||
|
56cdb3812d | ||
|
2a002a2984 |
2
.codecov.yml
Normal file
2
.codecov.yml
Normal file
@@ -0,0 +1,2 @@
|
||||
comment:
|
||||
layout: "diff, files"
|
9
.github/ISSUE_TEMPLATE/bug_report.md
vendored
9
.github/ISSUE_TEMPLATE/bug_report.md
vendored
@@ -16,12 +16,15 @@ Steps to reproduce the behavior:
|
||||
**Expected behavior**
|
||||
A clear and concise description of what you expected to happen.
|
||||
|
||||
**Screenshots**
|
||||
If applicable, add screenshots to help explain your problem.
|
||||
**Logs / Screenshots**
|
||||
If applicable, add logs or screenshots to help explain your problem.
|
||||
|
||||
**System information (please complete the following information):**
|
||||
- OS: [e.g. MacOS, Fedora28, Ubuntu14.04]
|
||||
- OS: [e.g. MacOS 10.13, Fedora 28, Ubuntu 14.04]
|
||||
- Compiler: [e.g. GCC 8.1, Clang 3.5]
|
||||
- Environment: [e.g. FairSoft version, alfadist revision]
|
||||
|
||||
**Additional context**
|
||||
Add any other context about the problem here.
|
||||
|
||||
See [github markdown cheatsheet](https://github.com/adam-p/markdown-here/wiki/Markdown-Cheatsheet#code) on how to format inline codes examples and logs.
|
||||
|
2
.gitignore
vendored
2
.gitignore
vendored
@@ -1,3 +1,5 @@
|
||||
build
|
||||
|
||||
.DS_Store
|
||||
|
||||
.vscode
|
||||
|
@@ -43,17 +43,6 @@ if(FAST_BUILD)
|
||||
include(cotire)
|
||||
endif()
|
||||
|
||||
macro(find_msgpack)
|
||||
if(NOT msgpack_FOUND)
|
||||
find_package2(PRIVATE msgpack VERSION 3.0.0)
|
||||
set(PROJECT_msgpack_VERSION 2.1.5)
|
||||
if(NOT msgpack_FOUND)
|
||||
find_package2(PRIVATE msgpack VERSION 2.1.5 REQUIRED)
|
||||
endif()
|
||||
set(msgpack_ROOT ${PACKAGE_PREFIX_DIR})
|
||||
endif()
|
||||
endmacro()
|
||||
|
||||
set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
|
||||
set(THREADS_PREFER_PTHREAD_FLAG TRUE)
|
||||
find_package(Threads REQUIRED)
|
||||
@@ -67,13 +56,9 @@ if(BUILD_FAIRMQ)
|
||||
endif()
|
||||
|
||||
if(BUILD_NANOMSG_TRANSPORT)
|
||||
find_package2(PRIVATE nanomsg VERSION 1.0.0 REQUIRED)
|
||||
find_package2(PRIVATE msgpack VERSION 3.0.0)
|
||||
set(PROJECT_msgpack_VERSION 2.1.5)
|
||||
if(NOT msgpack_FOUND)
|
||||
find_package2(PRIVATE msgpack VERSION 2.1.5 REQUIRED)
|
||||
endif()
|
||||
set(msgpack_ROOT ${PACKAGE_PREFIX_DIR})
|
||||
find_package2(PRIVATE nanomsg REQUIRED)
|
||||
set(PROJECT_nanomsg_VERSION 1.1.3) # Once upstream releases 1.1.5, we should bump again and use version check
|
||||
find_package2(PRIVATE msgpack VERSION 3.1.0 REQUIRED)
|
||||
endif()
|
||||
|
||||
if(BUILD_OFI_TRANSPORT)
|
||||
@@ -165,11 +150,6 @@ if(BUILD_DDS_PLUGIN)
|
||||
DESTINATION ${PROJECT_INSTALL_CMAKEMODDIR}
|
||||
)
|
||||
endif()
|
||||
if(BUILD_NANOMSG_TRANSPORT)
|
||||
install(FILES cmake/Findnanomsg.cmake
|
||||
DESTINATION ${PROJECT_INSTALL_CMAKEMODDIR}
|
||||
)
|
||||
endif()
|
||||
if(BUILD_OFI_TRANSPORT)
|
||||
install(FILES cmake/FindOFI.cmake
|
||||
DESTINATION ${PROJECT_INSTALL_CMAKEMODDIR}
|
||||
@@ -236,7 +216,8 @@ if(PROJECT_PACKAGE_DEPENDENCIES)
|
||||
elseif(${dep} STREQUAL GTest)
|
||||
get_filename_component(prefix ${GTEST_INCLUDE_DIRS}/.. ABSOLUTE)
|
||||
elseif(${dep} STREQUAL msgpack)
|
||||
set(prefix ${msgpack_ROOT})
|
||||
get_target_property(msgpack_include msgpackc-cxx INTERFACE_INCLUDE_DIRECTORIES)
|
||||
get_filename_component(prefix ${msgpack_include}/.. ABSOLUTE)
|
||||
elseif(${dep} STREQUAL OFI)
|
||||
get_filename_component(prefix ${${dep}_INCLUDE_DIRS}/.. ABSOLUTE)
|
||||
elseif(${dep} STREQUAL Doxygen)
|
||||
|
2
LICENSE
2
LICENSE
@@ -1,4 +1,4 @@
|
||||
GNU LESSER GENERAL PUBLIC LICENSE
|
||||
GNU LESSER GENERAL PUBLIC LICENSE
|
||||
Version 3, 29 June 2007
|
||||
|
||||
Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
|
||||
|
14
README.md
14
README.md
@@ -1,12 +1,14 @@
|
||||
<!-- {#mainpage} -->
|
||||
# FairMQ [](COPYRIGHT)
|
||||
# FairMQ [](COPYRIGHT) [](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master)
|
||||
|
||||
C++ Message Queuing Library and Framework
|
||||
|
||||
| Branch | Version | Docs | Status |
|
||||
| :---: | :--- | :--- | :--- |
|
||||
| `master` | [](https://github.com/FairRootGroup/FairMQ/releases/latest) | [API](https://fairrootgroup.github.io/FairMQ/latest), [Book](https://github.com/FairRootGroup/FairMQ/blob/master/README.md#documentation) | [](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master) |
|
||||
| `dev` | [](https://github.com/FairRootGroup/FairMQ/tags) | [Book](https://github.com/FairRootGroup/FairMQ/blob/dev/README.md#documentation) | [](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [](https://codecov.io/gh/FairRootGroup/FairMQ/branch/dev) |
|
||||
| Release | Version | Docs |
|
||||
| :---: | :--- | :--- |
|
||||
| `stable` | [](https://github.com/FairRootGroup/FairMQ/releases/latest) | [API](https://fairrootgroup.github.io/FairMQ/latest), [Book](https://github.com/FairRootGroup/FairMQ/blob/master/README.md#documentation) |
|
||||
| `testing` | [](https://github.com/FairRootGroup/FairMQ/tags) | [Book](https://github.com/FairRootGroup/FairMQ/blob/dev/README.md#documentation) |
|
||||
|
||||
Find all FairMQ releases [here](https://github.com/FairRootGroup/FairMQ/releases).
|
||||
|
||||
## Introduction
|
||||
|
||||
@@ -31,8 +33,6 @@ configuration and control services.
|
||||
FairMQ has been developed in the context of its mother project [FairRoot](https://github.com/FairRootGroup/FairRoot) -
|
||||
a simulation, reconstruction and analysis framework.
|
||||
|
||||
Find all FairMQ releases and development tags [here](https://github.com/FairRootGroup/FairMQ/releases).
|
||||
|
||||
## Dependencies
|
||||
|
||||
* PUBLIC: [**Boost**](https://www.boost.org/), [**FairLogger**](https://github.com/FairRootGroup/FairLogger)
|
||||
|
@@ -89,7 +89,7 @@ endif()
|
||||
|
||||
include(FindPackageHandleStandardArgs)
|
||||
find_package_handle_standard_args(ZeroMQ
|
||||
REQUIRED_VARS ZeroMQ_LIBRARY_SHARED ZeroMQ_INCLUDE_DIR ZeroMQ_LIBRARY_STATIC
|
||||
REQUIRED_VARS ZeroMQ_LIBRARY_SHARED ZeroMQ_INCLUDE_DIR
|
||||
VERSION_VAR ZeroMQ_VERSION
|
||||
)
|
||||
|
||||
|
@@ -1,46 +0,0 @@
|
||||
################################################################################
|
||||
# Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence (LGPL) version 3, #
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
unset(_args)
|
||||
|
||||
if(msgpack_FIND_VERSION)
|
||||
list(APPEND _args ${msgpack_FIND_VERSION})
|
||||
endif()
|
||||
|
||||
if(msgpack_FIND_EXACT)
|
||||
list(APPEND _args "EXACT")
|
||||
endif()
|
||||
|
||||
if(msgpack_FIND_QUIETLY)
|
||||
list(APPEND _args "QUIET")
|
||||
endif()
|
||||
|
||||
if(msgpack_FIND_REQUIRED)
|
||||
list(APPEND _args "REQUIRED")
|
||||
endif()
|
||||
|
||||
if(msgpack_FIND_COMPONENTS)
|
||||
list(APPEND _args "COMPONENTS" ${msgpack_FIND_COMPONENTS})
|
||||
endif()
|
||||
|
||||
find_package(msgpack ${_args} CONFIG)
|
||||
|
||||
if(msgpack_FOUND AND NOT TARGET msgpack::msgpack)
|
||||
# config mode find_package does not set $msgpack_ROOT, workaround by extracting
|
||||
# root path from library target
|
||||
unset(_msgpack_lib)
|
||||
unset(_prefix)
|
||||
get_target_property(_msgpack_lib msgpackc INTERFACE_LOCATION)
|
||||
get_filename_component(_prefix ${_msgpack_lib} DIRECTORY)
|
||||
get_filename_component(_prefix ${_prefix}/.. ABSOLUTE)
|
||||
|
||||
add_library(msgpack::msgpack INTERFACE IMPORTED)
|
||||
set_target_properties(msgpack::msgpack PROPERTIES
|
||||
INTERFACE_INCLUDE_DIRECTORIES "${_prefix}/include"
|
||||
)
|
||||
endif()
|
@@ -1,34 +0,0 @@
|
||||
################################################################################
|
||||
# Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence (LGPL) version 3, #
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
find_path(nanomsg_INCLUDE_DIR
|
||||
NAMES nanomsg/nn.h
|
||||
HINTS ${NANOMSG_ROOT} $ENV{NANOMSG_ROOT}
|
||||
PATH_SUFFIXES include
|
||||
DOC "Path to nanomsg include header files."
|
||||
)
|
||||
|
||||
find_library(nanomsg_LIBRARY_SHARED
|
||||
NAMES libnanomsg.dylib libnanomsg.so
|
||||
HINTS ${NANOMSG_ROOT} $ENV{NANOMSG_ROOT}
|
||||
PATH_SUFFIXES lib
|
||||
DOC "Path to libnanomsg.dylib libnanomsg.so."
|
||||
)
|
||||
|
||||
include(FindPackageHandleStandardArgs)
|
||||
find_package_handle_standard_args(nanomsg
|
||||
REQUIRED_VARS nanomsg_LIBRARY_SHARED nanomsg_INCLUDE_DIR
|
||||
)
|
||||
|
||||
if(NOT TARGET nanomsg AND nanomsg_FOUND)
|
||||
add_library(nanomsg SHARED IMPORTED)
|
||||
set_target_properties(nanomsg PROPERTIES
|
||||
IMPORTED_LOCATION ${nanomsg_LIBRARY_SHARED}
|
||||
INTERFACE_INCLUDE_DIRECTORIES ${nanomsg_INCLUDE_DIR}
|
||||
)
|
||||
endif()
|
@@ -51,7 +51,7 @@ function(add_testsuite suitename)
|
||||
cmake_parse_arguments(testsuite
|
||||
""
|
||||
"TIMEOUT;RUN_SERIAL"
|
||||
"SOURCES;LINKS;DEPENDS;INCLUDES"
|
||||
"SOURCES;LINKS;DEPENDS;INCLUDES;DEFINITIONS"
|
||||
${ARGN}
|
||||
)
|
||||
|
||||
@@ -69,6 +69,9 @@ function(add_testsuite suitename)
|
||||
if(testsuite_INCLUDES)
|
||||
target_include_directories(${target} PUBLIC ${testsuite_INCLUDES})
|
||||
endif()
|
||||
if(testsuite_DEFINITIONS)
|
||||
target_compile_definitions("${target}" PUBLIC ${testsuite_DEFINITIONS})
|
||||
endif()
|
||||
|
||||
add_test(NAME "${suitename}" WORKING_DIRECTORY ${CMAKE_BINARY_DIR} COMMAND ${target})
|
||||
if(testsuite_TIMEOUT)
|
||||
@@ -86,7 +89,7 @@ function(add_testhelper helpername)
|
||||
cmake_parse_arguments(testhelper
|
||||
""
|
||||
""
|
||||
"SOURCES;LINKS;DEPENDS;INCLUDES"
|
||||
"SOURCES;LINKS;DEPENDS;INCLUDES;DEFINITIONS"
|
||||
${ARGN}
|
||||
)
|
||||
|
||||
@@ -102,6 +105,9 @@ function(add_testhelper helpername)
|
||||
if(testhelper_INCLUDES)
|
||||
target_include_directories(${target} PUBLIC ${testhelper_INCLUDES})
|
||||
endif()
|
||||
if(testhelper_DEFINITIONS)
|
||||
target_compile_definitions(${target} PUBLIC ${testhelper_DEFINITIONS})
|
||||
endif()
|
||||
|
||||
list(APPEND ALL_TEST_TARGETS ${target})
|
||||
set(ALL_TEST_TARGETS ${ALL_TEST_TARGETS} PARENT_SCOPE)
|
||||
@@ -111,7 +117,7 @@ function(add_testlib libname)
|
||||
cmake_parse_arguments(testlib
|
||||
"HIDDEN"
|
||||
"VERSION"
|
||||
"SOURCES;LINKS;DEPENDS;INCLUDES"
|
||||
"SOURCES;LINKS;DEPENDS;INCLUDES;DEFINITIONS"
|
||||
${ARGN}
|
||||
)
|
||||
|
||||
@@ -133,6 +139,9 @@ function(add_testlib libname)
|
||||
if(testlib_VERSION)
|
||||
set_target_properties(${target} PROPERTIES VERSION ${testlib_VERSION})
|
||||
endif()
|
||||
if(testlib_DEFINITIONS)
|
||||
target_compile_definitions(${target} PUBLIC ${testlib_DEFINITIONS})
|
||||
endif()
|
||||
|
||||
list(APPEND ALL_TEST_TARGETS ${target})
|
||||
set(ALL_TEST_TARGETS ${ALL_TEST_TARGETS} PARENT_SCOPE)
|
||||
|
@@ -1,16 +1,10 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* Sampler.cpp
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <thread> // this_thread::sleep_for
|
||||
#include <chrono>
|
||||
@@ -64,8 +58,6 @@ bool Sampler::ConditionalRun()
|
||||
return false;
|
||||
}
|
||||
|
||||
this_thread::sleep_for(chrono::seconds(1));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@@ -4,6 +4,7 @@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
||||
|
||||
SAMPLER="fairmq-ex-1-1-sampler"
|
||||
SAMPLER+=" --id sampler1"
|
||||
SAMPLER+=" --rate 1"
|
||||
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://*:5555,rateLogging=0"
|
||||
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
|
||||
|
||||
|
@@ -13,6 +13,7 @@ trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SI
|
||||
|
||||
SAMPLER="fairmq-ex-1-1-sampler"
|
||||
SAMPLER+=" --id sampler1"
|
||||
SAMPLER+=" --rate 1"
|
||||
SAMPLER+=" --transport $transport"
|
||||
SAMPLER+=" --verbosity veryhigh"
|
||||
SAMPLER+=" --control static --color false"
|
||||
|
@@ -1,23 +1,17 @@
|
||||
DDS Example
|
||||
===========
|
||||
|
||||
This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual socket reconfiguration of the devices.
|
||||
|
||||
To make use of DDS functionality the example executables need to find the DDS plugin libraries that are compiled with FairRoot when FairRoot find DDS installed. Custom DDS location can be given to CMake as follows:
|
||||
|
||||
```bash
|
||||
cmake -DDDS_ROOT="/path/to/dds/install/dir/" ..
|
||||
```
|
||||
This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to the one in Example 1-n-1, but now it can be easily distributed on different computing nodes without the need for manual addresses reconfiguration of the devices.
|
||||
|
||||
The description below outlines the minimal steps needed to run the example with DDS. For general DDS help please refer to DDS documentation on [DDS Website](http://dds.gsi.de/).
|
||||
|
||||
##### 1. The device handles the socket addresses and ports configuration via DDS Plugin.
|
||||
##### 1. The device handles the channel addresses and ports configuration via DDS Plugin.
|
||||
|
||||
It is sufficient to provide the `-S "<@FAIRROOT_INSTALL_DIR@/lib" -P dds` (`<` prepends the following path to the default plugin search paths; put in the path which points to the library dir of your FairRoot installation) command line arguments to let the devices be configured dynamically. No code changes in the device are necessary. See the XML topology file for example of using the command line arguments.
|
||||
It is sufficient to provide the `-S "<@FAIRMQ_INSTALL_DIR@/lib" -P dds` (`<` prepends the following path to the default plugin search paths; put in the path which points to the library dir of your FairRoot installation) command line arguments to let the devices be configured dynamically. No code changes in the device are necessary. See the XML topology file for example of using the command line arguments.
|
||||
|
||||
##### 2a. Write DDS hosts file that contains a list of worker nodes to run the topology on (When deploying using the SSH plug-in).
|
||||
|
||||
We run this example on the local machine for simplicity. The file below defines three workers, sampler, processor and sink, with a total of 12 DDS agents (thus able to accept 12 tasks). The parameters for each worker node are:
|
||||
We run this example on the local machine for simplicity. The file below defines 3 workers - sampler, processor and sink - with a total of 12 DDS agents (thus able to accept 12 tasks). The parameters for each worker node are:
|
||||
- user-chosen worker ID (must be unique)
|
||||
- a host name with or without a login, in a form: login@host.fqdn (password-less SSH access to these hosts must be possible)
|
||||
- additional SSH params (can be empty)
|
||||
@@ -40,23 +34,27 @@ If you want to deploy on a single host DDS 1.6+ provides a localhost rms plug-in
|
||||
|
||||
##### 3. Write DDS topology file that describes which tasks (processes) to run and their topology and configuration.
|
||||
|
||||
Take a look at `ex-dds-topology.xml`. It consists of a definition part (properties, tasks, collections and more) and execution part (main). In our example Sampler, Processor and Sink tasks are defines, containing their executables and exchanged properties. The `<main>` of the topology uses the defined tasks. Besides one Sampler and one Sink task, a group containing Processor task is defined. The group has a multiplicity of 10, meaninig 10 Processors will be executed. Each of the Processors will receive the properties with Sampler and Sink addresses.
|
||||
Take a look at `ex-dds-topology.xml`. It consists of a definition part (properties, tasks, collections and more) and execution part (main). In our example Sampler, Processor and Sink tasks are defined, containing their executables and exchanged properties. The `<main>` of the topology uses the defined tasks. Besides one Sampler and one Sink task, a group containing Processor task is defined. The group has a multiplicity of 10, meaninig 10 Processors will be executed. Each of the Processors will receive the properties with Sampler and Sink addresses.
|
||||
|
||||
The configuration of the channel connection addresses is done by the DDS plugin via the channel names. The task property names must correspond to the channel names (data1, data2), with binding channels writing the properties and connecting channel reading the properties (see the example XML and JSON files).
|
||||
The configuration of the channel connection addresses is done by the DDS plugin via the channel names. The task property names must correspond to the channel names (data1, data2), with binding channels writing the properties and connecting channel reading the properties.
|
||||
|
||||
If `eth0` network interface (default for binding) is not available on your system, specify another one in the topology file for each task. For example: `--network-interface lo0`.
|
||||
|
||||
If you chose step 2b earlier, then modify the provided `ex-dds-topology.xml` in the top that the following lines read as following:
|
||||
**If you chose step 2b earlier**, then modify the provided `ex-dds-topology.xml` in the top that the following lines read as following:
|
||||
```xml
|
||||
<declrequirement id="SamplerWorker" type="wnname" value=".*"/>
|
||||
<declrequirement id="ProcessorWorker" type="wnname" value=".*"/>
|
||||
<declrequirement id="SinkWorker" type="wnname" value=".*"/>
|
||||
<declrequirement id="SamplerWorker" type="wnname" value=".*"/>
|
||||
<declrequirement id="ProcessorWorker" type="wnname" value=".*"/>
|
||||
<declrequirement id="SinkWorker" type="wnname" value=".*"/>
|
||||
```
|
||||
|
||||
Note that the attributes `value` contain a different value.
|
||||
|
||||
##### 4. Start DDS server.
|
||||
|
||||
First you need to initialize DDS environment:
|
||||
|
||||
```bash
|
||||
source DDS_env.sh # this script is located in the DDS installation directory
|
||||
```
|
||||
|
||||
The DDS server is started with:
|
||||
|
||||
```bash
|
||||
@@ -71,7 +69,7 @@ dds-submit --rms ssh --config ex-dds-hosts.cfg
|
||||
```
|
||||
The `--rms` option defines a destination resource management system. The `--config` specifies an SSH plug-in resource definition file.
|
||||
|
||||
If you chose step 2b earlier, run the following command instead:
|
||||
**If you chose step 2b earlier**, run the following command instead:
|
||||
|
||||
```bash
|
||||
dds-submit --rms localhost -n 12
|
||||
@@ -85,13 +83,30 @@ dds-topology --activate ex-dds-topology.xml
|
||||
|
||||
##### 7. Run
|
||||
|
||||
After activation, agents will execute the defined tasks on the worker nodes. Output of the tasks will be stored in the directory that was specified in the hosts file.
|
||||
After activation, agents will execute the defined tasks on the worker nodes. Output of the tasks will be stored in the directory that was specified in the hosts file (or in the system temporary directory when using the localhost plugin).
|
||||
|
||||
##### 8. (optional) Use example command UI to check state of the devices
|
||||
|
||||
A simple utility (fairmq-dds-command-ui) is included with FairRoot to send commands to devices and receive replies from them. The utility uses the DDS intercom library to send "check-state" string to all devices, to which they reply with their ID and state they are in. The utility also allows requesting state changes from devices. To let the device listen to the commands from the utility, start the device with `-S "<@FAIRROOT_INSTALL_DIR@/lib" -P dds` cmd option (see example XML topology).
|
||||
A simple utility (fairmq-dds-command-ui) is included with FairMQ to send commands to devices and receive replies from them. The utility uses the DDS intercom library to query state/config of devices and allows changing their state. To let the device listen to the commands from the utility, start the device with `-S "<@FAIRMQ_INSTALL_DIR@/lib" -P dds` cmd option (see example XML topology).
|
||||
|
||||
To see it in action, start the fairmq-dds-command-ui while the topology is running.
|
||||
To see it in action, start the fairmq-dds-command-ui while the topology is running. Run the utility with `-h` to see everything that it can do.
|
||||
|
||||
The utility requires a session parameter to connect to appropriate DDS session. The session value is given when starting dds-server.
|
||||
|
||||
By default the command UI sends commands to all tasks. This can be further refined by giving a specific topology path via `-p` argument.
|
||||
Given our topology file, here are some examples of valid paths:
|
||||
```bash
|
||||
# get state of all devices
|
||||
./fairmq/plugins/DDS/fairmq-dds-command-ui -s 937ffbca-b524-44d8-9898-1d69aedc3751 -c c
|
||||
# get state of sampler
|
||||
./fairmq/plugins/DDS/fairmq-dds-command-ui -s 937ffbca-b524-44d8-9898-1d69aedc3751 -c c -p main/Sampler
|
||||
# get state of sink
|
||||
./fairmq/plugins/DDS/fairmq-dds-command-ui -s 937ffbca-b524-44d8-9898-1d69aedc3751 -c c -p main/Sink
|
||||
# get state all processors
|
||||
./fairmq/plugins/DDS/fairmq-dds-command-ui -s 937ffbca-b524-44d8-9898-1d69aedc3751 -c c -p main/ProcessorGroup/Processor
|
||||
# get state of a specific processor
|
||||
./fairmq/plugins/DDS/fairmq-dds-command-ui -s 937ffbca-b524-44d8-9898-1d69aedc3751 -c c -p main/ProcessorGroup/Processor_9
|
||||
```
|
||||
|
||||
##### 9. Stop DDS server/topology.
|
||||
|
||||
|
@@ -61,6 +61,7 @@ set(FAIRMQ_PUBLIC_HEADER_FILES
|
||||
tools/CppSTL.h
|
||||
tools/Network.h
|
||||
tools/Process.h
|
||||
tools/RateLimit.h
|
||||
tools/Strings.h
|
||||
tools/Unique.h
|
||||
tools/Version.h
|
||||
@@ -222,7 +223,7 @@ target_include_directories(${_target}
|
||||
# link libraries #
|
||||
##################
|
||||
if(BUILD_NANOMSG_TRANSPORT)
|
||||
set(NANOMSG_DEPS nanomsg msgpack::msgpack)
|
||||
set(NANOMSG_DEPS nanomsg msgpackc-cxx)
|
||||
endif()
|
||||
if(BUILD_OFI_TRANSPORT)
|
||||
set(OFI_DEPS OFI::libfabric protobuf::libprotobuf $<TARGET_OBJECTS:OfiTransport>)
|
||||
|
@@ -8,9 +8,6 @@
|
||||
|
||||
#include <FairMQDevice.h>
|
||||
|
||||
#include <fairmq/Tools.h>
|
||||
#include <fairmq/Transports.h>
|
||||
|
||||
#include <boost/algorithm/string.hpp> // join/split
|
||||
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
@@ -31,53 +28,39 @@
|
||||
|
||||
using namespace std;
|
||||
|
||||
|
||||
FairMQDevice::FairMQDevice()
|
||||
: fTransportFactory(nullptr)
|
||||
, fTransports()
|
||||
, fChannels()
|
||||
, fConfig(nullptr)
|
||||
, fId()
|
||||
, fNumIoThreads(1)
|
||||
, fInitialValidationFinished(false)
|
||||
, fInitialValidationCondition()
|
||||
, fInitialValidationMutex()
|
||||
, fPortRangeMin(22000)
|
||||
, fPortRangeMax(32000)
|
||||
, fNetworkInterface()
|
||||
, fDefaultTransportType(fair::mq::Transport::DEFAULT)
|
||||
, fInitializationTimeoutInS(120)
|
||||
, fDataCallbacks(false)
|
||||
, fMsgInputs()
|
||||
, fMultipartInputs()
|
||||
, fMultitransportInputs()
|
||||
, fChannelRegistry()
|
||||
, fInputChannelKeys()
|
||||
, fMultitransportMutex()
|
||||
, fMultitransportProceed(false)
|
||||
, fExternalConfig(false)
|
||||
, fVersion({0, 0, 0})
|
||||
, fRate(0.)
|
||||
, fLastTime(0)
|
||||
, fRawCmdLineArgs()
|
||||
: FairMQDevice(nullptr, {0, 0, 0})
|
||||
{
|
||||
}
|
||||
|
||||
FairMQDevice::FairMQDevice(FairMQProgOptions& config)
|
||||
: FairMQDevice(&config, {0, 0, 0})
|
||||
{
|
||||
}
|
||||
|
||||
FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
|
||||
: FairMQDevice(nullptr, version)
|
||||
{
|
||||
}
|
||||
|
||||
FairMQDevice::FairMQDevice(FairMQProgOptions& config, const fair::mq::tools::Version version)
|
||||
: FairMQDevice(&config, version)
|
||||
{
|
||||
}
|
||||
|
||||
FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Version version)
|
||||
: fTransportFactory(nullptr)
|
||||
, fTransports()
|
||||
, fChannels()
|
||||
, fConfig(nullptr)
|
||||
, fInternalConfig(config ? nullptr : fair::mq::tools::make_unique<FairMQProgOptions>())
|
||||
, fConfig(config ? config : fInternalConfig.get())
|
||||
, fId()
|
||||
, fNumIoThreads(1)
|
||||
, fInitialValidationFinished(false)
|
||||
, fInitialValidationCondition()
|
||||
, fInitialValidationMutex()
|
||||
, fPortRangeMin(22000)
|
||||
, fPortRangeMax(32000)
|
||||
, fNetworkInterface()
|
||||
, fDefaultTransportType(fair::mq::Transport::DEFAULT)
|
||||
, fInitializationTimeoutInS(120)
|
||||
, fDataCallbacks(false)
|
||||
, fMsgInputs()
|
||||
, fMultipartInputs()
|
||||
@@ -86,26 +69,51 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
|
||||
, fInputChannelKeys()
|
||||
, fMultitransportMutex()
|
||||
, fMultitransportProceed(false)
|
||||
, fExternalConfig(false)
|
||||
, fVersion(version)
|
||||
, fRate(0.)
|
||||
, fLastTime(0)
|
||||
, fRawCmdLineArgs()
|
||||
{
|
||||
}
|
||||
|
||||
void FairMQDevice::InitWrapper()
|
||||
{
|
||||
if (!fTransportFactory)
|
||||
fId = fConfig->GetValue<string>("id");
|
||||
fRate = fConfig->GetValue<float>("rate");
|
||||
fPortRangeMin = fConfig->GetValue<int>("port-range-min");
|
||||
fPortRangeMax = fConfig->GetValue<int>("port-range-max");
|
||||
|
||||
try
|
||||
{
|
||||
LOG(error) << "Transport not initialized. Did you call SetTransport()?";
|
||||
exit(EXIT_FAILURE);
|
||||
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
|
||||
}
|
||||
catch (const exception& e)
|
||||
{
|
||||
LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
|
||||
}
|
||||
|
||||
for (auto& c : fConfig->GetFairMQMap())
|
||||
{
|
||||
if (fChannels.find(c.first) == fChannels.end())
|
||||
{
|
||||
LOG(debug) << "Inserting new device channel from config: " << c.first;
|
||||
fChannels.insert(c);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(debug) << "Updating existing device channel from config: " << c.first;
|
||||
fChannels[c.first] = c.second;
|
||||
}
|
||||
}
|
||||
|
||||
LOG(debug) << "Requesting '" << fair::mq::TransportNames.at(fDefaultTransportType) << "' as default transport for the device";
|
||||
fTransportFactory = AddTransport(fDefaultTransportType);
|
||||
|
||||
// Containers to store the uninitialized channels.
|
||||
vector<FairMQChannel*> uninitializedBindingChannels;
|
||||
vector<FairMQChannel*> uninitializedConnectingChannels;
|
||||
|
||||
string networkInterface = fConfig->GetValue<string>("network-interface");
|
||||
|
||||
// Fill the uninitialized channel containers
|
||||
for (auto& mi : fChannels)
|
||||
{
|
||||
@@ -126,11 +134,11 @@ void FairMQDevice::InitWrapper()
|
||||
if (vi->fAddress == "unspecified" || vi->fAddress == "")
|
||||
{
|
||||
// if the configured network interface is default, get its name from the default route
|
||||
if (fNetworkInterface == "default")
|
||||
if (networkInterface == "default")
|
||||
{
|
||||
fNetworkInterface = fair::mq::tools::getDefaultRouteNetworkInterface();
|
||||
networkInterface = fair::mq::tools::getDefaultRouteNetworkInterface();
|
||||
}
|
||||
vi->fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(fNetworkInterface) + ":1";
|
||||
vi->fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(networkInterface) + ":1";
|
||||
}
|
||||
// fill the uninitialized list
|
||||
uninitializedBindingChannels.push_back(&(*vi));
|
||||
@@ -175,10 +183,12 @@ void FairMQDevice::InitWrapper()
|
||||
fInitialValidationCondition.notify_one();
|
||||
}
|
||||
|
||||
int initializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
|
||||
|
||||
// go over the list of channels until all are initialized (and removed from the uninitialized list)
|
||||
int numAttempts = 1;
|
||||
auto sleepTimeInMS = 50;
|
||||
auto maxAttempts = fInitializationTimeoutInS * 1000 / sleepTimeInMS;
|
||||
auto maxAttempts = initializationTimeoutInS * 1000 / sleepTimeInMS;
|
||||
// first attempt
|
||||
AttachChannels(uninitializedConnectingChannels);
|
||||
// if not all channels could be connected, update their address values from config and retry
|
||||
@@ -201,9 +211,9 @@ void FairMQDevice::InitWrapper()
|
||||
|
||||
if (numAttempts++ > maxAttempts)
|
||||
{
|
||||
LOG(error) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts";
|
||||
LOG(error) << "could not connect all channels after " << initializationTimeoutInS << " attempts";
|
||||
ChangeState(ERROR_FOUND);
|
||||
// throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts"));
|
||||
// throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts"));
|
||||
}
|
||||
|
||||
AttachChannels(uninitializedConnectingChannels);
|
||||
@@ -252,19 +262,15 @@ void FairMQDevice::AttachChannels(vector<FairMQChannel*>& chans)
|
||||
|
||||
bool FairMQDevice::AttachChannel(FairMQChannel& ch)
|
||||
{
|
||||
if (!ch.fTransportFactory)
|
||||
if (ch.fTransportType == fair::mq::Transport::DEFAULT || ch.fTransportType == fTransportFactory->GetType())
|
||||
{
|
||||
if (ch.fTransportType == fair::mq::Transport::DEFAULT || ch.fTransportType == fTransportFactory->GetType())
|
||||
{
|
||||
LOG(debug) << ch.fName << ": using default transport";
|
||||
ch.InitTransport(fTransportFactory);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(debug) << ch.fName << ": channel transport (" << fair::mq::TransportNames.at(fDefaultTransportType) << ") overriden to " << fair::mq::TransportNames.at(ch.fTransportType);
|
||||
ch.InitTransport(AddTransport(ch.fTransportType));
|
||||
}
|
||||
ch.fTransportType = ch.fTransportFactory->GetType();
|
||||
LOG(debug) << ch.fName << ": using default transport";
|
||||
ch.InitTransport(fTransportFactory);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(debug) << ch.fName << ": channel transport (" << fair::mq::TransportNames.at(fDefaultTransportType) << ") overriden to " << fair::mq::TransportNames.at(ch.fTransportType);
|
||||
ch.InitTransport(AddTransport(ch.fTransportType));
|
||||
}
|
||||
|
||||
vector<string> endpoints;
|
||||
@@ -509,20 +515,14 @@ void FairMQDevice::RunWrapper()
|
||||
}
|
||||
else
|
||||
{
|
||||
using Clock = chrono::steady_clock;
|
||||
using TimeScale = chrono::microseconds;
|
||||
const TimeScale::rep period = TimeScale::period::den / fRate;
|
||||
const auto reftime = Clock::now();
|
||||
fair::mq::tools::RateLimiter rateLimiter(fRate);
|
||||
|
||||
while (CheckCurrentState(RUNNING) && ConditionalRun())
|
||||
{
|
||||
if (fRate > 0.001) {
|
||||
auto timespan = static_cast<TimeScale::rep>(chrono::duration_cast<TimeScale>(Clock::now() - reftime).count() - fLastTime);
|
||||
if (timespan < period) {
|
||||
TimeScale sleepfor(period - timespan);
|
||||
this_thread::sleep_for(sleepfor);
|
||||
if (fRate > 0.001)
|
||||
{
|
||||
rateLimiter.maybe_sleep();
|
||||
}
|
||||
fLastTime = chrono::duration_cast<TimeScale>(Clock::now() - reftime).count();
|
||||
}
|
||||
}
|
||||
|
||||
Run();
|
||||
@@ -798,78 +798,10 @@ shared_ptr<FairMQTransportFactory> FairMQDevice::AddTransport(const fair::mq::Tr
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQDevice::CreateOwnConfig()
|
||||
{
|
||||
// TODO: make fConfig a shared_ptr when no old user code has FairMQProgOptions ptr*
|
||||
fConfig = new FairMQProgOptions();
|
||||
|
||||
string id{boost::uuids::to_string(boost::uuids::random_generator()())};
|
||||
LOG(warn) << "No FairMQProgOptions provided, creating one internally and setting device ID to " << id;
|
||||
|
||||
// dummy argc+argv
|
||||
char arg0[] = "undefined"; // executable name
|
||||
char arg1[] = "--id";
|
||||
char* arg2 = const_cast<char*>(id.c_str()); // device ID
|
||||
const char* argv[] = { &arg0[0], &arg1[0], arg2, nullptr };
|
||||
int argc = static_cast<int>((sizeof(argv) / sizeof(argv[0])) - 1);
|
||||
|
||||
fConfig->ParseAll(argc, &argv[0]);
|
||||
|
||||
fId = fConfig->GetValue<string>("id");
|
||||
fNetworkInterface = fConfig->GetValue<string>("network-interface");
|
||||
fNumIoThreads = fConfig->GetValue<int>("io-threads");
|
||||
fInitializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
|
||||
fRate = fConfig->GetValue<float>("rate");
|
||||
try {
|
||||
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
|
||||
} catch(const exception& e) {
|
||||
LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQDevice::SetTransport(const string& transport)
|
||||
{
|
||||
// This method is the first to be called, if FairMQProgOptions are not used (either SetTransport() or SetConfig() make sense, not both).
|
||||
// Make sure here that at least internal config is available.
|
||||
if (!fExternalConfig && !fConfig)
|
||||
{
|
||||
CreateOwnConfig();
|
||||
}
|
||||
|
||||
if (fTransports.empty())
|
||||
{
|
||||
LOG(debug) << "Requesting '" << transport << "' as default transport for the device";
|
||||
fTransportFactory = AddTransport(fair::mq::TransportTypes.at(transport));
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(error) << "Transports container is not empty when setting transport. Setting default twice?";
|
||||
ChangeState(ERROR_FOUND);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQDevice::SetConfig(FairMQProgOptions& config)
|
||||
{
|
||||
fExternalConfig = true;
|
||||
fInternalConfig.reset();
|
||||
fConfig = &config;
|
||||
for (auto& c : fConfig->GetFairMQMap())
|
||||
{
|
||||
if (!fChannels.insert(c).second)
|
||||
{
|
||||
LOG(warn) << "did not insert channel '" << c.first << "', it is already in the device.";
|
||||
}
|
||||
}
|
||||
fId = fConfig->GetValue<string>("id");
|
||||
fNetworkInterface = fConfig->GetValue<string>("network-interface");
|
||||
fNumIoThreads = fConfig->GetValue<int>("io-threads");
|
||||
fInitializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
|
||||
fRate = fConfig->GetValue<float>("rate");
|
||||
try {
|
||||
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
|
||||
} catch(const exception& e) {
|
||||
LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
|
||||
}
|
||||
SetTransport(fConfig->GetValue<string>("transport"));
|
||||
}
|
||||
|
||||
void FairMQDevice::LogSocketRates()
|
||||
@@ -1020,7 +952,7 @@ void FairMQDevice::Reset()
|
||||
for (auto& vi : mi.second)
|
||||
{
|
||||
// vi.fReset = true;
|
||||
vi.fSocket.reset();
|
||||
vi.fSocket.reset(); // destroy FairMQSocket
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1032,10 +964,6 @@ const FairMQChannel& FairMQDevice::GetChannel(const string& channelName, const i
|
||||
|
||||
void FairMQDevice::Exit()
|
||||
{
|
||||
if (!fExternalConfig && fConfig)
|
||||
{
|
||||
delete fConfig;
|
||||
}
|
||||
}
|
||||
|
||||
FairMQDevice::~FairMQDevice()
|
||||
|
@@ -48,9 +48,19 @@ class FairMQDevice : public FairMQStateMachine
|
||||
public:
|
||||
/// Default constructor
|
||||
FairMQDevice();
|
||||
/// Constructor with external FairMQProgOptions
|
||||
FairMQDevice(FairMQProgOptions& config);
|
||||
|
||||
/// Constructor that sets the version
|
||||
FairMQDevice(const fair::mq::tools::Version version);
|
||||
|
||||
/// Constructor that sets the version and external FairMQProgOptions
|
||||
FairMQDevice(FairMQProgOptions& config, const fair::mq::tools::Version version);
|
||||
|
||||
private:
|
||||
FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Version version);
|
||||
|
||||
public:
|
||||
/// Copy constructor (disabled)
|
||||
FairMQDevice(const FairMQDevice&) = delete;
|
||||
/// Assignment operator (disabled)
|
||||
@@ -294,12 +304,11 @@ class FairMQDevice : public FairMQStateMachine
|
||||
/// Adds a transport to the device if it doesn't exist
|
||||
/// @param transport Transport string ("zeromq"/"nanomsg"/"shmem")
|
||||
std::shared_ptr<FairMQTransportFactory> AddTransport(const fair::mq::Transport transport);
|
||||
/// Sets the default transport for the device
|
||||
/// @param transport Transport string ("zeromq"/"nanomsg"/"shmem")
|
||||
void SetTransport(const std::string& transport = "zeromq");
|
||||
|
||||
/// Assigns config to the device
|
||||
void SetConfig(FairMQProgOptions& config);
|
||||
const FairMQProgOptions* GetConfig() const
|
||||
/// Get pointer to the config
|
||||
FairMQProgOptions* GetConfig() const
|
||||
{
|
||||
return fConfig;
|
||||
}
|
||||
@@ -395,23 +404,29 @@ class FairMQDevice : public FairMQStateMachine
|
||||
|
||||
const fair::mq::tools::Version GetVersion() const { return fVersion; }
|
||||
|
||||
void SetNumIoThreads(int numIoThreads) { fNumIoThreads = numIoThreads; }
|
||||
int GetNumIoThreads() const { return fNumIoThreads; }
|
||||
void SetNumIoThreads(int numIoThreads) { fConfig->SetValue<int>("io-threads", numIoThreads);}
|
||||
int GetNumIoThreads() const { return fConfig->GetValue<int>("io-threads"); }
|
||||
|
||||
void SetPortRangeMin(int portRangeMin) { fPortRangeMin = portRangeMin; }
|
||||
int GetPortRangeMin() const { return fPortRangeMin; }
|
||||
void SetPortRangeMin(int portRangeMin) { fConfig->SetValue<int>("port-range-min", portRangeMin); }
|
||||
int GetPortRangeMin() const { return fConfig->GetValue<int>("port-range-min"); }
|
||||
|
||||
void SetPortRangeMax(int portRangeMax) { fPortRangeMax = portRangeMax; }
|
||||
int GetPortRangeMax() const { return fPortRangeMax; }
|
||||
void SetPortRangeMax(int portRangeMax) { fConfig->SetValue<int>("port-range-max", portRangeMax); }
|
||||
int GetPortRangeMax() const { return fConfig->GetValue<int>("port-range-max"); }
|
||||
|
||||
void SetNetworkInterface(const std::string& networkInterface) { fNetworkInterface = networkInterface; }
|
||||
std::string GetNetworkInterface() const { return fNetworkInterface; }
|
||||
void SetNetworkInterface(const std::string& networkInterface) { fConfig->SetValue<std::string>("network-interface", networkInterface); }
|
||||
std::string GetNetworkInterface() const { return fConfig->GetValue<std::string>("network-interface"); }
|
||||
|
||||
void SetDefaultTransport(const std::string& name) { fDefaultTransportType = fair::mq::TransportTypes.at(name); }
|
||||
std::string GetDefaultTransport() const { return fair::mq::TransportNames.at(fDefaultTransportType); }
|
||||
void SetDefaultTransport(const std::string& name) { fConfig->SetValue<std::string>("transport", name); }
|
||||
std::string GetDefaultTransport() const { return fConfig->GetValue<std::string>("transport"); }
|
||||
|
||||
void SetInitializationTimeoutInS(int initializationTimeoutInS) { fInitializationTimeoutInS = initializationTimeoutInS; }
|
||||
int GetInitializationTimeoutInS() const { return fInitializationTimeoutInS; }
|
||||
void SetInitializationTimeoutInS(int initializationTimeoutInS) { fConfig->SetValue<int>("initialization-timeout", initializationTimeoutInS); }
|
||||
int GetInitializationTimeoutInS() const { return fConfig->GetValue<int>("initialization-timeout"); }
|
||||
|
||||
/// Sets the default transport for the device
|
||||
/// @param transport Transport string ("zeromq"/"nanomsg"/"shmem")
|
||||
void SetTransport(const std::string& transport) { fConfig->SetValue<std::string>("transport", transport); }
|
||||
/// Gets the default transport name
|
||||
std::string GetTransportName() const { return fConfig->GetValue<std::string>("transport"); }
|
||||
|
||||
void SetRawCmdLineArgs(const std::vector<std::string>& args) { fRawCmdLineArgs = args; }
|
||||
std::vector<std::string> GetRawCmdLineArgs() const { return fRawCmdLineArgs; }
|
||||
@@ -424,13 +439,17 @@ class FairMQDevice : public FairMQStateMachine
|
||||
|
||||
public:
|
||||
std::unordered_map<std::string, std::vector<FairMQChannel>> fChannels; ///< Device channels
|
||||
FairMQProgOptions* fConfig; ///< Program options configuration
|
||||
std::unique_ptr<FairMQProgOptions> fInternalConfig; ///< Internal program options configuration
|
||||
FairMQProgOptions* fConfig; ///< Pointer to config (internal or external)
|
||||
|
||||
void AddChannel(const std::string& channelName, const FairMQChannel& channel)
|
||||
{
|
||||
fConfig->AddChannel(channelName, channel);
|
||||
}
|
||||
|
||||
protected:
|
||||
std::string fId; ///< Device ID
|
||||
|
||||
int fNumIoThreads; ///< Number of ZeroMQ I/O threads
|
||||
|
||||
/// Additional user initialization (can be overloaded in child classes). Prefer to use InitTask().
|
||||
/// Executed in a worker thread
|
||||
virtual void Init();
|
||||
@@ -476,11 +495,8 @@ class FairMQDevice : public FairMQStateMachine
|
||||
int fPortRangeMin; ///< Minimum value for the port range (if dynamic)
|
||||
int fPortRangeMax; ///< Maximum value for the port range (if dynamic)
|
||||
|
||||
std::string fNetworkInterface; ///< Network interface to use for dynamic binding
|
||||
fair::mq::Transport fDefaultTransportType; ///< Default transport for the device
|
||||
|
||||
int fInitializationTimeoutInS; ///< Timeout for the initialization (in seconds)
|
||||
|
||||
/// Handles the initialization and the Init() method
|
||||
void InitWrapper();
|
||||
/// Handles the InitTask() method
|
||||
@@ -532,11 +548,8 @@ class FairMQDevice : public FairMQStateMachine
|
||||
std::mutex fMultitransportMutex;
|
||||
std::atomic<bool> fMultitransportProceed;
|
||||
|
||||
bool fExternalConfig;
|
||||
|
||||
const fair::mq::tools::Version fVersion;
|
||||
float fRate; ///< Rate limiting for ConditionalRun
|
||||
size_t fLastTime; ///< Rate limiting for ConditionalRun
|
||||
std::vector<std::string> fRawCmdLineArgs;
|
||||
};
|
||||
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -13,6 +13,7 @@
|
||||
#include <fairmq/tools/CppSTL.h>
|
||||
#include <fairmq/tools/Network.h>
|
||||
#include <fairmq/tools/Process.h>
|
||||
#include <fairmq/tools/RateLimit.h>
|
||||
#include <fairmq/tools/Strings.h>
|
||||
#include <fairmq/tools/Unique.h>
|
||||
#include <fairmq/tools/Version.h>
|
||||
|
@@ -1,36 +1,29 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQBenchmarkSampler.cpp
|
||||
*
|
||||
* @since 2013-04-23
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include "FairMQBenchmarkSampler.h"
|
||||
|
||||
#include <vector>
|
||||
#include <chrono>
|
||||
|
||||
#include <fairmq/Tools.h>
|
||||
#include "../FairMQLogger.h"
|
||||
#include "../options/FairMQProgOptions.h"
|
||||
|
||||
#include <vector>
|
||||
#include <chrono>
|
||||
|
||||
using namespace std;
|
||||
|
||||
FairMQBenchmarkSampler::FairMQBenchmarkSampler()
|
||||
: fSameMessage(true)
|
||||
, fMsgSize(10000)
|
||||
, fMsgCounter(0)
|
||||
, fMsgRate(1)
|
||||
, fMsgRate(0)
|
||||
, fNumIterations(0)
|
||||
, fMaxIterations(0)
|
||||
, fOutChannelName()
|
||||
, fResetMsgCounter()
|
||||
{
|
||||
}
|
||||
|
||||
@@ -42,16 +35,11 @@ void FairMQBenchmarkSampler::InitTask()
|
||||
{
|
||||
fSameMessage = fConfig->GetValue<bool>("same-msg");
|
||||
fMsgSize = fConfig->GetValue<int>("msg-size");
|
||||
fMsgRate = fConfig->GetValue<int>("msg-rate");
|
||||
fMsgRate = fConfig->GetValue<float>("msg-rate");
|
||||
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
|
||||
fOutChannelName = fConfig->GetValue<string>("out-channel");
|
||||
}
|
||||
|
||||
void FairMQBenchmarkSampler::PreRun()
|
||||
{
|
||||
fResetMsgCounter = std::thread(&FairMQBenchmarkSampler::ResetMsgCounter, this);
|
||||
}
|
||||
|
||||
void FairMQBenchmarkSampler::Run()
|
||||
{
|
||||
// store the channel reference to avoid traversing the map on every loop iteration
|
||||
@@ -62,6 +50,8 @@ void FairMQBenchmarkSampler::Run()
|
||||
LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations.";
|
||||
auto tStart = chrono::high_resolution_clock::now();
|
||||
|
||||
fair::mq::tools::RateLimiter rateLimiter(fMsgRate);
|
||||
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
if (fSameMessage)
|
||||
@@ -98,31 +88,14 @@ void FairMQBenchmarkSampler::Run()
|
||||
}
|
||||
}
|
||||
|
||||
--fMsgCounter;
|
||||
|
||||
while (fMsgCounter == 0)
|
||||
if (fMsgRate > 0)
|
||||
{
|
||||
this_thread::sleep_for(chrono::milliseconds(1));
|
||||
rateLimiter.maybe_sleep();
|
||||
}
|
||||
}
|
||||
|
||||
auto tEnd = chrono::high_resolution_clock::now();
|
||||
|
||||
LOG(info) << "Done " << fNumIterations << " iterations in " << chrono::duration<double, milli>(tEnd - tStart).count() << "ms.";
|
||||
|
||||
}
|
||||
|
||||
void FairMQBenchmarkSampler::PostRun()
|
||||
{
|
||||
fResetMsgCounter.join();
|
||||
}
|
||||
|
||||
void FairMQBenchmarkSampler::ResetMsgCounter()
|
||||
{
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
fMsgCounter = fMsgRate / 100;
|
||||
this_thread::sleep_for(chrono::milliseconds(10));
|
||||
}
|
||||
fMsgCounter = -1;
|
||||
}
|
||||
|
@@ -1,16 +1,10 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQBenchmarkSampler.h
|
||||
*
|
||||
* @since 2013-04-23
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQBENCHMARKSAMPLER_H_
|
||||
#define FAIRMQBENCHMARKSAMPLER_H_
|
||||
@@ -31,20 +25,14 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
||||
FairMQBenchmarkSampler();
|
||||
virtual ~FairMQBenchmarkSampler();
|
||||
|
||||
void PreRun() override;
|
||||
void PostRun() override;
|
||||
|
||||
void ResetMsgCounter();
|
||||
|
||||
protected:
|
||||
bool fSameMessage;
|
||||
int fMsgSize;
|
||||
std::atomic<int> fMsgCounter;
|
||||
int fMsgRate;
|
||||
float fMsgRate;
|
||||
uint64_t fNumIterations;
|
||||
uint64_t fMaxIterations;
|
||||
std::string fOutChannelName;
|
||||
std::thread fResetMsgCounter;
|
||||
|
||||
virtual void InitTask() override;
|
||||
virtual void Run() override;
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -14,6 +14,8 @@
|
||||
|
||||
#include "FairMQParser.h"
|
||||
#include "FairMQLogger.h"
|
||||
#include <fairmq/Tools.h>
|
||||
|
||||
#include <boost/property_tree/json_parser.hpp>
|
||||
#include <boost/property_tree/ptree.hpp>
|
||||
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -20,6 +20,9 @@
|
||||
#include "FairMQSuboptParser.h"
|
||||
|
||||
#include <boost/algorithm/string.hpp> // join/split
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
#include <boost/uuid/uuid_generators.hpp>
|
||||
#include <boost/uuid/uuid_io.hpp>
|
||||
|
||||
#include <algorithm>
|
||||
#include <iomanip>
|
||||
@@ -136,16 +139,16 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al
|
||||
fair::Logger::SetConsoleSeverity(severity);
|
||||
}
|
||||
|
||||
string id;
|
||||
string idForParser;
|
||||
|
||||
// check if config-key for config parser is provided
|
||||
if (fVarMap.count("config-key"))
|
||||
{
|
||||
id = fVarMap["config-key"].as<string>();
|
||||
idForParser = fVarMap["config-key"].as<string>();
|
||||
}
|
||||
else if (fVarMap.count("id"))
|
||||
{
|
||||
id = fVarMap["id"].as<string>();
|
||||
idForParser = fVarMap["id"].as<string>();
|
||||
}
|
||||
|
||||
// check if any config parser is selected
|
||||
@@ -154,12 +157,12 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al
|
||||
if (fVarMap.count("mq-config"))
|
||||
{
|
||||
LOG(debug) << "mq-config: Using default JSON parser";
|
||||
UpdateChannelMap(parser::JSON().UserParser(fVarMap.at("mq-config").as<string>(), id));
|
||||
UpdateChannelMap(parser::JSON().UserParser(fVarMap.at("mq-config").as<string>(), idForParser));
|
||||
}
|
||||
else if (fVarMap.count("channel-config"))
|
||||
{
|
||||
LOG(debug) << "channel-config: Parsing channel configuration";
|
||||
UpdateChannelMap(parser::SUBOPT().UserParser(fVarMap.at("channel-config").as<vector<string>>(), id));
|
||||
UpdateChannelMap(parser::SUBOPT().UserParser(fVarMap.at("channel-config").as<vector<string>>(), idForParser));
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -184,6 +187,8 @@ int FairMQProgOptions::ParseAll(const int argc, char const* const* argv, bool al
|
||||
|
||||
void FairMQProgOptions::ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered)
|
||||
{
|
||||
fVarMap.clear();
|
||||
|
||||
// get options from cmd line and store in variable map
|
||||
// here we use command_line_parser instead of parse_command_line, to allow unregistered and positional options
|
||||
if (allowUnregistered)
|
||||
@@ -205,8 +210,7 @@ void FairMQProgOptions::ParseCmdLine(const int argc, char const* const* argv, bo
|
||||
|
||||
void FairMQProgOptions::ParseDefaults()
|
||||
{
|
||||
vector<string> emptyArgs;
|
||||
emptyArgs.push_back("dummy");
|
||||
vector<string> emptyArgs = {"dummy", "--id", boost::uuids::to_string(boost::uuids::random_generator()())};
|
||||
|
||||
vector<const char*> argv(emptyArgs.size());
|
||||
|
||||
@@ -423,7 +427,7 @@ int FairMQProgOptions::PrintOptions()
|
||||
{
|
||||
ss << setfill(' ') << left
|
||||
<< setw(maxLenKey) << p.first << " = "
|
||||
<< setw(maxLenValue) << p.second.value
|
||||
<< setw(maxLenValue) << p.second.value << " "
|
||||
<< setw(maxLenType) << p.second.type
|
||||
<< setw(maxLenDefault) << p.second.defaulted
|
||||
<< "\n";
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -162,6 +162,11 @@ class FairMQProgOptions
|
||||
int PrintOptions();
|
||||
int PrintOptionsRaw();
|
||||
|
||||
void AddChannel(const std::string& channelName, const FairMQChannel& channel)
|
||||
{
|
||||
fFairMQChannelMap[channelName].push_back(channel);
|
||||
}
|
||||
|
||||
private:
|
||||
struct ChannelKey
|
||||
{
|
||||
|
@@ -229,7 +229,7 @@ auto Control::WaitForNextState() -> DeviceState
|
||||
unique_lock<mutex> lock{fEventsMutex};
|
||||
while (fEvents.empty())
|
||||
{
|
||||
fNewEvent.wait(lock);
|
||||
fNewEvent.wait_for(lock, chrono::milliseconds(50));
|
||||
}
|
||||
|
||||
auto result = fEvents.front();
|
||||
|
@@ -12,7 +12,6 @@
|
||||
|
||||
#include <iostream>
|
||||
#include <exception>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include <unistd.h>
|
||||
@@ -34,24 +33,27 @@ int main(int argc, char* argv[])
|
||||
{
|
||||
try {
|
||||
string sessionID;
|
||||
char commandChar;
|
||||
char command;
|
||||
string topologyPath;
|
||||
|
||||
bpo::options_description options("fairmq-dds-command-ui options");
|
||||
options.add_options()
|
||||
("session,s", bpo::value<string>(&sessionID)->required(), "DDS Session ID")
|
||||
("command,c", bpo::value<char> (&commandChar)->default_value(' '), "Command character")
|
||||
("session,s", bpo::value<string> (&sessionID)->required(), "DDS Session ID")
|
||||
("command,c", bpo::value<char> (&command)->default_value(' '), "Command character")
|
||||
("path,p", bpo::value<string> (&topologyPath)->default_value(""), "DDS Topology path to send command to")
|
||||
("help,h", "Produce help message");
|
||||
|
||||
bpo::variables_map vm;
|
||||
bpo::store(bpo::command_line_parser(argc, argv).options(options).run(), vm);
|
||||
bpo::notify(vm);
|
||||
|
||||
if (vm.count("help")) {
|
||||
cout << "FairMQ DDS Command UI" << endl << options << endl;
|
||||
cout << "possible command characters: [c] check states, [o] dump config, [h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device" << endl;
|
||||
cout << "Commands: [c] check state, [o] dump config, [h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device" << endl;
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
bpo::notify(vm);
|
||||
|
||||
CIntercomService service;
|
||||
CCustomCmd ddsCustomCmd(service);
|
||||
|
||||
@@ -61,7 +63,7 @@ int main(int argc, char* argv[])
|
||||
|
||||
// subscribe to receive messages from DDS
|
||||
ddsCustomCmd.subscribe([](const string& msg, const string& /*condition*/, uint64_t /*senderId*/) {
|
||||
cout << "Received: " << msg << endl;
|
||||
cout << "Received: " << endl << msg << endl;
|
||||
});
|
||||
|
||||
service.start(sessionID);
|
||||
@@ -74,8 +76,8 @@ int main(int argc, char* argv[])
|
||||
t.c_lflag &= ~ICANON; // disable canonical input
|
||||
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
|
||||
|
||||
if (commandChar != ' ') {
|
||||
cin.putback(commandChar);
|
||||
if (command != ' ') {
|
||||
cin.putback(command);
|
||||
} else {
|
||||
PrintControlsHelp();
|
||||
}
|
||||
@@ -84,39 +86,39 @@ int main(int argc, char* argv[])
|
||||
switch (c) {
|
||||
case 'c':
|
||||
cout << " > checking state of the devices" << endl;
|
||||
ddsCustomCmd.send("check-state", "");
|
||||
ddsCustomCmd.send("check-state", topologyPath);
|
||||
break;
|
||||
case 'o':
|
||||
cout << " > dumping config of the devices" << endl;
|
||||
ddsCustomCmd.send("dump-config", "");
|
||||
ddsCustomCmd.send("dump-config", topologyPath);
|
||||
break;
|
||||
case 'i':
|
||||
cout << " > init devices" << endl;
|
||||
ddsCustomCmd.send("INIT DEVICE", "");
|
||||
ddsCustomCmd.send("INIT DEVICE", topologyPath);
|
||||
break;
|
||||
case 'j':
|
||||
cout << " > init tasks" << endl;
|
||||
ddsCustomCmd.send("INIT TASK", "");
|
||||
ddsCustomCmd.send("INIT TASK", topologyPath);
|
||||
break;
|
||||
case 'p':
|
||||
cout << " > pause devices" << endl;
|
||||
ddsCustomCmd.send("PAUSE", "");
|
||||
ddsCustomCmd.send("PAUSE", topologyPath);
|
||||
break;
|
||||
case 'r':
|
||||
cout << " > run tasks" << endl;
|
||||
ddsCustomCmd.send("RUN", "");
|
||||
ddsCustomCmd.send("RUN", topologyPath);
|
||||
break;
|
||||
case 's':
|
||||
cout << " > stop devices" << endl;
|
||||
ddsCustomCmd.send("STOP", "");
|
||||
ddsCustomCmd.send("STOP", topologyPath);
|
||||
break;
|
||||
case 't':
|
||||
cout << " > reset tasks" << endl;
|
||||
ddsCustomCmd.send("RESET TASK", "");
|
||||
ddsCustomCmd.send("RESET TASK", topologyPath);
|
||||
break;
|
||||
case 'd':
|
||||
cout << " > reset devices" << endl;
|
||||
ddsCustomCmd.send("RESET DEVICE", "");
|
||||
ddsCustomCmd.send("RESET DEVICE", topologyPath);
|
||||
break;
|
||||
case 'h':
|
||||
cout << " > help" << endl;
|
||||
@@ -124,7 +126,7 @@ int main(int argc, char* argv[])
|
||||
break;
|
||||
case 'q':
|
||||
cout << " > end" << endl;
|
||||
ddsCustomCmd.send("END", "");
|
||||
ddsCustomCmd.send("END", topologyPath);
|
||||
break;
|
||||
default:
|
||||
cout << "Invalid input: [" << c << "]" << endl;
|
||||
@@ -132,8 +134,8 @@ int main(int argc, char* argv[])
|
||||
break;
|
||||
}
|
||||
|
||||
if (commandChar != ' ') {
|
||||
usleep(50000);
|
||||
if (command != ' ') {
|
||||
this_thread::sleep_for(chrono::milliseconds(100)); // give dds a chance to complete request
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -18,7 +18,7 @@ void addCustomOptions(bpo::options_description& options)
|
||||
("same-msg", bpo::value<bool>()->default_value(false), "Re-send the same message, or recreate for each iteration")
|
||||
("msg-size", bpo::value<int>()->default_value(1000000), "Message size in bytes")
|
||||
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
|
||||
("msg-rate", bpo::value<int>()->default_value(0), "Msg rate limit in maximum number of messages per second");
|
||||
("msg-rate", bpo::value<float>()->default_value(0), "Msg rate limit in maximum number of messages per second");
|
||||
}
|
||||
|
||||
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||
|
@@ -246,157 +246,180 @@ int FairMQSocketSHM::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const i
|
||||
}
|
||||
}
|
||||
|
||||
int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int /*timeout*/)
|
||||
int64_t FairMQSocketSHM::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
|
||||
{
|
||||
const unsigned int vecSize = msgVec.size();
|
||||
int64_t totalSize = 0;
|
||||
int elapsed = 0;
|
||||
|
||||
if (vecSize == 1) {
|
||||
return Send(msgVec.back(), flags);
|
||||
return SendImpl(msgVec.back(), flags, timeout);
|
||||
}
|
||||
|
||||
// put it into zmq message
|
||||
zmq_msg_t lZmqMsg;
|
||||
zmq_msg_init_size(&lZmqMsg, vecSize * sizeof(MetaHeader));
|
||||
zmq_msg_t zmqMsg;
|
||||
zmq_msg_init_size(&zmqMsg, vecSize * sizeof(MetaHeader));
|
||||
|
||||
// prepare the message with shm metas
|
||||
MetaHeader *lMetas = static_cast<MetaHeader*>(zmq_msg_data(&lZmqMsg));
|
||||
MetaHeader* metas = static_cast<MetaHeader*>(zmq_msg_data(&zmqMsg));
|
||||
|
||||
for (auto &lMsg : msgVec)
|
||||
for (auto &msg : msgVec)
|
||||
{
|
||||
zmq_msg_t *lMetaMsg = static_cast<FairMQMessageSHM*>(lMsg.get())->GetMessage();
|
||||
memcpy(lMetas++, zmq_msg_data(lMetaMsg), sizeof(MetaHeader));
|
||||
zmq_msg_t* metaMsg = static_cast<FairMQMessageSHM*>(msg.get())->GetMessage();
|
||||
memcpy(metas++, zmq_msg_data(metaMsg), sizeof(MetaHeader));
|
||||
}
|
||||
|
||||
while (!fInterrupted)
|
||||
{
|
||||
int nbytes = -1;
|
||||
nbytes = zmq_msg_send(&lZmqMsg, fSocket, flags);
|
||||
nbytes = zmq_msg_send(&zmqMsg, fSocket, flags);
|
||||
|
||||
if (nbytes == 0)
|
||||
{
|
||||
zmq_msg_close (&lZmqMsg);
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return nbytes;
|
||||
}
|
||||
else if (nbytes > 0)
|
||||
{
|
||||
assert(nbytes == (vecSize * sizeof(MetaHeader))); // all or nothing
|
||||
|
||||
for (auto &lMsg : msgVec)
|
||||
for (auto &msg : msgVec)
|
||||
{
|
||||
FairMQMessageSHM *lShmMsg = static_cast<FairMQMessageSHM*>(lMsg.get());
|
||||
lShmMsg->fQueued = true;
|
||||
totalSize += lShmMsg->fSize;
|
||||
FairMQMessageSHM* shmMsg = static_cast<FairMQMessageSHM*>(msg.get());
|
||||
shmMsg->fQueued = true;
|
||||
totalSize += shmMsg->fSize;
|
||||
}
|
||||
|
||||
// store statistics on how many messages have been sent
|
||||
fMessagesTx++;
|
||||
fBytesTx += totalSize;
|
||||
|
||||
zmq_msg_close (&lZmqMsg);
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return totalSize;
|
||||
}
|
||||
else if (zmq_errno() == EAGAIN)
|
||||
{
|
||||
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
|
||||
{
|
||||
if (timeout)
|
||||
{
|
||||
elapsed += fSndTimeout;
|
||||
if (elapsed >= timeout)
|
||||
{
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
zmq_msg_close (&lZmqMsg);
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
else if (zmq_errno() == ETERM)
|
||||
{
|
||||
zmq_msg_close (&lZmqMsg);
|
||||
zmq_msg_close(&zmqMsg);
|
||||
LOG(info) << "terminating socket " << fId;
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
zmq_msg_close (&lZmqMsg);
|
||||
zmq_msg_close(&zmqMsg);
|
||||
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
return nbytes;
|
||||
}
|
||||
}
|
||||
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
int64_t FairMQSocketSHM::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int /*timeout*/)
|
||||
int64_t FairMQSocketSHM::ReceiveImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout)
|
||||
{
|
||||
int64_t totalSize = 0;
|
||||
int elapsed = 0;
|
||||
|
||||
zmq_msg_t zmqMsg;
|
||||
zmq_msg_init(&zmqMsg);
|
||||
|
||||
while (!fInterrupted)
|
||||
{
|
||||
zmq_msg_t lRcvMsg;
|
||||
zmq_msg_init(&lRcvMsg);
|
||||
int nbytes = zmq_msg_recv(&lRcvMsg, fSocket, flags);
|
||||
int nbytes = zmq_msg_recv(&zmqMsg, fSocket, flags);
|
||||
if (nbytes == 0)
|
||||
{
|
||||
zmq_msg_close (&lRcvMsg);
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return 0;
|
||||
}
|
||||
else if (nbytes > 0)
|
||||
{
|
||||
MetaHeader* lHdrVec = static_cast<MetaHeader*>(zmq_msg_data(&lRcvMsg));
|
||||
const auto lHdrVecSize = zmq_msg_size(&lRcvMsg);
|
||||
assert(lHdrVecSize > 0);
|
||||
assert(lHdrVecSize % sizeof(MetaHeader) == 0);
|
||||
MetaHeader* hdrVec = static_cast<MetaHeader*>(zmq_msg_data(&zmqMsg));
|
||||
const auto hdrVecSize = zmq_msg_size(&zmqMsg);
|
||||
assert(hdrVecSize > 0);
|
||||
assert(hdrVecSize % sizeof(MetaHeader) == 0);
|
||||
|
||||
const auto lNumMessages = lHdrVecSize / sizeof (MetaHeader);
|
||||
const auto numMessages = hdrVecSize / sizeof(MetaHeader);
|
||||
|
||||
msgVec.reserve(lNumMessages);
|
||||
msgVec.reserve(numMessages);
|
||||
|
||||
for (size_t m = 0; m < lNumMessages; m++)
|
||||
for (size_t m = 0; m < numMessages; m++)
|
||||
{
|
||||
MetaHeader lMetaHeader;
|
||||
memcpy(&lMetaHeader, &lHdrVec[m], sizeof(MetaHeader));
|
||||
MetaHeader metaHeader;
|
||||
memcpy(&metaHeader, &hdrVec[m], sizeof(MetaHeader));
|
||||
|
||||
msgVec.emplace_back(fair::mq::tools::make_unique<FairMQMessageSHM>(fManager));
|
||||
|
||||
FairMQMessageSHM *lMsg = static_cast<FairMQMessageSHM*>(msgVec.back().get());
|
||||
MetaHeader *lMsgHdr = static_cast<MetaHeader*>(zmq_msg_data(lMsg->GetMessage()));
|
||||
FairMQMessageSHM* msg = static_cast<FairMQMessageSHM*>(msgVec.back().get());
|
||||
MetaHeader* msgHdr = static_cast<MetaHeader*>(zmq_msg_data(msg->GetMessage()));
|
||||
|
||||
memcpy(lMsgHdr, &lMetaHeader, sizeof(MetaHeader));
|
||||
memcpy(msgHdr, &metaHeader, sizeof(MetaHeader));
|
||||
|
||||
lMsg->fHandle = lMetaHeader.fHandle;
|
||||
lMsg->fSize = lMetaHeader.fSize;
|
||||
lMsg->fRegionId = lMetaHeader.fRegionId;
|
||||
lMsg->fHint = lMetaHeader.fHint;
|
||||
msg->fHandle = metaHeader.fHandle;
|
||||
msg->fSize = metaHeader.fSize;
|
||||
msg->fRegionId = metaHeader.fRegionId;
|
||||
msg->fHint = metaHeader.fHint;
|
||||
|
||||
totalSize += lMsg->GetSize();
|
||||
totalSize += msg->GetSize();
|
||||
}
|
||||
|
||||
// store statistics on how many messages have been received (handle all parts as a single message)
|
||||
fMessagesRx++;
|
||||
fBytesRx += totalSize;
|
||||
|
||||
zmq_msg_close (&lRcvMsg);
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return totalSize;
|
||||
}
|
||||
else if (zmq_errno() == EAGAIN)
|
||||
{
|
||||
zmq_msg_close(&lRcvMsg);
|
||||
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
|
||||
{
|
||||
if (timeout)
|
||||
{
|
||||
elapsed += fRcvTimeout;
|
||||
if (elapsed >= timeout)
|
||||
{
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
zmq_msg_close (&lRcvMsg);
|
||||
zmq_msg_close(&zmqMsg);
|
||||
LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
return nbytes;
|
||||
}
|
||||
}
|
||||
|
||||
zmq_msg_close(&zmqMsg);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
136
fairmq/tools/RateLimit.h
Normal file
136
fairmq/tools/RateLimit.h
Normal file
@@ -0,0 +1,136 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIR_MQ_TOOLS_RATELIMIT_H
|
||||
#define FAIR_MQ_TOOLS_RATELIMIT_H
|
||||
|
||||
#include <cassert>
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
#include <iomanip>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace tools
|
||||
{
|
||||
|
||||
/**
|
||||
* Objects of type RateLimiter can be used to limit a loop to a given rate of iterations per second.
|
||||
*
|
||||
* Example:
|
||||
* \code
|
||||
* RateLimiter limit(100); // 100 Hz
|
||||
* while (do_more_work()) {
|
||||
* work();
|
||||
* limit.maybe_sleep(); // this needs to be at the end of the loop for a
|
||||
* // correct time measurement of the first iterations
|
||||
* }
|
||||
* \endcode
|
||||
*/
|
||||
class RateLimiter
|
||||
{
|
||||
using clock = std::chrono::steady_clock;
|
||||
|
||||
public:
|
||||
/**
|
||||
* Constructs a rate limiter.
|
||||
*
|
||||
* \param rate Work rate in Hz (calls to maybe_sleep per second). Values less than/equal
|
||||
* to 0 set the rate to 1 GHz (which is impossible to achieve, even with a
|
||||
* loop that only calls RateLimiter::maybe_sleep).
|
||||
*/
|
||||
RateLimiter(float rate) : tw_req(std::chrono::seconds(1)), start_time(clock::now())
|
||||
{
|
||||
if (rate <= 0) {
|
||||
tw_req = std::chrono::nanoseconds(1);
|
||||
} else {
|
||||
tw_req = std::chrono::duration_cast<clock::duration>(tw_req / rate);
|
||||
}
|
||||
skip_check_count = std::max(1, int(std::chrono::milliseconds(5) / tw_req));
|
||||
count = skip_check_count;
|
||||
//std::cerr << "skip_check_count: " << skip_check_count << '\n';
|
||||
}
|
||||
|
||||
/**
|
||||
* Call this function at the end of the iteration rate limited loop.
|
||||
*
|
||||
* This function might use `std::this_thread::sleep_for` to limit the iteration rate. If no sleeps
|
||||
* are necessary, the function will back off checking for the time to further allow increased
|
||||
* iteration rates (until the requested rate or 1s between rechecks is reached).
|
||||
*/
|
||||
void maybe_sleep()
|
||||
{
|
||||
using namespace std::chrono;
|
||||
if (--count == 0) {
|
||||
auto now = clock::now();
|
||||
if (tw == clock::duration::zero()) {
|
||||
tw = (now - start_time) / skip_check_count;
|
||||
} else {
|
||||
tw = (1 * tw + 3 * (now - start_time) / skip_check_count) / 4;
|
||||
}
|
||||
//std::ostringstream s; s << "tw = " << std::setw(10) << duration_cast<nanoseconds>(tw).count() << "ns, req = " << duration_cast<nanoseconds>(tw_req).count() << "ns, ";
|
||||
if (tw > tw_req * 65 / 64) {
|
||||
// the time between maybe_sleep calls is more than 1% too long
|
||||
// fix it by reducing ts towards 0 and if ts = 0 doesn't suffice, increase
|
||||
// skip_check_count
|
||||
if (ts > clock::duration::zero()) {
|
||||
ts = std::max(clock::duration::zero(),
|
||||
ts - (tw - tw_req) * skip_check_count * 1 / 2);
|
||||
//std::cerr << s.str() << "maybe_sleep: going too slow; sleep less: " << duration_cast<microseconds>(ts).count() << "µs\n";
|
||||
} else {
|
||||
skip_check_count =
|
||||
std::min(int(seconds(1) / tw_req), // recheck at least every second
|
||||
(skip_check_count * 5 + 3) / 4);
|
||||
//std::cerr << s.str() << "maybe_sleep: going too slow; work more: " << skip_check_count << "\n";
|
||||
}
|
||||
} else if (tw < tw_req * 63 / 64) {
|
||||
// the time between maybe_sleep calls is more than 1% too short
|
||||
// fix it by reducing skip_check_count towards 1 and if skip_check_count = 1
|
||||
// doesn't suffice, increase ts
|
||||
|
||||
// The minimum work count is defined such that a typical sleep time is greater
|
||||
// than 1ms.
|
||||
// The user requested 1/tw_req work iterations per second. Divided by 1000, that's
|
||||
// the count per ms.
|
||||
const int min_skip_count = std::max(1, int(milliseconds(5) / tw_req));
|
||||
if (skip_check_count > min_skip_count) {
|
||||
assert(ts == clock::duration::zero());
|
||||
skip_check_count = std::max(min_skip_count, skip_check_count * 3 / 4);
|
||||
//std::cerr << s.str() << "maybe_sleep: going too fast; work less: " << skip_check_count << "\n";
|
||||
} else {
|
||||
ts += (tw_req - tw) * (skip_check_count * 7) / 8;
|
||||
//std::cerr << s.str() << "maybe_sleep: going too fast; sleep more: " << duration_cast<microseconds>(ts).count() << "µs\n";
|
||||
}
|
||||
}
|
||||
|
||||
start_time = now;
|
||||
count = skip_check_count;
|
||||
if (ts > clock::duration::zero()) {
|
||||
std::this_thread::sleep_for(ts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
clock::duration tw{}, //! deduced duration between maybe_sleep calls
|
||||
ts{}, //! sleep duration
|
||||
tw_req; //! requested duration between maybe_sleep calls
|
||||
clock::time_point start_time;
|
||||
int count = 1;
|
||||
int skip_check_count = 1;
|
||||
};
|
||||
|
||||
} /* namespace tools */
|
||||
} /* namespace mq */
|
||||
} /* namespace fair */
|
||||
|
||||
#endif // FAIR_MQ_TOOLS_RATELIMIT_H
|
@@ -283,7 +283,7 @@ int64_t FairMQSocketZMQ::SendImpl(vector<FairMQMessagePtr>& msgVec, const int fl
|
||||
} // If there's only one part, send it as a regular message
|
||||
else if (vecSize == 1)
|
||||
{
|
||||
return Send(msgVec.back(), flags);
|
||||
return SendImpl(msgVec.back(), flags, timeout);
|
||||
}
|
||||
else // if the vector is empty, something might be wrong
|
||||
{
|
||||
|
@@ -1,5 +1,5 @@
|
||||
################################################################################
|
||||
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence (LGPL) version 3, #
|
||||
@@ -30,6 +30,9 @@ add_testhelper(runTestDevice
|
||||
LINKS FairMQ
|
||||
)
|
||||
|
||||
if(BUILD_NANOMSG_TRANSPORT)
|
||||
set(definitions DEFINITIONS BUILD_NANOMSG_TRANSPORT)
|
||||
endif()
|
||||
|
||||
set(MQ_CONFIG "${CMAKE_BINARY_DIR}/test/testsuite_FairMQ.IOPatterns_config.json")
|
||||
set(RUN_TEST_DEVICE "${CMAKE_BINARY_DIR}/test/testhelper_runTestDevice")
|
||||
@@ -55,6 +58,7 @@ add_testsuite(FairMQ.Protocols
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
TIMEOUT 30
|
||||
RUN_SERIAL ON
|
||||
${definitions}
|
||||
)
|
||||
|
||||
add_testsuite(FairMQ.Parts
|
||||
@@ -77,16 +81,17 @@ add_testsuite(FairMQ.MessageResize
|
||||
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}/message_resize
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
TIMEOUT 5
|
||||
${definitions}
|
||||
)
|
||||
|
||||
add_testsuite(FairMQ.Device
|
||||
SOURCES
|
||||
device/TestSender.h
|
||||
device/TestReceiver.h
|
||||
device/TestVersion.h
|
||||
device/runner.cxx
|
||||
device/_multiple_devices.cxx
|
||||
device/_device_version.cxx
|
||||
device/_device_config.cxx
|
||||
|
||||
LINKS FairMQ
|
||||
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}/device
|
||||
|
@@ -1,30 +0,0 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <FairMQDevice.h>
|
||||
#include <FairMQLogger.h>
|
||||
#include <options/FairMQProgOptions.h>
|
||||
|
||||
namespace fair
|
||||
{
|
||||
namespace mq
|
||||
{
|
||||
namespace test
|
||||
{
|
||||
|
||||
class TestVersion : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
TestVersion(fair::mq::tools::Version version)
|
||||
: FairMQDevice(version)
|
||||
{}
|
||||
};
|
||||
|
||||
} // namespace test
|
||||
} // namespace mq
|
||||
} // namespace fair
|
122
test/device/_device_config.cxx
Normal file
122
test/device/_device_config.cxx
Normal file
@@ -0,0 +1,122 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <FairMQDevice.h>
|
||||
#include <options/FairMQProgOptions.h>
|
||||
|
||||
#include <fairmq/Tools.h>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <sstream> // std::stringstream
|
||||
#include <thread>
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
using namespace std;
|
||||
|
||||
void control(FairMQDevice& device)
|
||||
{
|
||||
device.ChangeState("INIT_DEVICE");
|
||||
device.WaitForEndOfState("INIT_DEVICE");
|
||||
device.ChangeState("INIT_TASK");
|
||||
device.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
device.ChangeState("RUN");
|
||||
device.WaitForEndOfState("RUN");
|
||||
|
||||
device.ChangeState("RESET_TASK");
|
||||
device.WaitForEndOfState("RESET_TASK");
|
||||
device.ChangeState("RESET_DEVICE");
|
||||
device.WaitForEndOfState("RESET_DEVICE");
|
||||
|
||||
device.ChangeState("END");
|
||||
}
|
||||
|
||||
class DeviceConfig : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
DeviceConfig()
|
||||
{}
|
||||
|
||||
string TestDeviceSetConfig(const string& transport)
|
||||
{
|
||||
FairMQProgOptions config;
|
||||
|
||||
vector<string> emptyArgs = {"dummy", "--id", "test", "--color", "false"};
|
||||
|
||||
if (config.ParseAll(emptyArgs, true))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
config.SetValue<string>("transport", transport);
|
||||
|
||||
FairMQDevice device;
|
||||
device.SetConfig(config);
|
||||
|
||||
FairMQChannel channel;
|
||||
channel.UpdateType("pub");
|
||||
channel.UpdateMethod("connect");
|
||||
channel.UpdateAddress("tcp://localhost:5558");
|
||||
device.AddChannel("data", channel);
|
||||
|
||||
thread t(control, ref(device));
|
||||
|
||||
device.RunStateMachine();
|
||||
|
||||
if (t.joinable())
|
||||
{
|
||||
t.join();
|
||||
}
|
||||
|
||||
return device.GetTransportName();
|
||||
}
|
||||
|
||||
string TestDeviceSetTransport(const string& transport)
|
||||
{
|
||||
FairMQDevice device;
|
||||
device.SetTransport(transport);
|
||||
|
||||
FairMQChannel channel;
|
||||
channel.UpdateType("pub");
|
||||
channel.UpdateMethod("connect");
|
||||
channel.UpdateAddress("tcp://localhost:5558");
|
||||
device.AddChannel("data", channel);
|
||||
|
||||
std::thread t(control, std::ref(device));
|
||||
|
||||
device.RunStateMachine();
|
||||
|
||||
if (t.joinable())
|
||||
{
|
||||
t.join();
|
||||
}
|
||||
|
||||
return device.GetTransportName();
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(DeviceConfig, SetConfig)
|
||||
{
|
||||
string transport = "zeromq";
|
||||
string returnedTransport = TestDeviceSetConfig(transport);
|
||||
|
||||
EXPECT_EQ(transport, returnedTransport);
|
||||
}
|
||||
|
||||
TEST_F(DeviceConfig, SetTransport)
|
||||
{
|
||||
string transport = "zeromq";
|
||||
string returnedTransport = TestDeviceSetTransport(transport);
|
||||
|
||||
EXPECT_EQ(transport, returnedTransport);
|
||||
}
|
||||
|
||||
} // namespace
|
@@ -1,12 +1,12 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include "TestVersion.h"
|
||||
#include <FairMQDevice.h>
|
||||
|
||||
#include <fairmq/Tools.h>
|
||||
|
||||
@@ -19,16 +19,24 @@ namespace
|
||||
{
|
||||
|
||||
using namespace std;
|
||||
using namespace fair::mq::test;
|
||||
|
||||
class DeviceVersion : public ::testing::Test {
|
||||
class TestVersion : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
TestVersion(fair::mq::tools::Version version)
|
||||
: FairMQDevice(version)
|
||||
{}
|
||||
};
|
||||
|
||||
class DeviceVersion : public ::testing::Test
|
||||
{
|
||||
public:
|
||||
DeviceVersion()
|
||||
{}
|
||||
|
||||
fair::mq::tools::Version TestDeviceVersion()
|
||||
{
|
||||
fair::mq::test::TestVersion versionDevice({1, 2, 3});
|
||||
TestVersion versionDevice({1, 2, 3});
|
||||
versionDevice.ChangeState("END");
|
||||
|
||||
return versionDevice.GetVersion();
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -51,7 +51,7 @@ class MultipleDevices : public ::testing::Test {
|
||||
|
||||
FairMQChannel channel("push", "connect", "ipc://multiple-devices-test");
|
||||
channel.UpdateRateLogging(0);
|
||||
sender.fChannels["data"].push_back(channel);
|
||||
sender.AddChannel("data", channel);
|
||||
|
||||
thread t(control, std::ref(sender));
|
||||
|
||||
@@ -73,7 +73,7 @@ class MultipleDevices : public ::testing::Test {
|
||||
|
||||
FairMQChannel channel("pull", "bind", "ipc://multiple-devices-test");
|
||||
channel.UpdateRateLogging(0);
|
||||
receiver.fChannels["data"].push_back(channel);
|
||||
receiver.AddChannel("data", channel);
|
||||
|
||||
thread t(control, std::ref(receiver));
|
||||
|
||||
|
@@ -21,33 +21,88 @@ class TransferTimeout : public FairMQDevice
|
||||
protected:
|
||||
auto Run() -> void override
|
||||
{
|
||||
auto sendCanceling = false;
|
||||
auto receiveCanceling = false;
|
||||
bool sendMsgCanceling = false;
|
||||
bool receiveMsgCanceling = false;
|
||||
|
||||
auto msg1 = FairMQMessagePtr{NewMessage()};
|
||||
auto msg2 = FairMQMessagePtr{NewMessage()};
|
||||
FairMQMessagePtr msg1(NewMessage());
|
||||
FairMQMessagePtr msg2(NewMessage());
|
||||
|
||||
if (Send(msg1, "data-out", 0, 100) == -2)
|
||||
{
|
||||
LOG(info) << "send canceled";
|
||||
sendCanceling = true;
|
||||
LOG(info) << "send msg canceled";
|
||||
sendMsgCanceling = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(error) << "send did not cancel";
|
||||
LOG(error) << "send msg did not cancel";
|
||||
}
|
||||
|
||||
if (Receive(msg2, "data-in", 0, 100) == -2)
|
||||
{
|
||||
LOG(info) << "receive canceled";
|
||||
receiveCanceling = true;
|
||||
LOG(info) << "receive msg canceled";
|
||||
receiveMsgCanceling = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(error) << "receive did not cancel";
|
||||
LOG(error) << "receive msg did not cancel";
|
||||
}
|
||||
|
||||
if (sendCanceling && receiveCanceling)
|
||||
bool send1PartCanceling = false;
|
||||
bool receive1PartCanceling = false;
|
||||
|
||||
FairMQParts parts1;
|
||||
parts1.AddPart(NewMessage(10));
|
||||
FairMQParts parts2;
|
||||
|
||||
if (Send(parts1, "data-out", 0, 100) == -2)
|
||||
{
|
||||
LOG(info) << "send 1 part canceled";
|
||||
send1PartCanceling = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(error) << "send 1 part did not cancel";
|
||||
}
|
||||
|
||||
if (Receive(parts2, "data-in", 0, 100) == -2)
|
||||
{
|
||||
LOG(info) << "receive 1 part canceled";
|
||||
receive1PartCanceling = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(error) << "receive 1 part did not cancel";
|
||||
}
|
||||
|
||||
bool send2PartsCanceling = false;
|
||||
bool receive2PartsCanceling = false;
|
||||
|
||||
FairMQParts parts3;
|
||||
parts3.AddPart(NewMessage(10));
|
||||
parts3.AddPart(NewMessage(10));
|
||||
FairMQParts parts4;
|
||||
|
||||
if (Send(parts3, "data-out", 0, 100) == -2)
|
||||
{
|
||||
LOG(info) << "send 2 parts canceled";
|
||||
send2PartsCanceling = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(error) << "send 2 parts did not cancel";
|
||||
}
|
||||
|
||||
if (Receive(parts4, "data-in", 0, 100) == -2)
|
||||
{
|
||||
LOG(info) << "receive 2 parts canceled";
|
||||
receive2PartsCanceling = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(error) << "receive 2 parts did not cancel";
|
||||
}
|
||||
|
||||
if (sendMsgCanceling && receiveMsgCanceling && send1PartCanceling && receive1PartCanceling && send2PartsCanceling && receive2PartsCanceling)
|
||||
{
|
||||
LOG(info) << "Transfer timeout test successfull";
|
||||
}
|
||||
|
Reference in New Issue
Block a user