Compare commits

..

16 Commits

Author SHA1 Message Date
Dennis Klein
811e716731 Add missing header 2018-05-23 08:26:23 +02:00
Dennis Klein
5ab21946f8 Add new release 2018-05-22 16:49:46 +02:00
Dennis Klein
30367eb76d Add PR template 2018-05-22 16:33:13 +02:00
Dennis Klein
89d71ce14c Improve README 2018-05-22 15:21:35 +02:00
Dennis Klein
e54db27242 Remove the alias target
In some cases the definition of the alias target fails, because of
target visibility problems.
2018-05-22 15:21:35 +02:00
Alexey Rybalchenko
cc4a8b8f7f Modify target CI environment 2018-05-17 16:32:49 +02:00
Alexey Rybalchenko
e4c349888d Improve compilation speed 2018-05-17 16:32:49 +02:00
Alexey Rybalchenko
436f79bee5 Control plugin: add ability to switch log levels interactively 2018-05-17 10:30:18 +02:00
Alexey Rybalchenko
2a6e4de72c provide FAIRMQ_PATH to test suites 2018-05-15 19:18:37 +02:00
Alexey Rybalchenko
e295978b3e further shorten shm names 2018-05-15 19:18:37 +02:00
Alexey Rybalchenko
e4d73f1a9a Test all examples with all possible transports 2018-05-15 19:18:37 +02:00
Alexey Rybalchenko
d93dc2f7f7 Use enum transport types instead of strings in Channel/Device 2018-05-15 19:18:11 +02:00
Alexey Rybalchenko
7a4fd96b27 Fix namespaces in Transports.h and add conversion map 2018-05-15 19:18:11 +02:00
Alexey Rybalchenko
155618af57 Used cached default transport in FairMQDevice::Transport() 2018-05-15 19:18:11 +02:00
Dennis Klein
9906475b6f Add new release 2018-05-04 21:57:00 +02:00
Alexey Rybalchenko
de7ddc0ddd Fix type conflict for some environments 2018-05-04 16:59:50 +02:00
80 changed files with 1339 additions and 975 deletions

View File

@@ -42,7 +42,7 @@ if(BUILD_FAIRMQ)
find_package2(PUBLIC Boost VERSION 1.64 REQUIRED
COMPONENTS program_options thread system filesystem regex date_time signals
)
find_package2(PUBLIC FairLogger VERSION 1.0.6 REQUIRED)
find_package2(PUBLIC FairLogger VERSION 1.2.0 REQUIRED)
find_package2(PRIVATE ZeroMQ VERSION 4.1.5 REQUIRED)
endif()

6
Jenkinsfile vendored
View File

@@ -37,9 +37,9 @@ pipeline{
steps{
script {
parallel(buildMatrix([
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc4.9', fairsoft: 'apr18'],
[os: 'MacOS10.11', arch: 'x86_64', compiler: 'AppleLLVM8.0.0', fairsoft: 'apr18'],
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM9.0.0', fairsoft: 'apr18'],
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc4.9', fairsoft: 'may18'],
[os: 'MacOS10.11', arch: 'x86_64', compiler: 'AppleLLVM8.0.0', fairsoft: 'may18'],
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM9.0.0', fairsoft: 'may18'],
]) { spec, label ->
sh '''\
echo "export BUILDDIR=$PWD/build" >> Dart.cfg

View File

@@ -35,9 +35,9 @@ pipeline{
steps{
script {
parallel(buildMatrix([
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc4.9', fairsoft: 'apr18'],
[os: 'MacOS10.11', arch: 'x86_64', compiler: 'AppleLLVM8.0.0', fairsoft: 'apr18'],
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM9.0.0', fairsoft: 'apr18'],
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc4.9', fairsoft: 'may18'],
[os: 'MacOS10.11', arch: 'x86_64', compiler: 'AppleLLVM8.0.0', fairsoft: 'may18'],
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM9.0.0', fairsoft: 'may18'],
]) { spec, label ->
sh '''\
echo "export BUILDDIR=$PWD/build" >> Dart.cfg

9
PULL_REQUEST_TEMPLATE.md Normal file
View File

@@ -0,0 +1,9 @@
Replace me with your description.
---
Checklist:
* [ ] Rebased against `dev` branch
* [ ] My name is in the resp. CONTRIBUTORS/AUTHORS file
* [ ] Followed [the seven rules of great commit messages](https://chris.beams.io/posts/git-commit/#seven-rules)

View File

@@ -23,7 +23,7 @@ In addition to this core functionality FairMQ provides a framework for creating
are communicating through message passing. FairMQ does not only allow the user to use different transport but also to mix them; i.e: A Device can communicate using different transport on different channels at the same time. Device execution is modelled as a simple state machine that
shapes the integration points for the user task. Devices also incorporate a plugin system for runtime configuration and control.
Next to the provided devices and plugins (e.g. [DDS](https://github.com/FairRootGroup/DDS))
the user can extened FairMQ by developing his own plugins to integrate his devices with external
the user can extend FairMQ by developing his own plugins to integrate his devices with external
configuration and control services.
FairMQ has been developed in the context of its mother project [FairRoot](https://github.com/FairRootGroup/FairRoot) -
@@ -49,6 +49,8 @@ a simulation, reconstruction and analysis framework.
| Stable release | Date | API Docs |
| --- | --- | --- |
| [**1.2.3**](https://github.com/FairRootGroup/FairMQ/releases/tag/v1.2.3) | May 2018 | [link](https://fairrootgroup.github.io/FairMQ/v1.2.3/index.html) |
| [**1.2.1**](https://github.com/FairRootGroup/FairMQ/releases/tag/v1.2.1) | May 2018 | [link](https://fairrootgroup.github.io/FairMQ/v1.2.1/index.html) |
| [**1.2.0**](https://github.com/FairRootGroup/FairMQ/releases/tag/v1.2.0) | May 2018 | [link](https://fairrootgroup.github.io/FairMQ/v1.2.0/index.html) |
Find all FairMQ stable and development releases [here](https://github.com/FairRootGroup/FairMQ/releases).
@@ -79,14 +81,16 @@ set(CMAKE_PREFIX_PATH /path/to/FairMQ_install_prefix ${CMAKE_PREFIX_PATH})
find_package(FairMQ)
```
`find_package(FairMQ)` will define an imported target `FairMQ::FairMQ` (An alias `FairRoot::FairMQ` is also defined (if you use CMake 3.11+) for backwards compatibility, but it is deprecated).
`find_package(FairMQ)` will define an imported target `FairMQ::FairMQ`.
In order to succesfully compile and link against the `FairMQ::FairMQ` target, you need to discover its public package dependencies, too.
```cmake
find_package(FairMQ)
find_package(FairLogger ${FairMQ_FairLogger_VERSION})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_BOOST_COMPONENTS})
if(FairMQ_FOUND)
find_package(FairLogger ${FairMQ_FairLogger_VERSION})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_BOOST_COMPONENTS})
endif()
```
Of course, feel free to customize the above commands to your needs.
@@ -95,8 +99,10 @@ Optionally, you can require certain FairMQ package components and a minimum vers
```cmake
find_package(FairMQ 1.1.0 COMPONENTS nanomsg_transport dds_plugin)
find_package(FairLogger ${FairMQ_FairLogger_VERSION})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_BOOST_COMPONENTS})
if(FairMQ_FOUND)
find_package(FairLogger ${FairMQ_FairLogger_VERSION})
find_package(Boost ${FairMQ_Boost_VERSION} COMPONENTS ${FairMQ_BOOST_COMPONENTS})
endif()
```
When building FairMQ, CMake will print a summary table of all available package components.

View File

@@ -32,9 +32,4 @@ set(CMAKE_MODULE_PATH ${@PROJECT_NAME@_CMAKEMODDIR} ${CMAKE_MODULE_PATH})
### Import targets
include(@PACKAGE_CMAKE_INSTALL_PREFIX@/@PACKAGE_INSTALL_DESTINATION@/@PROJECT_EXPORT_SET@.cmake)
### Alias target for backwards compat (DEPRECATED)
if((NOT TARGET FairRoot::@PROJECT_NAME@) AND (CMAKE_VERSION VERSION_GREATER 3.10.99))
add_library(FairRoot::@PROJECT_NAME@ ALIAS @PROJECT_NAME@::@PROJECT_NAME@)
endif()
@PACKAGE_COMPONENTS@

View File

@@ -31,8 +31,14 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-1.sh.in ${CMAKE_CUR
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-1-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh)
add_test(NAME Example-1-1 COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh)
set_tests_properties(Example-1-1 PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
add_test(NAME Example-1-1-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh zeromq)
set_tests_properties(Example-1-1-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
add_test(NAME Example-1-1-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh nanomsg)
set_tests_properties(Example-1-1-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
add_test(NAME Example-1-1-shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh shmem)
set_tests_properties(Example-1-1-shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
# install

View File

@@ -1,10 +1,19 @@
#!/bin/bash
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq"
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID;' TERM
SAMPLER="fairmq-ex-1-1-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1"
@@ -14,6 +23,7 @@ SAMPLER_PID=$!
SINK="fairmq-ex-1-1-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh"
SINK+=" --control static --color false"
SINK+=" --max-iterations 1"

View File

@@ -39,8 +39,14 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-1-n-1.json ${CMAKE_CURRENT_BINARY_
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-1-n-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh)
add_test(NAME Example-1-n-1 COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh)
set_tests_properties(Example-1-n-1 PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
add_test(NAME Example-1-n-1-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh zeromq)
set_tests_properties(Example-1-n-1-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
add_test(NAME Example-1-n-1-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh nanomsg)
set_tests_properties(Example-1-n-1-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
add_test(NAME Example-1-n-1-shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh shmem)
set_tests_properties(Example-1-n-1-shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received: ")
# install

View File

@@ -2,6 +2,12 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq"
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
ex2config="@CMAKE_CURRENT_BINARY_DIR@/ex-1-n-1.json"
# setup a trap to kill everything if the test fails/timeouts
@@ -9,6 +15,7 @@ trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $PROCESSOR1_PID;
SAMPLER="fairmq-ex-1-n-1-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 2"
@@ -18,6 +25,7 @@ SAMPLER_PID=$!
PROCESSOR1="fairmq-ex-1-n-1-processor"
PROCESSOR1+=" --id processor1"
PROCESSOR1+=" --transport $transport"
PROCESSOR1+=" --verbosity veryhigh"
PROCESSOR1+=" --control static --color false"
PROCESSOR1+=" --mq-config $ex2config"
@@ -27,6 +35,7 @@ PROCESSOR1_PID=$!
PROCESSOR2="fairmq-ex-1-n-1-processor"
PROCESSOR2+=" --id processor2"
PROCESSOR2+=" --transport $transport"
PROCESSOR2+=" --verbosity veryhigh"
PROCESSOR2+=" --control static --color false"
PROCESSOR2+=" --mq-config $ex2config"
@@ -36,6 +45,7 @@ PROCESSOR2_PID=$!
SINK="fairmq-ex-1-n-1-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh"
SINK+=" --control static --color false"
SINK+=" --max-iterations 2"

View File

@@ -32,8 +32,14 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-copypush.sh.in ${CMAK
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-copypush.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh)
add_test(NAME Example-CopyPush COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh)
set_tests_properties(Example-CopyPush PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message: ")
add_test(NAME Example-CopyPush-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh zeromq)
set_tests_properties(Example-CopyPush-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message: ")
add_test(NAME Example-CopyPush-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh nanomsg)
set_tests_properties(Example-CopyPush-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message: ")
add_test(NAME Example-CopyPush-shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh shmem)
set_tests_properties(Example-CopyPush-shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message: ")
# install

View File

@@ -2,11 +2,18 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq"
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK1_PID; kill -TERM $SINK2_PID; wait $SAMPLER_PID; wait $SINK1_PID; wait $SINK2_PID;' TERM
SAMPLER="fairmq-ex-copypush-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1"
@@ -16,6 +23,7 @@ SAMPLER_PID=$!
SINK1="fairmq-ex-copypush-sink"
SINK1+=" --id sink1"
SINK1+=" --transport $transport"
SINK1+=" --verbosity veryhigh"
SINK1+=" --control static --color false"
SINK1+=" --max-iterations 1"
@@ -25,6 +33,7 @@ SINK1_PID=$!
SINK2="fairmq-ex-copypush-sink"
SINK2+=" --id sink2"
SINK2+=" --transport $transport"
SINK2+=" --verbosity veryhigh"
SINK2+=" --control static --color false"
SINK2+=" --max-iterations 1"

View File

@@ -31,8 +31,14 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multipart.sh.in ${CMA
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh)
add_test(NAME Example-Multipart COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh)
set_tests_properties(Example-Multipart PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
add_test(NAME Example-Multipart-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh zeromq)
set_tests_properties(Example-Multipart-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
add_test(NAME Example-Multipart-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh nanomsg)
set_tests_properties(Example-Multipart-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
add_test(NAME Example-Multipart-shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh shmem)
set_tests_properties(Example-Multipart-shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 2 parts")
# install

View File

@@ -2,11 +2,18 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq"
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID;' TERM
SAMPLER="fairmq-ex-multipart-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --control static --color false"
@@ -16,6 +23,7 @@ SAMPLER_PID=$!
SINK="fairmq-ex-multipart-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh"
SINK+=" --control static --color false"
SINK+=" --channel-config name=data,type=pull,method=bind,rateLogging=0,address=tcp://127.0.0.1:5555"

View File

@@ -36,8 +36,11 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-channels.sh.
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multiple-channels.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-channels.sh)
add_test(NAME Example-Multiple-Channels COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-channels.sh)
set_tests_properties(Example-Multiple-Channels PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received messages from both sources.")
add_test(NAME Example-Multiple-Channels-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-channels.sh zeromq)
set_tests_properties(Example-Multiple-Channels-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received messages from both sources.")
add_test(NAME Example-Multiple-Channels-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-channels.sh nanomsg)
set_tests_properties(Example-Multiple-Channels-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received messages from both sources.")
# install

View File

@@ -2,12 +2,19 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq"
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $BROADCASTER_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $BROADCASTER_PID;' TERM
SINK="fairmq-ex-multiple-channels-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh"
SINK+=" --max-iterations 1"
SINK+=" --control static --color false"
@@ -20,6 +27,7 @@ sleep 1
SAMPLER="fairmq-ex-multiple-channels-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --control static --color false"
@@ -30,6 +38,7 @@ SAMPLER_PID=$!
BROADCASTER="fairmq-ex-multiple-channels-broadcaster"
BROADCASTER+=" --id broadcaster1"
BROADCASTER+=" --transport $transport"
BROADCASTER+=" --verbosity veryhigh"
BROADCASTER+=" --control static --color false"
BROADCASTER+=" --channel-config name=broadcast,type=pub,method=bind,rateLogging=0,address=tcp://*:5005"

View File

@@ -32,8 +32,14 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-region.sh.in ${CMAKE_
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-region.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh)
add_test(NAME Example-Region COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh)
set_tests_properties(Example-Region PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received ack")
add_test(NAME Example-Region-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh zeromq)
set_tests_properties(Example-Region-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received ack")
add_test(NAME Example-Region-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh nanomsg)
set_tests_properties(Example-Region-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received ack")
add_test(NAME Example-Region-shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh shmem)
set_tests_properties(Example-Region-shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received ack")
# install

View File

@@ -2,6 +2,12 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq"
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
msgSize="1000000"
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
@@ -10,23 +16,23 @@ trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SI
SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --severity debug"
SAMPLER+=" --session $SESSION"
SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --transport shmem"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777"
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
SAMPLER_PID=$!
SINK="fairmq-ex-region-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --session $SESSION"
SINK+=" --verbosity veryhigh"
SINK+=" --control static --color false"
SINK+=" --max-iterations 1"
SINK+=" --transport shmem"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777"
@CMAKE_CURRENT_BINARY_DIR@/$SINK &
SINK_PID=$!

View File

@@ -32,8 +32,14 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-req-rep.sh.in ${CMAKE
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-req-rep.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh)
add_test(NAME Example-ReqRep COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh)
set_tests_properties(Example-ReqRep PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received reply from server: ")
add_test(NAME Example-ReqRep-zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh zeromq)
set_tests_properties(Example-ReqRep-zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received reply from server: ")
add_test(NAME Example-ReqRep-nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh nanomsg)
set_tests_properties(Example-ReqRep-nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received reply from server: ")
add_test(NAME Example-ReqRep-shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh shmem)
set_tests_properties(Example-ReqRep-shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received reply from server: ")
# install

View File

@@ -2,11 +2,18 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq"
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
# setup a trap to kill everything if the test fails/timeouts
trap 'kill -TERM $CLIENT_PID; kill -TERM $SERVER_PID; wait $CLIENT_PID; wait $SERVER_PID;' TERM
CLIENT="fairmq-ex-req-rep-client"
CLIENT+=" --id client"
CLIENT+=" --transport $transport"
CLIENT+=" --verbosity veryhigh"
CLIENT+=" --control static --color false"
CLIENT+=" --max-iterations 1"
@@ -16,6 +23,7 @@ CLIENT_PID=$!
SERVER="fairmq-ex-req-rep-server"
SERVER+=" --id server"
SERVER+=" --transport $transport"
SERVER+=" --verbosity veryhigh"
SERVER+=" --control static --color false"
SERVER+=" --max-iterations 1"

View File

@@ -148,6 +148,9 @@ set(FAIRMQ_SOURCE_FILES
shmem/Manager.cxx
shmem/Monitor.cxx
shmem/Region.cxx
tools/Network.cxx
tools/Process.cxx
tools/Unique.cxx
zeromq/FairMQMessageZMQ.cxx
zeromq/FairMQPollerZMQ.cxx
zeromq/FairMQUnmanagedRegionZMQ.cxx

View File

@@ -28,7 +28,6 @@ FairMQChannel::FairMQChannel()
, fType("unspecified")
, fMethod("unspecified")
, fAddress("unspecified")
, fTransport("default")
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
@@ -36,7 +35,7 @@ FairMQChannel::FairMQChannel()
, fRateLogging(1)
, fName("")
, fIsValid(false)
, fTransportType(FairMQ::Transport::DEFAULT)
, fTransportType(fair::mq::Transport::DEFAULT)
, fTransportFactory(nullptr)
, fMultipart(false)
, fModified(true)
@@ -49,7 +48,6 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
, fType(type)
, fMethod(method)
, fAddress(address)
, fTransport("default")
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
@@ -57,7 +55,7 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
, fRateLogging(1)
, fName("")
, fIsValid(false)
, fTransportType(FairMQ::Transport::DEFAULT)
, fTransportType(fair::mq::Transport::DEFAULT)
, fTransportFactory(nullptr)
, fMultipart(false)
, fModified(true)
@@ -70,7 +68,6 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, std::shared
, fType(type)
, fMethod("unspecified")
, fAddress("unspecified")
, fTransport("default") // TODO refactor, either use string representation or enum type
, fSndBufSize(1000)
, fRcvBufSize(1000)
, fSndKernelSize(0)
@@ -91,7 +88,6 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
, fType(chan.fType)
, fMethod(chan.fMethod)
, fAddress(chan.fAddress)
, fTransport(chan.fTransport)
, fSndBufSize(chan.fSndBufSize)
, fRcvBufSize(chan.fRcvBufSize)
, fSndKernelSize(chan.fSndKernelSize)
@@ -99,7 +95,7 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
, fRateLogging(chan.fRateLogging)
, fName(chan.fName)
, fIsValid(false)
, fTransportType(FairMQ::Transport::DEFAULT)
, fTransportType(chan.fTransportType)
, fTransportFactory(nullptr)
, fMultipart(chan.fMultipart)
, fModified(chan.fModified)
@@ -111,7 +107,6 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
fType = chan.fType;
fMethod = chan.fMethod;
fAddress = chan.fAddress;
fTransport = chan.fTransport;
fSndBufSize = chan.fSndBufSize;
fRcvBufSize = chan.fRcvBufSize;
fSndKernelSize = chan.fSndKernelSize;
@@ -120,7 +115,7 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
fSocket = nullptr;
fName = chan.fName;
fIsValid = false;
fTransportType = FairMQ::Transport::DEFAULT;
fTransportType = chan.fTransportType;
fTransportFactory = nullptr;
return *this;
@@ -194,16 +189,16 @@ string FairMQChannel::GetAddress() const
}
}
string FairMQChannel::GetTransport() const
string FairMQChannel::GetTransportName() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
return fTransport;
return fair::mq::TransportNames.at(fTransportType);
}
catch (exception& e)
{
LOG(error) << "Exception caught in FairMQChannel::GetTransport: " << e.what();
LOG(error) << "Exception caught in FairMQChannel::GetTransportName: " << e.what();
exit(EXIT_FAILURE);
}
}
@@ -332,7 +327,9 @@ void FairMQChannel::UpdateTransport(const string& transport)
{
unique_lock<mutex> lock(fChannelMutex);
fIsValid = false;
fTransport = transport;
LOG(WARN) << fName << ": " << transport;
fTransportType = fair::mq::TransportTypes.at(transport);
LOG(WARN) << fName << ": " << fair::mq::TransportNames.at(fTransportType);
fModified = true;
}
catch (exception& e)
@@ -586,17 +583,6 @@ bool FairMQChannel::ValidateChannel()
}
}
// validate channel transport
// const string channelTransportNames[] = { "default", "zeromq", "nanomsg", "shmem" };
// const set<string> channelTransports(channelTransportNames, channelTransportNames + sizeof(channelTransportNames) / sizeof(string));
if (FairMQ::TransportTypes.find(fTransport) == FairMQ::TransportTypes.end())
{
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "Invalid channel transport: \"" << fTransport << "\"";
exit(EXIT_FAILURE);
}
// validate socket buffer size for sending
if (fSndBufSize < 0)
{

View File

@@ -92,7 +92,7 @@ class FairMQChannel
/// Get channel transport ("default", "zeromq", "nanomsg" or "shmem")
/// @return Returns channel transport (e.g. "default", "zeromq", "nanomsg" or "shmem")
std::string GetTransport() const;
std::string GetTransportName() const;
/// Get socket send buffer size (in number of messages)
/// @return Returns socket send buffer size (in number of messages)
@@ -301,7 +301,7 @@ class FairMQChannel
std::string fType;
std::string fMethod;
std::string fAddress;
std::string fTransport;
fair::mq::Transport fTransportType;
int fSndBufSize;
int fRcvBufSize;
int fSndKernelSize;
@@ -311,7 +311,6 @@ class FairMQChannel
std::string fName;
std::atomic<bool> fIsValid;
FairMQ::Transport fTransportType;
std::shared_ptr<FairMQTransportFactory> fTransportFactory;
bool CheckCompatibility(std::unique_ptr<FairMQMessage>& msg) const;

View File

@@ -29,6 +29,7 @@
using namespace std;
FairMQDevice::FairMQDevice()
: fTransportFactory(nullptr)
, fTransports()
@@ -42,7 +43,7 @@ FairMQDevice::FairMQDevice()
, fPortRangeMin(22000)
, fPortRangeMax(32000)
, fNetworkInterface()
, fDefaultTransport("default")
, fDefaultTransportType(fair::mq::Transport::DEFAULT)
, fInitializationTimeoutInS(120)
, fDataCallbacks(false)
, fMsgInputs()
@@ -72,7 +73,7 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
, fPortRangeMin(22000)
, fPortRangeMax(32000)
, fNetworkInterface()
, fDefaultTransport("default")
, fDefaultTransportType(fair::mq::Transport::DEFAULT)
, fInitializationTimeoutInS(120)
, fDataCallbacks(false)
, fMsgInputs()
@@ -246,15 +247,15 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
{
if (!ch.fTransportFactory)
{
if (ch.fTransport == "default" || ch.fTransport == fDefaultTransport)
if (ch.fTransportType == fair::mq::Transport::DEFAULT || ch.fTransportType == fTransportFactory->GetType())
{
LOG(debug) << ch.fName << ": using default transport";
ch.InitTransport(fTransportFactory);
}
else
{
LOG(debug) << ch.fName << ": channel transport (" << fDefaultTransport << ") overriden to " << ch.fTransport;
ch.InitTransport(AddTransport(ch.fTransport));
LOG(debug) << ch.fName << ": channel transport (" << fair::mq::TransportNames.at(fDefaultTransportType) << ") overriden to " << fair::mq::TransportNames.at(ch.fTransportType);
ch.InitTransport(AddTransport(ch.fTransportType));
}
ch.fTransportType = ch.fTransportFactory->GetType();
}
@@ -555,10 +556,10 @@ void FairMQDevice::HandleMultipleChannelInput()
fMultitransportInputs.clear();
for (const auto& k : fInputChannelKeys)
{
FairMQ::Transport t = fChannels.at(k).at(0).fTransportType;
fair::mq::Transport t = fChannels.at(k).at(0).fTransportType;
if (fMultitransportInputs.find(t) == fMultitransportInputs.end())
{
fMultitransportInputs.insert(pair<FairMQ::Transport, vector<string>>(t, vector<string>()));
fMultitransportInputs.insert(pair<fair::mq::Transport, vector<string>>(t, vector<string>()));
fMultitransportInputs.at(t).push_back(k);
}
else
@@ -760,24 +761,24 @@ void FairMQDevice::Pause()
LOG(debug) << "Unpausing";
}
shared_ptr<FairMQTransportFactory> FairMQDevice::AddTransport(const string& transport)
shared_ptr<FairMQTransportFactory> FairMQDevice::AddTransport(const fair::mq::Transport transport)
{
auto i = fTransports.find(FairMQ::TransportTypes.at(transport));
auto i = fTransports.find(transport);
if (i == fTransports.end())
{
auto tr = FairMQTransportFactory::CreateTransportFactory(transport, fId, fConfig);
auto tr = FairMQTransportFactory::CreateTransportFactory(fair::mq::TransportNames.at(transport), fId, fConfig);
LOG(debug) << "Adding '" << transport << "' transport to the device.";
LOG(debug) << "Adding '" << fair::mq::TransportNames.at(transport) << "' transport to the device.";
pair<FairMQ::Transport, shared_ptr<FairMQTransportFactory>> trPair(FairMQ::TransportTypes.at(transport), tr);
pair<fair::mq::Transport, shared_ptr<FairMQTransportFactory>> trPair(transport, tr);
fTransports.insert(trPair);
return tr;
}
else
{
LOG(debug) << "Reusing existing '" << transport << "' transport.";
LOG(debug) << "Reusing existing '" << fair::mq::TransportNames.at(transport) << "' transport.";
return i->second;
}
}
@@ -804,7 +805,11 @@ void FairMQDevice::CreateOwnConfig()
fNumIoThreads = fConfig->GetValue<int>("io-threads");
fInitializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
fRate = fConfig->GetValue<float>("rate");
fDefaultTransport = fConfig->GetValue<string>("transport");
try {
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
} catch(const exception& e) {
LOG(ERROR) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
}
}
void FairMQDevice::SetTransport(const string& transport)
@@ -819,7 +824,7 @@ void FairMQDevice::SetTransport(const string& transport)
if (fTransports.empty())
{
LOG(debug) << "Requesting '" << transport << "' as default transport for the device";
fTransportFactory = AddTransport(transport);
fTransportFactory = AddTransport(fair::mq::TransportTypes.at(transport));
}
else
{
@@ -844,8 +849,12 @@ void FairMQDevice::SetConfig(FairMQProgOptions& config)
fNumIoThreads = config.GetValue<int>("io-threads");
fInitializationTimeoutInS = config.GetValue<int>("initialization-timeout");
fRate = fConfig->GetValue<float>("rate");
fDefaultTransport = config.GetValue<string>("transport");
SetTransport(fDefaultTransport);
try {
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
} catch(const exception& e) {
LOG(ERROR) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
}
SetTransport(fConfig->GetValue<string>("transport"));
}
void FairMQDevice::LogSocketRates()

View File

@@ -196,7 +196,7 @@ class FairMQDevice : public FairMQStateMachine
/// @brief Getter for default transport factory
auto Transport() const -> const FairMQTransportFactory*
{
return fTransports.at(fair::mq::TransportTypes[GetDefaultTransport()]).get();
return fTransportFactory.get();
}
template<typename... Args>
@@ -253,7 +253,7 @@ class FairMQDevice : public FairMQStateMachine
// if more than one channel provided, check compatibility
if (chans.size() > 1)
{
FairMQ::Transport type = fChannels.at(chans.at(0)).at(0).Transport()->GetType();
fair::mq::Transport type = fChannels.at(chans.at(0)).at(0).Transport()->GetType();
for (unsigned int i = 1; i < chans.size(); ++i)
{
@@ -273,7 +273,7 @@ class FairMQDevice : public FairMQStateMachine
// if more than one channel provided, check compatibility
if (channels.size() > 1)
{
FairMQ::Transport type = channels.at(0)->Transport()->GetType();
fair::mq::Transport type = channels.at(0)->Transport()->GetType();
for (unsigned int i = 1; i < channels.size(); ++i)
{
@@ -293,7 +293,7 @@ class FairMQDevice : public FairMQStateMachine
/// Adds a transport to the device if it doesn't exist
/// @param transport Transport string ("zeromq"/"nanomsg"/"shmem")
std::shared_ptr<FairMQTransportFactory> AddTransport(const std::string& transport);
std::shared_ptr<FairMQTransportFactory> AddTransport(const fair::mq::Transport transport);
/// Sets the default transport for the device
/// @param transport Transport string ("zeromq"/"nanomsg"/"shmem")
void SetTransport(const std::string& transport = "zeromq");
@@ -407,15 +407,15 @@ class FairMQDevice : public FairMQStateMachine
void SetNetworkInterface(const std::string& networkInterface) { fNetworkInterface = networkInterface; }
std::string GetNetworkInterface() const { return fNetworkInterface; }
void SetDefaultTransport(const std::string& defaultTransport) { fDefaultTransport = defaultTransport; }
std::string GetDefaultTransport() const { return fDefaultTransport; }
void SetDefaultTransport(const std::string& name) { fDefaultTransportType = fair::mq::TransportTypes.at(name); }
std::string GetDefaultTransport() const { return fair::mq::TransportNames.at(fDefaultTransportType); }
void SetInitializationTimeoutInS(int initializationTimeoutInS) { fInitializationTimeoutInS = initializationTimeoutInS; }
int GetInitializationTimeoutInS() const { return fInitializationTimeoutInS; }
protected:
std::shared_ptr<FairMQTransportFactory> fTransportFactory; ///< Transport factory
std::unordered_map<FairMQ::Transport, std::shared_ptr<FairMQTransportFactory>> fTransports; ///< Container for transports
std::shared_ptr<FairMQTransportFactory> fTransportFactory; ///< Default transport factory
std::unordered_map<fair::mq::Transport, std::shared_ptr<FairMQTransportFactory>> fTransports; ///< Container for transports
public:
std::unordered_map<std::string, std::vector<FairMQChannel>> fChannels; ///< Device channels
@@ -472,7 +472,7 @@ class FairMQDevice : public FairMQStateMachine
int fPortRangeMax; ///< Maximum value for the port range (if dynamic)
std::string fNetworkInterface; ///< Network interface to use for dynamic binding
std::string fDefaultTransport; ///< Default transport for the device
fair::mq::Transport fDefaultTransportType; ///< Default transport for the device
int fInitializationTimeoutInS; ///< Timeout for the initialization (in seconds)
@@ -521,7 +521,7 @@ class FairMQDevice : public FairMQStateMachine
bool fDataCallbacks;
std::unordered_map<std::string, InputMsgCallback> fMsgInputs;
std::unordered_map<std::string, InputMultipartCallback> fMultipartInputs;
std::unordered_map<FairMQ::Transport, std::vector<std::string>> fMultitransportInputs;
std::unordered_map<fair::mq::Transport, std::vector<std::string>> fMultitransportInputs;
std::unordered_map<std::string, std::pair<uint16_t, uint16_t>> fChannelRegistry;
std::vector<std::string> fInputChannelKeys;
std::mutex fMultitransportMutex;

View File

@@ -28,7 +28,7 @@ class FairMQMessage
virtual bool SetUsedSize(const size_t size) = 0;
virtual FairMQ::Transport GetType() const = 0;
virtual fair::mq::Transport GetType() const = 0;
virtual void Copy(const std::unique_ptr<FairMQMessage>& msg) __attribute__((deprecated("Use 'Copy(const FairMQMessage& msg)'"))) = 0;
virtual void Copy(const FairMQMessage& msg) = 0;

View File

@@ -14,14 +14,531 @@
#include "FairMQStateMachine.h"
FairMQStateMachine::FairMQStateMachine()
// Increase maximum number of boost::msm states (default is 10)
// This #define has to be before any msm header includes
#define FUSION_MAX_VECTOR_SIZE 20
#include <boost/mpl/for_each.hpp>
#include <boost/msm/back/state_machine.hpp>
#include <boost/msm/back/tools.hpp>
#include <boost/msm/back/metafunctions.hpp>
#include <boost/msm/front/state_machine_def.hpp>
#include <boost/msm/front/functor_row.hpp>
#include <boost/signals2.hpp> // signal/slot for onStateChange callbacks
#include <atomic>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <unordered_map>
using namespace std;
namespace msmf = boost::msm::front;
namespace fair
{
start();
namespace mq
{
namespace fsm
{
// defining events for the boost MSM state machine
struct INIT_DEVICE_E { string name() const { return "INIT_DEVICE"; } };
struct internal_DEVICE_READY_E { string name() const { return "internal_DEVICE_READY"; } };
struct INIT_TASK_E { string name() const { return "INIT_TASK"; } };
struct internal_READY_E { string name() const { return "internal_READY"; } };
struct RUN_E { string name() const { return "RUN"; } };
struct PAUSE_E { string name() const { return "PAUSE"; } };
struct STOP_E { string name() const { return "STOP"; } };
struct RESET_TASK_E { string name() const { return "RESET_TASK"; } };
struct RESET_DEVICE_E { string name() const { return "RESET_DEVICE"; } };
struct internal_IDLE_E { string name() const { return "internal_IDLE"; } };
struct END_E { string name() const { return "END"; } };
struct ERROR_FOUND_E { string name() const { return "ERROR_FOUND"; } };
// deactivate the warning for non-virtual destructor thrown in the boost library
#if defined(__clang__)
_Pragma("clang diagnostic push")
_Pragma("clang diagnostic ignored \"-Wnon-virtual-dtor\"")
#elif defined(__GNUC__) || defined(__GNUG__)
_Pragma("GCC diagnostic push")
_Pragma("GCC diagnostic ignored \"-Wnon-virtual-dtor\"")
#endif
// defining the boost MSM state machine
struct Machine_ : public msmf::state_machine_def<Machine_>
{
public:
Machine_()
: fState()
, fWork()
, fWorkAvailableCondition()
, fWorkDoneCondition()
, fWorkMutex()
, fWorkerTerminated(false)
, fWorkActive(false)
, fWorkAvailable(false)
, fStateChangeSignal()
, fStateChangeSignalsMap()
, fTerminationRequested(false)
, fWorkerThread()
{}
virtual ~Machine_()
{}
template<typename Event, typename FSM>
void on_entry(Event const&, FSM& fsm)
{
LOG(state) << "Starting FairMQ state machine";
fState = FairMQStateMachine::IDLE;
fsm.CallStateChangeCallbacks(FairMQStateMachine::IDLE);
// start a worker thread to execute user states in.
fsm.fWorkerThread = thread(&Machine_::Worker, &fsm);
}
template<typename Event, typename FSM>
void on_exit(Event const&, FSM& /*fsm*/)
{
LOG(state) << "Exiting FairMQ state machine";
}
// list of FSM states
struct OK_FSM : public msmf::state<> {};
struct ERROR_FSM : public msmf::terminate_state<> {};
struct IDLE_FSM : public msmf::state<> {};
struct INITIALIZING_DEVICE_FSM : public msmf::state<> {};
struct DEVICE_READY_FSM : public msmf::state<> {};
struct INITIALIZING_TASK_FSM : public msmf::state<> {};
struct READY_FSM : public msmf::state<> {};
struct RUNNING_FSM : public msmf::state<> {};
struct PAUSED_FSM : public msmf::state<> {};
struct RESETTING_TASK_FSM : public msmf::state<> {};
struct RESETTING_DEVICE_FSM : public msmf::state<> {};
struct EXITING_FSM : public msmf::state<> {};
// initial states
using initial_state = boost::mpl::vector<IDLE_FSM, OK_FSM>;
// actions
struct IdleFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
LOG(state) << "Entering IDLE state";
fsm.fState = FairMQStateMachine::IDLE;
}
};
struct InitDeviceFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::INITIALIZING_DEVICE;
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering INITIALIZING DEVICE state";
fsm.fWork = fsm.fInitWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct DeviceReadyFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
LOG(state) << "Entering DEVICE READY state";
fsm.fState = FairMQStateMachine::DEVICE_READY;
}
};
struct InitTaskFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::INITIALIZING_TASK;
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering INITIALIZING TASK state";
fsm.fWork = fsm.fInitTaskWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct ReadyFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
LOG(state) << "Entering READY state";
fsm.fState = FairMQStateMachine::READY;
}
};
struct RunFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::RUNNING;
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering RUNNING state";
fsm.fWork = fsm.fRunWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct PauseFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::PAUSED;
fsm.fUnblockHandler();
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering PAUSED state";
fsm.fWork = fsm.fPauseWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct ResumeFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::RUNNING;
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering RUNNING state";
fsm.fWork = fsm.fRunWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct StopFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::READY;
fsm.fUnblockHandler();
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
LOG(state) << "Entering READY state";
}
};
struct InternalStopFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::READY;
fsm.fUnblockHandler();
LOG(state) << "RUNNING state finished without an external event, entering READY state";
}
};
struct ResetTaskFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::RESETTING_TASK;
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering RESETTING TASK state";
fsm.fWork = fsm.fResetTaskWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct ResetDeviceFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = FairMQStateMachine::RESETTING_DEVICE;
unique_lock<mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering RESETTING DEVICE state";
fsm.fWork = fsm.fResetWrapperHandler;
fsm.fWorkAvailableCondition.notify_one();
}
};
struct ExitingFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
LOG(state) << "Entering EXITING state";
fsm.fState = FairMQStateMachine::EXITING;
fsm.fTerminationRequested = true;
fsm.CallStateChangeCallbacks(FairMQStateMachine::EXITING);
// terminate worker thread
{
lock_guard<mutex> lock(fsm.fWorkMutex);
fsm.fWorkerTerminated = true;
fsm.fWorkAvailableCondition.notify_one();
}
// join the worker thread (executing user states)
if (fsm.fWorkerThread.joinable())
{
fsm.fWorkerThread.join();
}
fsm.fExitHandler();
}
};
struct ErrorFoundFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
LOG(state) << "Entering ERROR state";
fsm.fState = FairMQStateMachine::Error;
fsm.CallStateChangeCallbacks(FairMQStateMachine::Error);
}
};
// Transition table for Machine_
struct transition_table : boost::mpl::vector<
// Start Event Next Action Guard
msmf::Row<IDLE_FSM, INIT_DEVICE_E, INITIALIZING_DEVICE_FSM, InitDeviceFct, msmf::none>,
msmf::Row<IDLE_FSM, END_E, EXITING_FSM, ExitingFct, msmf::none>,
msmf::Row<INITIALIZING_DEVICE_FSM, internal_DEVICE_READY_E, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>,
msmf::Row<DEVICE_READY_FSM, INIT_TASK_E, INITIALIZING_TASK_FSM, InitTaskFct, msmf::none>,
msmf::Row<DEVICE_READY_FSM, RESET_DEVICE_E, RESETTING_DEVICE_FSM, ResetDeviceFct, msmf::none>,
msmf::Row<INITIALIZING_TASK_FSM, internal_READY_E, READY_FSM, ReadyFct, msmf::none>,
msmf::Row<READY_FSM, RUN_E, RUNNING_FSM, RunFct, msmf::none>,
msmf::Row<READY_FSM, RESET_TASK_E, RESETTING_TASK_FSM, ResetTaskFct, msmf::none>,
msmf::Row<RUNNING_FSM, PAUSE_E, PAUSED_FSM, PauseFct, msmf::none>,
msmf::Row<RUNNING_FSM, STOP_E, READY_FSM, StopFct, msmf::none>,
msmf::Row<RUNNING_FSM, internal_READY_E, READY_FSM, InternalStopFct, msmf::none>,
msmf::Row<PAUSED_FSM, RUN_E, RUNNING_FSM, ResumeFct, msmf::none>,
msmf::Row<RESETTING_TASK_FSM, internal_DEVICE_READY_E, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>,
msmf::Row<RESETTING_DEVICE_FSM, internal_IDLE_E, IDLE_FSM, IdleFct, msmf::none>,
msmf::Row<OK_FSM, ERROR_FOUND_E, ERROR_FSM, ErrorFoundFct, msmf::none>>
{};
// replaces the default no-transition response.
template<typename FSM, typename Event>
void no_transition(Event const& e, FSM&, int state)
{
using recursive_stt = typename boost::msm::back::recursive_get_transition_table<FSM>::type;
using all_states = typename boost::msm::back::generate_state_set<recursive_stt>::type;
string stateName;
boost::mpl::for_each<all_states, boost::msm::wrap<boost::mpl::placeholders::_1>>(boost::msm::back::get_state_name<recursive_stt>(stateName, state));
stateName = stateName.substr(24);
size_t pos = stateName.find("_FSME");
stateName.erase(pos);
if (stateName == "1RUNNING" || stateName == "6DEVICE_READY" || stateName == "0PAUSED" || stateName == "8RESETTING_TASK" || stateName == "0RESETTING_DEVICE")
{
stateName = stateName.substr(1);
}
if (stateName != "OK")
{
LOG(state) << "No transition from state " << stateName << " on event " << e.name();
}
// LOG(state) << "no transition from state " << GetStateName(state) << " (" << state << ") on event " << e.name();
}
static string GetStateName(const int state)
{
switch(state)
{
case FairMQStateMachine::OK:
return "OK";
case FairMQStateMachine::Error:
return "Error";
case FairMQStateMachine::IDLE:
return "IDLE";
case FairMQStateMachine::INITIALIZING_DEVICE:
return "INITIALIZING_DEVICE";
case FairMQStateMachine::DEVICE_READY:
return "DEVICE_READY";
case FairMQStateMachine::INITIALIZING_TASK:
return "INITIALIZING_TASK";
case FairMQStateMachine::READY:
return "READY";
case FairMQStateMachine::RUNNING:
return "RUNNING";
case FairMQStateMachine::PAUSED:
return "PAUSED";
case FairMQStateMachine::RESETTING_TASK:
return "RESETTING_TASK";
case FairMQStateMachine::RESETTING_DEVICE:
return "RESETTING_DEVICE";
case FairMQStateMachine::EXITING:
return "EXITING";
default:
return "requested name for non-existent state...";
}
}
void CallStateChangeCallbacks(const FairMQStateMachine::State state) const
{
if (!fStateChangeSignal.empty())
{
fStateChangeSignal(state);
}
}
function<void(void)> fInitWrapperHandler;
function<void(void)> fInitTaskWrapperHandler;
function<void(void)> fRunWrapperHandler;
function<void(void)> fPauseWrapperHandler;
function<void(void)> fResetWrapperHandler;
function<void(void)> fResetTaskWrapperHandler;
function<void(void)> fExitHandler;
function<void(void)> fUnblockHandler;
// function to execute user states in a worker thread
function<void(void)> fWork;
condition_variable fWorkAvailableCondition;
condition_variable fWorkDoneCondition;
mutex fWorkMutex;
bool fWorkerTerminated;
bool fWorkActive;
bool fWorkAvailable;
boost::signals2::signal<void(const FairMQStateMachine::State)> fStateChangeSignal;
unordered_map<string, boost::signals2::connection> fStateChangeSignalsMap;
atomic<bool> fTerminationRequested;
atomic<FairMQStateMachine::State> fState;
private:
void Worker()
{
while (true)
{
{
unique_lock<mutex> lock(fWorkMutex);
// Wait for work to be done.
while (!fWorkAvailable && !fWorkerTerminated)
{
fWorkAvailableCondition.wait(lock);
}
if (fWorkerTerminated)
{
break;
}
fWorkActive = true;
}
fWork();
{
lock_guard<mutex> lock(fWorkMutex);
fWorkActive = false;
fWorkAvailable = false;
fWorkDoneCondition.notify_one();
}
CallStateChangeCallbacks(fState);
}
}
// run state handlers in a separate thread
thread fWorkerThread;
}; // Machine_
using FairMQFSM = boost::msm::back::state_machine<Machine_>;
// reactivate the warning for non-virtual destructor
#if defined(__clang__)
_Pragma("clang diagnostic pop")
#elif defined(__GNUC__) || defined(__GNUG__)
_Pragma("GCC diagnostic pop")
#endif
} // namespace fsm
} // namespace mq
} // namespace fair
using namespace fair::mq::fsm;
FairMQStateMachine::FairMQStateMachine()
: fFsm(new FairMQFSM)
, fChangeStateMutex()
{
static_pointer_cast<FairMQFSM>(fFsm)->fInitWrapperHandler = bind(&FairMQStateMachine::InitWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fInitTaskWrapperHandler = bind(&FairMQStateMachine::InitTaskWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fRunWrapperHandler = bind(&FairMQStateMachine::RunWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fPauseWrapperHandler = bind(&FairMQStateMachine::PauseWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fResetWrapperHandler = bind(&FairMQStateMachine::ResetWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fResetTaskWrapperHandler = bind(&FairMQStateMachine::ResetTaskWrapper, this);
static_pointer_cast<FairMQFSM>(fFsm)->fExitHandler = bind(&FairMQStateMachine::Exit, this);
static_pointer_cast<FairMQFSM>(fFsm)->fUnblockHandler = bind(&FairMQStateMachine::Unblock, this);
static_pointer_cast<FairMQFSM>(fFsm)->start();
}
FairMQStateMachine::~FairMQStateMachine()
{
stop();
static_pointer_cast<FairMQFSM>(fFsm)->stop();
}
int FairMQStateMachine::GetInterfaceVersion() const
@@ -37,85 +554,85 @@ bool FairMQStateMachine::ChangeState(int event)
{
case INIT_DEVICE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::INIT_DEVICE());
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(INIT_DEVICE_E());
return true;
}
case internal_DEVICE_READY:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::internal_DEVICE_READY());
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(internal_DEVICE_READY_E());
return true;
}
case INIT_TASK:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::INIT_TASK());
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(INIT_TASK_E());
return true;
}
case internal_READY:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::internal_READY());
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(internal_READY_E());
return true;
}
case RUN:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::RUN());
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(RUN_E());
return true;
}
case PAUSE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::PAUSE());
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(PAUSE_E());
return true;
}
case STOP:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::STOP());
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(STOP_E());
return true;
}
case RESET_DEVICE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::RESET_DEVICE());
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(RESET_DEVICE_E());
return true;
}
case RESET_TASK:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::RESET_TASK());
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(RESET_TASK_E());
return true;
}
case internal_IDLE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::internal_IDLE());
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(internal_IDLE_E());
return true;
}
case END:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::END());
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(END_E());
return true;
}
case ERROR_FOUND:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::ERROR_FOUND());
lock_guard<mutex> lock(fChangeStateMutex);
static_pointer_cast<FairMQFSM>(fFsm)->process_event(ERROR_FOUND_E());
return true;
}
default:
{
LOG(error) << "Requested state transition with an unsupported event: " << event << std::endl
LOG(error) << "Requested state transition with an unsupported event: " << event << endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND";
return false;
}
}
}
catch (std::exception& e)
catch (exception& e)
{
LOG(error) << "Exception in FairMQStateMachine::ChangeState(): " << e.what();
exit(EXIT_FAILURE);
@@ -123,7 +640,7 @@ bool FairMQStateMachine::ChangeState(int event)
return false;
}
bool FairMQStateMachine::ChangeState(const std::string& event)
bool FairMQStateMachine::ChangeState(const string& event)
{
return ChangeState(GetEventNumber(event));
}
@@ -140,10 +657,10 @@ void FairMQStateMachine::WaitForEndOfState(int event)
case RESET_TASK:
case RESET_DEVICE:
{
std::unique_lock<std::mutex> lock(fWorkMutex);
while (fWorkActive || fWorkAvailable)
unique_lock<mutex> lock(static_pointer_cast<FairMQFSM>(fFsm)->fWorkMutex);
while (static_pointer_cast<FairMQFSM>(fFsm)->fWorkActive || static_pointer_cast<FairMQFSM>(fFsm)->fWorkAvailable)
{
fWorkDoneCondition.wait_for(lock, std::chrono::seconds(1));
static_pointer_cast<FairMQFSM>(fFsm)->fWorkDoneCondition.wait_for(lock, chrono::seconds(1));
}
break;
@@ -153,13 +670,13 @@ void FairMQStateMachine::WaitForEndOfState(int event)
break;
}
}
catch (std::exception& e)
catch (exception& e)
{
LOG(error) << "Exception in FairMQStateMachine::WaitForEndOfState(): " << e.what();
}
}
void FairMQStateMachine::WaitForEndOfState(const std::string& event)
void FairMQStateMachine::WaitForEndOfState(const string& event)
{
return WaitForEndOfState(GetEventNumber(event));
}
@@ -176,11 +693,11 @@ bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs)
case RESET_TASK:
case RESET_DEVICE:
{
std::unique_lock<std::mutex> lock(fWorkMutex);
while (fWorkActive || fWorkAvailable)
unique_lock<mutex> lock(static_pointer_cast<FairMQFSM>(fFsm)->fWorkMutex);
while (static_pointer_cast<FairMQFSM>(fFsm)->fWorkActive || static_pointer_cast<FairMQFSM>(fFsm)->fWorkAvailable)
{
fWorkDoneCondition.wait_for(lock, std::chrono::milliseconds(durationInMs));
if (fWorkActive)
static_pointer_cast<FairMQFSM>(fFsm)->fWorkDoneCondition.wait_for(lock, chrono::milliseconds(durationInMs));
if (static_pointer_cast<FairMQFSM>(fFsm)->fWorkActive)
{
return false;
}
@@ -192,32 +709,59 @@ bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs)
return false;
}
}
catch (std::exception& e)
catch (exception& e)
{
LOG(error) << "Exception in FairMQStateMachine::WaitForEndOfStateForMs(): " << e.what();
}
return false;
}
bool FairMQStateMachine::WaitForEndOfStateForMs(const std::string& event, int durationInMs)
bool FairMQStateMachine::WaitForEndOfStateForMs(const string& event, int durationInMs)
{
return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs);
}
void FairMQStateMachine::SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback)
void FairMQStateMachine::SubscribeToStateChange(const string& key, function<void(const State)> callback)
{
fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)});
static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignalsMap.insert({key, static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignal.connect(callback)});
}
void FairMQStateMachine::UnsubscribeFromStateChange(const std::string& key)
void FairMQStateMachine::UnsubscribeFromStateChange(const string& key)
{
if (fStateChangeSignalsMap.count(key))
if (static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignalsMap.count(key))
{
fStateChangeSignalsMap.at(key).disconnect();
fStateChangeSignalsMap.erase(key);
static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignalsMap.at(key).disconnect();
static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignalsMap.erase(key);
}
}
int FairMQStateMachine::GetEventNumber(const std::string& event)
void FairMQStateMachine::CallStateChangeCallbacks(const State state) const
{
static_pointer_cast<FairMQFSM>(fFsm)->CallStateChangeCallbacks(state);
}
string FairMQStateMachine::GetCurrentStateName() const
{
return static_pointer_cast<FairMQFSM>(fFsm)->GetStateName(static_pointer_cast<FairMQFSM>(fFsm)->fState);
}
int FairMQStateMachine::GetCurrentState() const
{
return static_pointer_cast<FairMQFSM>(fFsm)->fState;
}
bool FairMQStateMachine::CheckCurrentState(int state) const
{
return state == static_pointer_cast<FairMQFSM>(fFsm)->fState;
}
bool FairMQStateMachine::CheckCurrentState(string state) const
{
return state == GetCurrentStateName();
}
bool FairMQStateMachine::Terminated()
{
return static_pointer_cast<FairMQFSM>(fFsm)->fTerminationRequested;
}
int FairMQStateMachine::GetEventNumber(const string& event)
{
if (event == "INIT_DEVICE") return INIT_DEVICE;
if (event == "INIT_TASK") return INIT_TASK;
@@ -228,7 +772,7 @@ int FairMQStateMachine::GetEventNumber(const std::string& event)
if (event == "RESET_TASK") return RESET_TASK;
if (event == "END") return END;
if (event == "ERROR_FOUND") return ERROR_FOUND;
LOG(error) << "Requested number for non-existent event... " << event << std::endl
LOG(error) << "Requested number for non-existent event... " << event << endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND";
return -1;
}

View File

@@ -17,555 +17,14 @@
#define FAIRMQ_INTERFACE_VERSION 3
#include <string>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <functional>
#include <unordered_map>
// Increase maximum number of boost::msm states (default is 10)
// This #define has to be before any msm header includes
#define FUSION_MAX_VECTOR_SIZE 20
#include <boost/mpl/for_each.hpp>
#include <boost/msm/back/state_machine.hpp>
#include <boost/msm/back/tools.hpp>
#include <boost/msm/back/metafunctions.hpp>
#include <boost/msm/front/state_machine_def.hpp>
#include <boost/msm/front/functor_row.hpp>
#include <boost/signals2.hpp> // signal/slot for onStateChange callbacks
#include "FairMQLogger.h"
namespace msmf = boost::msm::front;
#include <string>
#include <memory>
#include <functional>
#include <mutex>
namespace fair
{
namespace mq
{
namespace fsm
{
// defining events for the boost MSM state machine
struct INIT_DEVICE { std::string name() const { return "INIT_DEVICE"; } };
struct internal_DEVICE_READY { std::string name() const { return "internal_DEVICE_READY"; } };
struct INIT_TASK { std::string name() const { return "INIT_TASK"; } };
struct internal_READY { std::string name() const { return "internal_READY"; } };
struct RUN { std::string name() const { return "RUN"; } };
struct PAUSE { std::string name() const { return "PAUSE"; } };
struct STOP { std::string name() const { return "STOP"; } };
struct RESET_TASK { std::string name() const { return "RESET_TASK"; } };
struct RESET_DEVICE { std::string name() const { return "RESET_DEVICE"; } };
struct internal_IDLE { std::string name() const { return "internal_IDLE"; } };
struct END { std::string name() const { return "END"; } };
struct ERROR_FOUND { std::string name() const { return "ERROR_FOUND"; } };
// deactivate the warning for non-virtual destructor thrown in the boost library
#if defined(__clang__)
_Pragma("clang diagnostic push")
_Pragma("clang diagnostic ignored \"-Wnon-virtual-dtor\"")
#elif defined(__GNUC__) || defined(__GNUG__)
_Pragma("GCC diagnostic push")
_Pragma("GCC diagnostic ignored \"-Wnon-virtual-dtor\"")
#endif
// defining the boost MSM state machine
struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
{
public:
FairMQFSM()
: fState()
, fChangeStateMutex()
, fWork()
, fWorkAvailableCondition()
, fWorkDoneCondition()
, fWorkMutex()
, fWorkerTerminated(false)
, fWorkActive(false)
, fWorkAvailable(false)
, fStateChangeSignal()
, fStateChangeSignalsMap()
, fTerminationRequested(false)
, fWorkerThread()
{}
virtual ~FairMQFSM()
{}
template<typename Event, typename FSM>
void on_entry(Event const&, FSM& fsm)
{
LOG(state) << "Starting FairMQ state machine";
fState = IDLE;
fsm.CallStateChangeCallbacks(IDLE);
// start a worker thread to execute user states in.
fsm.fWorkerThread = std::thread(&FairMQFSM::Worker, &fsm);
}
template<typename Event, typename FSM>
void on_exit(Event const&, FSM& /*fsm*/)
{
LOG(state) << "Exiting FairMQ state machine";
}
// list of FSM states
struct OK_FSM : public msmf::state<> {};
struct ERROR_FSM : public msmf::terminate_state<> {};
struct IDLE_FSM : public msmf::state<> {};
struct INITIALIZING_DEVICE_FSM : public msmf::state<> {};
struct DEVICE_READY_FSM : public msmf::state<> {};
struct INITIALIZING_TASK_FSM : public msmf::state<> {};
struct READY_FSM : public msmf::state<> {};
struct RUNNING_FSM : public msmf::state<> {};
struct PAUSED_FSM : public msmf::state<> {};
struct RESETTING_TASK_FSM : public msmf::state<> {};
struct RESETTING_DEVICE_FSM : public msmf::state<> {};
struct EXITING_FSM : public msmf::state<> {};
// initial states
using initial_state = boost::mpl::vector<IDLE_FSM, OK_FSM>;
// actions
struct IdleFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
LOG(state) << "Entering IDLE state";
fsm.fState = IDLE;
}
};
struct InitDeviceFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = INITIALIZING_DEVICE;
std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering INITIALIZING DEVICE state";
fsm.fWork = std::bind(&FairMQFSM::InitWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one();
}
};
struct DeviceReadyFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
LOG(state) << "Entering DEVICE READY state";
fsm.fState = DEVICE_READY;
}
};
struct InitTaskFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = INITIALIZING_TASK;
std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering INITIALIZING TASK state";
fsm.fWork = std::bind(&FairMQFSM::InitTaskWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one();
}
};
struct ReadyFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
LOG(state) << "Entering READY state";
fsm.fState = READY;
}
};
struct RunFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = RUNNING;
std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering RUNNING state";
fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one();
}
};
struct PauseFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = PAUSED;
fsm.Unblock();
std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering PAUSED state";
fsm.fWork = std::bind(&FairMQFSM::PauseWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one();
}
};
struct ResumeFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = RUNNING;
std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering RUNNING state";
fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one();
}
};
struct StopFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = READY;
fsm.Unblock();
std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
LOG(state) << "Entering READY state";
}
};
struct InternalStopFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = READY;
fsm.Unblock();
LOG(state) << "RUNNING state finished without an external event, entering READY state";
}
};
struct ResetTaskFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = RESETTING_TASK;
std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering RESETTING TASK state";
fsm.fWork = std::bind(&FairMQFSM::ResetTaskWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one();
}
};
struct ResetDeviceFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = RESETTING_DEVICE;
std::unique_lock<std::mutex> lock(fsm.fWorkMutex);
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true;
LOG(state) << "Entering RESETTING DEVICE state";
fsm.fWork = std::bind(&FairMQFSM::ResetWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one();
}
};
struct ExitingFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
LOG(state) << "Entering EXITING state";
fsm.fState = EXITING;
fsm.fTerminationRequested = true;
fsm.CallStateChangeCallbacks(EXITING);
// terminate worker thread
{
std::lock_guard<std::mutex> lock(fsm.fWorkMutex);
fsm.fWorkerTerminated = true;
fsm.fWorkAvailableCondition.notify_one();
}
// join the worker thread (executing user states)
if (fsm.fWorkerThread.joinable())
{
fsm.fWorkerThread.join();
}
fsm.Exit();
}
};
struct ErrorFoundFct
{
template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
LOG(state) << "Entering ERROR state";
fsm.fState = Error;
fsm.CallStateChangeCallbacks(Error);
}
};
// Transition table for FairMQFSM
struct transition_table : boost::mpl::vector<
// Start Event Next Action Guard
msmf::Row<IDLE_FSM, INIT_DEVICE, INITIALIZING_DEVICE_FSM, InitDeviceFct, msmf::none>,
msmf::Row<IDLE_FSM, END, EXITING_FSM, ExitingFct, msmf::none>,
msmf::Row<INITIALIZING_DEVICE_FSM, internal_DEVICE_READY, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>,
msmf::Row<DEVICE_READY_FSM, INIT_TASK, INITIALIZING_TASK_FSM, InitTaskFct, msmf::none>,
msmf::Row<DEVICE_READY_FSM, RESET_DEVICE, RESETTING_DEVICE_FSM, ResetDeviceFct, msmf::none>,
msmf::Row<INITIALIZING_TASK_FSM, internal_READY, READY_FSM, ReadyFct, msmf::none>,
msmf::Row<READY_FSM, RUN, RUNNING_FSM, RunFct, msmf::none>,
msmf::Row<READY_FSM, RESET_TASK, RESETTING_TASK_FSM, ResetTaskFct, msmf::none>,
msmf::Row<RUNNING_FSM, PAUSE, PAUSED_FSM, PauseFct, msmf::none>,
msmf::Row<RUNNING_FSM, STOP, READY_FSM, StopFct, msmf::none>,
msmf::Row<RUNNING_FSM, internal_READY, READY_FSM, InternalStopFct, msmf::none>,
msmf::Row<PAUSED_FSM, RUN, RUNNING_FSM, ResumeFct, msmf::none>,
msmf::Row<RESETTING_TASK_FSM, internal_DEVICE_READY, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>,
msmf::Row<RESETTING_DEVICE_FSM, internal_IDLE, IDLE_FSM, IdleFct, msmf::none>,
msmf::Row<OK_FSM, ERROR_FOUND, ERROR_FSM, ErrorFoundFct, msmf::none>>
{};
// replaces the default no-transition response.
template<typename FSM, typename Event>
void no_transition(Event const& e, FSM&, int state)
{
using recursive_stt = typename boost::msm::back::recursive_get_transition_table<FSM>::type;
using all_states = typename boost::msm::back::generate_state_set<recursive_stt>::type;
std::string stateName;
boost::mpl::for_each<all_states, boost::msm::wrap<boost::mpl::placeholders::_1>>(boost::msm::back::get_state_name<recursive_stt>(stateName, state));
stateName = stateName.substr(24);
std::size_t pos = stateName.find("_FSME");
stateName.erase(pos);
if (stateName == "1RUNNING" || stateName == "6DEVICE_READY" || stateName == "0PAUSED" || stateName == "8RESETTING_TASK" || stateName == "0RESETTING_DEVICE")
{
stateName = stateName.substr(1);
}
if (stateName != "OK")
{
LOG(state) << "No transition from state " << stateName << " on event " << e.name();
}
// LOG(state) << "no transition from state " << GetStateName(state) << " (" << state << ") on event " << e.name();
}
// backward compatibility to FairMQStateMachine
enum State
{
OK,
Error,
IDLE,
INITIALIZING_DEVICE,
DEVICE_READY,
INITIALIZING_TASK,
READY,
RUNNING,
PAUSED,
RESETTING_TASK,
RESETTING_DEVICE,
EXITING
};
static std::string GetStateName(const int state)
{
switch(state)
{
case OK:
return "OK";
case Error:
return "Error";
case IDLE:
return "IDLE";
case INITIALIZING_DEVICE:
return "INITIALIZING_DEVICE";
case DEVICE_READY:
return "DEVICE_READY";
case INITIALIZING_TASK:
return "INITIALIZING_TASK";
case READY:
return "READY";
case RUNNING:
return "RUNNING";
case PAUSED:
return "PAUSED";
case RESETTING_TASK:
return "RESETTING_TASK";
case RESETTING_DEVICE:
return "RESETTING_DEVICE";
case EXITING:
return "EXITING";
default:
return "requested name for non-existent state...";
}
}
std::string GetCurrentStateName() const
{
return GetStateName(fState);
}
int GetCurrentState() const
{
return fState;
}
bool CheckCurrentState(int state) const
{
return state == fState;
}
bool CheckCurrentState(std::string state) const
{
return state == GetCurrentStateName();
}
// actions to be overwritten by derived classes
virtual void InitWrapper() {}
virtual void InitTaskWrapper() {}
virtual void RunWrapper() {}
virtual void PauseWrapper() {}
virtual void ResetWrapper() {}
virtual void ResetTaskWrapper() {}
virtual void Exit() {}
virtual void Unblock() {}
bool Terminated()
{
return fTerminationRequested;
}
protected:
std::atomic<State> fState;
std::mutex fChangeStateMutex;
// function to execute user states in a worker thread
std::function<void(void)> fWork;
std::condition_variable fWorkAvailableCondition;
std::condition_variable fWorkDoneCondition;
std::mutex fWorkMutex;
bool fWorkerTerminated;
bool fWorkActive;
bool fWorkAvailable;
boost::signals2::signal<void(const State)> fStateChangeSignal;
std::unordered_map<std::string, boost::signals2::connection> fStateChangeSignalsMap;
std::atomic<bool> fTerminationRequested;
void CallStateChangeCallbacks(const State state) const
{
if (!fStateChangeSignal.empty())
{
fStateChangeSignal(state);
}
}
private:
void Worker()
{
while (true)
{
{
std::unique_lock<std::mutex> lock(fWorkMutex);
// Wait for work to be done.
while (!fWorkAvailable && !fWorkerTerminated)
{
fWorkAvailableCondition.wait(lock);
}
if (fWorkerTerminated)
{
break;
}
fWorkActive = true;
}
fWork();
{
std::lock_guard<std::mutex> lock(fWorkMutex);
fWorkActive = false;
fWorkAvailable = false;
fWorkDoneCondition.notify_one();
}
CallStateChangeCallbacks(fState);
}
}
// run state handlers in a separate thread
std::thread fWorkerThread;
};
// reactivate the warning for non-virtual destructor
#if defined(__clang__)
_Pragma("clang diagnostic pop")
#elif defined(__GNUC__) || defined(__GNUG__)
_Pragma("GCC diagnostic pop")
#endif
} // namespace fsm
} // namespace mq
} // namespace fair
class FairMQStateMachine : public boost::msm::back::state_machine<fair::mq::fsm::FairMQFSM>
class FairMQStateMachine
{
public:
enum Event
@@ -584,6 +43,22 @@ class FairMQStateMachine : public boost::msm::back::state_machine<fair::mq::fsm:
ERROR_FOUND
};
enum State
{
OK,
Error,
IDLE,
INITIALIZING_DEVICE,
DEVICE_READY,
INITIALIZING_TASK,
READY,
RUNNING,
PAUSED,
RESETTING_TASK,
RESETTING_DEVICE,
EXITING
};
FairMQStateMachine();
virtual ~FairMQStateMachine();
@@ -601,8 +76,30 @@ class FairMQStateMachine : public boost::msm::back::state_machine<fair::mq::fsm:
void SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback);
void UnsubscribeFromStateChange(const std::string& key);
void CallStateChangeCallbacks(const State state) const;
std::string GetCurrentStateName() const;
int GetCurrentState() const;
bool CheckCurrentState(int state) const;
bool CheckCurrentState(std::string state) const;
bool Terminated();
// actions to be overwritten by derived classes
virtual void InitWrapper() {}
virtual void InitTaskWrapper() {}
virtual void RunWrapper() {}
virtual void PauseWrapper() {}
virtual void ResetWrapper() {}
virtual void ResetTaskWrapper() {}
virtual void Exit() {}
virtual void Unblock() {}
private:
int GetEventNumber(const std::string& event);
std::mutex fChangeStateMutex;
std::shared_ptr<void> fFsm;
};
#endif /* FAIRMQSTATEMACHINE_H_ */

View File

@@ -29,7 +29,7 @@ class FairMQTransportFactory
private:
/// Topology wide unique id
const std::string fkId;
public:
/// ctor
/// @param id Topology wide unique id, usually the device id.
@@ -69,7 +69,7 @@ class FairMQTransportFactory
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const = 0;
/// Get transport type
virtual FairMQ::Transport GetType() const = 0;
virtual fair::mq::Transport GetType() const = 0;
virtual void Interrupt() = 0;
virtual void Resume() = 0;

View File

@@ -99,6 +99,11 @@ class Plugin
auto SubscribeToPropertyChangeAsString(std::function<void(const std::string& key, std::string newValue)> callback) -> void { fPluginServices->SubscribeToPropertyChangeAsString(fkName, callback); }
auto UnsubscribeFromPropertyChangeAsString() -> void { fPluginServices->UnsubscribeFromPropertyChangeAsString(fkName); }
auto CycleLogConsoleSeverityUp() -> void { fPluginServices->CycleLogConsoleSeverityUp(); }
auto CycleLogConsoleSeverityDown() -> void { fPluginServices->CycleLogConsoleSeverityDown(); }
auto CycleLogVerbosityUp() -> void { fPluginServices->CycleLogVerbosityUp(); }
auto CycleLogVerbosityDown() -> void { fPluginServices->CycleLogVerbosityDown(); }
private:
const std::string fkName;
const Version fkVersion;

View File

@@ -253,6 +253,10 @@ class PluginServices
/// @param subscriber
auto UnsubscribeFromPropertyChangeAsString(const std::string& subscriber) -> void { fConfig->UnsubscribeAsString(subscriber); }
auto CycleLogConsoleSeverityUp() -> void { Logger::CycleConsoleSeverityUp(); }
auto CycleLogConsoleSeverityDown() -> void { Logger::CycleConsoleSeverityDown(); }
auto CycleLogVerbosityUp() -> void { Logger::CycleVerbosityUp(); }
auto CycleLogVerbosityDown() -> void { Logger::CycleVerbosityDown(); }
static const std::unordered_map<std::string, DeviceState> fkDeviceStateStrMap;
static const std::unordered_map<DeviceState, std::string, tools::HashEnum<DeviceState>> fkStrDeviceStateMap;

View File

@@ -15,8 +15,9 @@
#include <string>
#include <unordered_map>
/// TODO deprecate this namespace
namespace FairMQ
namespace fair
{
namespace mq
{
enum class Transport
@@ -28,6 +29,21 @@ enum class Transport
OFI
};
} /* namespace mq */
} /* namespace fair */
namespace std
{
template<>
struct hash<fair::mq::Transport> : fair::mq::tools::HashEnum<fair::mq::Transport> {};
} /* namespace std */
namespace fair
{
namespace mq
{
static std::unordered_map<std::string, Transport> TransportTypes {
{ "default", Transport::DEFAULT },
@@ -37,25 +53,15 @@ static std::unordered_map<std::string, Transport> TransportTypes {
{ "ofi", Transport::OFI }
};
}
namespace fair
{
namespace mq
{
using Transport = ::FairMQ::Transport;
using ::FairMQ::TransportTypes;
static std::unordered_map<Transport, std::string> TransportNames {
{ Transport::DEFAULT, "default" },
{ Transport::ZMQ, "zeromq" },
{ Transport::NN, "nanomsg" },
{ Transport::SHM, "shmem" },
{ Transport::OFI, "ofi" }
};
} /* namespace mq */
} /* namespace fair */
namespace std
{
template<>
struct hash<FairMQ::Transport> : fair::mq::tools::HashEnum<FairMQ::Transport> {};
} /* namespace std */
#endif /* FAIR_MQ_TRANSPORTS_H */

View File

@@ -22,7 +22,7 @@
using namespace std;
FairMQ::Transport FairMQMessageNN::fTransportType = FairMQ::Transport::NN;
fair::mq::Transport FairMQMessageNN::fTransportType = fair::mq::Transport::NN;
FairMQMessageNN::FairMQMessageNN()
: fMessage(nullptr)
@@ -172,7 +172,7 @@ void FairMQMessageNN::SetMessage(void* data, const size_t size)
fSize = size;
}
FairMQ::Transport FairMQMessageNN::GetType() const
fair::mq::Transport FairMQMessageNN::GetType() const
{
return fTransportType;
}

View File

@@ -46,7 +46,7 @@ class FairMQMessageNN : public FairMQMessage
bool SetUsedSize(const size_t size) override;
FairMQ::Transport GetType() const override;
fair::mq::Transport GetType() const override;
void Copy(const FairMQMessage& msg) override;
void Copy(const FairMQMessagePtr& msg) override;
@@ -59,7 +59,7 @@ class FairMQMessageNN : public FairMQMessage
size_t fHint;
bool fReceiving;
FairMQUnmanagedRegion* fRegionPtr;
static FairMQ::Transport fTransportType;
static fair::mq::Transport fTransportType;
void* GetMessage() const;
void CloseMessage();

View File

@@ -12,7 +12,7 @@
using namespace std;
FairMQ::Transport FairMQTransportFactoryNN::fTransportType = FairMQ::Transport::NN;
fair::mq::Transport FairMQTransportFactoryNN::fTransportType = fair::mq::Transport::NN;
FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id, const FairMQProgOptions* /*config*/)
: FairMQTransportFactory(id)
@@ -70,7 +70,7 @@ FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const s
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(size, callback));
}
FairMQ::Transport FairMQTransportFactoryNN::GetType() const
fair::mq::Transport FairMQTransportFactoryNN::GetType() const
{
return fTransportType;
}

View File

@@ -39,13 +39,13 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override;
FairMQ::Transport GetType() const override;
fair::mq::Transport GetType() const override;
void Interrupt() override { FairMQSocketNN::Interrupt(); }
void Resume() override { FairMQSocketNN::Resume(); }
private:
static FairMQ::Transport fTransportType;
static fair::mq::Transport fTransportType;
};
#endif /* FAIRMQTRANSPORTFACTORYNN_H_ */

View File

@@ -15,6 +15,7 @@
#include "FairMQParser.h"
#include "FairMQLogger.h"
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
using namespace std;
@@ -147,7 +148,7 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa
commonChannel.UpdateType(q.second.get<string>("type", commonChannel.GetType()));
commonChannel.UpdateMethod(q.second.get<string>("method", commonChannel.GetMethod()));
commonChannel.UpdateAddress(q.second.get<string>("address", commonChannel.GetAddress()));
commonChannel.UpdateTransport(q.second.get<string>("transport", commonChannel.GetTransport()));
commonChannel.UpdateTransport(q.second.get<string>("transport", commonChannel.GetTransportName()));
commonChannel.UpdateSndBufSize(q.second.get<int>("sndBufSize", commonChannel.GetSndBufSize()));
commonChannel.UpdateRcvBufSize(q.second.get<int>("rcvBufSize", commonChannel.GetRcvBufSize()));
commonChannel.UpdateSndKernelSize(q.second.get<int>("sndKernelSize", commonChannel.GetSndKernelSize()));
@@ -166,7 +167,7 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa
LOG(debug) << "\ttype = " << commonChannel.GetType();
LOG(debug) << "\tmethod = " << commonChannel.GetMethod();
LOG(debug) << "\taddress = " << commonChannel.GetAddress();
LOG(debug) << "\ttransport = " << commonChannel.GetTransport();
LOG(debug) << "\ttransport = " << commonChannel.GetTransportName();
LOG(debug) << "\tsndBufSize = " << commonChannel.GetSndBufSize();
LOG(debug) << "\trcvBufSize = " << commonChannel.GetRcvBufSize();
LOG(debug) << "\tsndKernelSize = " << commonChannel.GetSndKernelSize();
@@ -208,7 +209,7 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
channel.UpdateType(q.second.get<string>("type", channel.GetType()));
channel.UpdateMethod(q.second.get<string>("method", channel.GetMethod()));
channel.UpdateAddress(q.second.get<string>("address", channel.GetAddress()));
channel.UpdateTransport(q.second.get<string>("transport", channel.GetTransport()));
channel.UpdateTransport(q.second.get<string>("transport", channel.GetTransportName()));
channel.UpdateSndBufSize(q.second.get<int>("sndBufSize", channel.GetSndBufSize()));
channel.UpdateRcvBufSize(q.second.get<int>("rcvBufSize", channel.GetRcvBufSize()));
channel.UpdateSndKernelSize(q.second.get<int>("sndKernelSize", channel.GetSndKernelSize()));
@@ -219,7 +220,7 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
LOG(debug) << "\ttype = " << channel.GetType();
LOG(debug) << "\tmethod = " << channel.GetMethod();
LOG(debug) << "\taddress = " << channel.GetAddress();
LOG(debug) << "\ttransport = " << channel.GetTransport();
LOG(debug) << "\ttransport = " << channel.GetTransportName();
LOG(debug) << "\tsndBufSize = " << channel.GetSndBufSize();
LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize();
@@ -247,7 +248,7 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
LOG(debug) << "\ttype = " << channel.GetType();
LOG(debug) << "\tmethod = " << channel.GetMethod();
LOG(debug) << "\taddress = " << channel.GetAddress();
LOG(debug) << "\ttransport = " << channel.GetTransport();
LOG(debug) << "\ttransport = " << channel.GetTransportName();
LOG(debug) << "\tsndBufSize = " << channel.GetSndBufSize();
LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize();

View File

@@ -13,7 +13,7 @@
#include <map>
#include <unordered_map>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ptree_fwd.hpp>
#include "FairMQChannel.h"

View File

@@ -196,7 +196,7 @@ void FairMQProgOptions::UpdateMQValues()
UpdateVarMap<string>(typeKey, channel.GetType());
UpdateVarMap<string>(methodKey, channel.GetMethod());
UpdateVarMap<string>(addressKey, channel.GetAddress());
UpdateVarMap<string>(transportKey, channel.GetTransport());
UpdateVarMap<string>(transportKey, channel.GetTransportName());
UpdateVarMap<int>(sndBufSizeKey, channel.GetSndBufSize());
UpdateVarMap<int>(rcvBufSizeKey, channel.GetRcvBufSize());
UpdateVarMap<int>(sndKernelSizeKey, channel.GetSndKernelSize());

View File

@@ -107,6 +107,7 @@ auto Control::InteractiveMode() -> void
struct termios t;
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
t.c_lflag &= ~ICANON; // disable canonical input
t.c_lflag &= ~ECHO; // do not echo input chars
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
PrintInteractiveHelp();
@@ -154,6 +155,22 @@ auto Control::InteractiveMode() -> void
LOG(info) << "\n\n --> [d] reset device\n";
ChangeDeviceState(DeviceStateTransition::ResetDevice);
break;
case 'k':
LOG(info) << "\n\n --> [k] increase log severity\n";
CycleLogConsoleSeverityUp();
break;
case 'l':
LOG(info) << "\n\n --> [l] decrease log severity\n";
CycleLogConsoleSeverityDown();
break;
case 'n':
LOG(info) << "\n\n --> [n] increase log verbosity\n";
CycleLogVerbosityUp();
break;
case 'm':
LOG(info) << "\n\n --> [m] decrease log verbosity\n";
CycleLogVerbosityDown();
break;
case 'h':
LOG(info) << "\n\n --> [h] help\n";
PrintInteractiveHelp();
@@ -181,6 +198,7 @@ auto Control::InteractiveMode() -> void
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
t.c_lflag |= ICANON; // re-enable canonical input
t.c_lflag |= ECHO; // echo input chars
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
if (!fDeviceTerminationRequested)
@@ -197,8 +215,11 @@ auto Control::InteractiveMode() -> void
auto Control::PrintInteractiveHelp() -> void
{
LOG(info) << "Use keys to control the state machine:\n\n"
<< "[h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device\n";
stringstream ss;
ss << "\nFollowing control commands are available:\n\n"
<< "[h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device\n"
<< "[k] increase log severity [l] decrease log severity [n] increase log verbosity [m] decrease log verbosity\n\n";
cout << ss.str() << flush;
}
auto Control::WaitForNextState() -> DeviceState

View File

@@ -12,6 +12,7 @@
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <termios.h> // for the interactive mode
#include <poll.h> // for the interactive mode

View File

@@ -23,7 +23,7 @@ namespace bipc = boost::interprocess;
namespace bpt = boost::posix_time;
atomic<bool> FairMQMessageSHM::fInterrupted(false);
FairMQ::Transport FairMQMessageSHM::fTransportType = FairMQ::Transport::SHM;
fair::mq::Transport FairMQMessageSHM::fTransportType = fair::mq::Transport::SHM;
FairMQMessageSHM::FairMQMessageSHM(Manager& manager)
: fManager(manager)
@@ -284,7 +284,7 @@ bool FairMQMessageSHM::SetUsedSize(const size_t size)
}
}
FairMQ::Transport FairMQMessageSHM::GetType() const
fair::mq::Transport FairMQMessageSHM::GetType() const
{
return fTransportType;
}

View File

@@ -44,7 +44,7 @@ class FairMQMessageSHM : public FairMQMessage
bool SetUsedSize(const size_t size) override;
FairMQ::Transport GetType() const override;
fair::mq::Transport GetType() const override;
void Copy(const FairMQMessage& msg) override;
void Copy(const FairMQMessagePtr& msg) override;
@@ -57,7 +57,7 @@ class FairMQMessageSHM : public FairMQMessage
bool fQueued;
bool fMetaCreated;
static std::atomic<bool> fInterrupted;
static FairMQ::Transport fTransportType;
static fair::mq::Transport fTransportType;
size_t fRegionId;
mutable fair::mq::shmem::Region* fRegionPtr;
boost::interprocess::managed_shared_memory::handle_t fHandle;

View File

@@ -32,7 +32,7 @@ namespace bfs = boost::filesystem;
namespace bpt = boost::posix_time;
namespace bipc = boost::interprocess;
FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport::SHM;
fair::mq::Transport FairMQTransportFactorySHM::fTransportType = fair::mq::Transport::SHM;
FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const FairMQProgOptions* config)
: FairMQTransportFactory(id)
@@ -76,7 +76,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
try
{
fShMutex = fair::mq::tools::make_unique<bipc::named_mutex>(bipc::open_or_create, string("fmq_shm_" + fSessionName + "_mutex").c_str());
fShMutex = fair::mq::tools::make_unique<bipc::named_mutex>(bipc::open_or_create, string("fmq_" + fSessionName + "_mtx").c_str());
if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0)
{
@@ -90,7 +90,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
}
fManager = fair::mq::tools::make_unique<Manager>(fSessionName, segmentSize);
LOG(debug) << "created/opened shared memory segment '" << "fmq_shm_" << fSessionName << "_main" << "' of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes.";
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fSessionName << "_main" << "' of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes.";
{
bipc::scoped_lock<bipc::named_mutex> lock(*fShMutex);
@@ -190,7 +190,7 @@ void FairMQTransportFactorySHM::StartMonitor()
void FairMQTransportFactorySHM::SendHeartbeats()
{
string controlQueueName("fmq_shm_" + fSessionName + "_control_queue");
string controlQueueName("fmq_" + fSessionName + "_cq");
while (fSendHeartbeats)
{
try
@@ -312,11 +312,11 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM()
if (lastRemoved)
{
boost::interprocess::named_mutex::remove(string("fmq_shm_" + fSessionName + "_mutex").c_str());
boost::interprocess::named_mutex::remove(string("fmq_" + fSessionName + "_mtx").c_str());
}
}
FairMQ::Transport FairMQTransportFactorySHM::GetType() const
fair::mq::Transport FairMQTransportFactorySHM::GetType() const
{
return fTransportType;
}

View File

@@ -47,7 +47,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const override;
FairMQ::Transport GetType() const override;
fair::mq::Transport GetType() const override;
void Interrupt() override { FairMQSocketSHM::Interrupt(); }
void Resume() override { FairMQSocketSHM::Resume(); }
@@ -58,7 +58,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory
void SendHeartbeats();
void StartMonitor();
static FairMQ::Transport fTransportType;
static fair::mq::Transport fTransportType;
std::string fDeviceId;
std::string fSessionName;
void* fContext;

View File

@@ -23,8 +23,8 @@ std::unordered_map<uint64_t, Region> Manager::fRegions;
Manager::Manager(const string& name, size_t size)
: fSessionName(name)
, fSegmentName("fmq_shm_" + fSessionName + "_main")
, fManagementSegmentName("fmq_shm_" + fSessionName + "_management")
, fSegmentName("fmq_" + fSessionName + "_main")
, fManagementSegmentName("fmq_" + fSessionName + "_mng")
, fSegment(bipc::open_or_create, fSegmentName.c_str(), size)
, fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536)
{}

View File

@@ -59,9 +59,9 @@ Monitor::Monitor(const string& sessionName, bool selfDestruct, bool interactive,
, fCleanOnExit(cleanOnExit)
, fTimeoutInMS(timeoutInMS)
, fSessionName(sessionName)
, fSegmentName("fmq_shm_" + fSessionName + "_main")
, fManagementSegmentName("fmq_shm_" + fSessionName + "_management")
, fControlQueueName("fmq_shm_" + fSessionName + "_control_queue")
, fSegmentName("fmq_" + fSessionName + "_main")
, fManagementSegmentName("fmq_" + fSessionName + "_mng")
, fControlQueueName("fmq_" + fSessionName + "_cq")
, fTerminating(false)
, fHeartbeatTriggered(false)
, fLastHeartbeat(chrono::high_resolution_clock::now())
@@ -170,6 +170,7 @@ void Monitor::Interactive()
struct termios t;
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
t.c_lflag &= ~ICANON; // disable canonical input
t.c_lflag &= ~ECHO; // do not echo input chars
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
cout << endl;
@@ -238,6 +239,7 @@ void Monitor::Interactive()
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
t.c_lflag |= ICANON; // re-enable canonical input
t.c_lflag |= ECHO; // echo input chars
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
}
@@ -360,7 +362,7 @@ void Monitor::CheckSegment()
void Monitor::Cleanup(const string& sessionName)
{
string managementSegmentName("fmq_shm_" + sessionName + "_management");
string managementSegmentName("fmq_" + sessionName + "_mng");
try
{
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
@@ -371,8 +373,8 @@ void Monitor::Cleanup(const string& sessionName)
unsigned int regionCount = rc->fCount;
for (unsigned int i = 1; i <= regionCount; ++i)
{
RemoveObject("fmq_shm_" + sessionName + "_region_" + to_string(i));
RemoveQueue(string("fmq_shm_" + sessionName + "_region_queue_" + to_string(i)));
RemoveObject("fmq_" + sessionName + "_rg_" + to_string(i));
RemoveQueue(string("fmq_" + sessionName + "_rgq_" + to_string(i)));
}
}
else
@@ -387,9 +389,9 @@ void Monitor::Cleanup(const string& sessionName)
cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl;
}
RemoveObject("fmq_shm_" + sessionName + "_main");
RemoveObject("fmq_" + sessionName + "_main");
boost::interprocess::named_mutex::remove(string("fmq_shm_" + sessionName + "_mutex").c_str());
boost::interprocess::named_mutex::remove(string("fmq_" + sessionName + "_mtx").c_str());
cout << endl;
}
@@ -425,7 +427,7 @@ void Monitor::PrintQueues()
try
{
bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str());
StringVector* queues = segment.find<StringVector>(string("fmq_shm_" + fSessionName + "_queues").c_str()).first;
StringVector* queues = segment.find<StringVector>(string("fmq_" + fSessionName + "_qs").c_str()).first;
if (queues)
{
cout << "found " << queues->size() << " queue(s):" << endl;

View File

@@ -27,9 +27,9 @@ The Monitor class can also be used independently from the supplied executable (b
FairMQ Shared Memory currently uses following names to register shared memory on the system:
`fmq_shm_<sessionName>_main` - main segment name, used for user data (session name can be overridden via `--session`).
`fmq_shm_<sessionName>_management` - management segment name, used for storing management data.
`fmq_shm_<sessionName>_control_queue` - message queue for communicating between shm transport and shm monitor (exists independent of above segments).
`fmq_shm_<sessionName>_mutex` - boost::interprocess::named_mutex for management purposes (exists independent of above segments).
`fmq_shm_<sessionName>_region_<index>` - names of unmanaged regions.
`fmq_shm_<sessionName>_region_queue_<index>` - names of queues for the unmanaged regions.
`fmq_<sessionName>_main` - main segment name, used for user data (session name can be overridden via `--session`).
`fmq_<sessionName>_mng` - management segment name, used for storing management data.
`fmq_<sessionName>_cq` - message queue for communicating between shm transport and shm monitor (exists independent of above segments).
`fmq_<sessionName>_mtx` - boost::interprocess::named_mutex for management purposes (exists independent of above segments).
`fmq_<sessionName>_rg_<index>` - names of unmanaged regions.
`fmq_<sessionName>_rgq_<index>` - names of queues for the unmanaged regions.

View File

@@ -28,8 +28,8 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQ
: fManager(manager)
, fRemote(remote)
, fStop(false)
, fName("fmq_shm_" + fManager.fSessionName +"_region_" + to_string(id))
, fQueueName("fmq_shm_" + fManager.fSessionName +"_region_queue_" + to_string(id))
, fName("fmq_" + fManager.fSessionName +"_rg_" + to_string(id))
, fQueueName("fmq_" + fManager.fSessionName +"_rgq_" + to_string(id))
, fShmemObject()
, fQueue(nullptr)
, fWorker()

View File

@@ -108,7 +108,7 @@ int main(int argc, char** argv)
{
cout << "Cleaning up \"" << sessionName << "\"..." << endl;
fair::mq::shmem::Monitor::Cleanup(sessionName);
fair::mq::shmem::Monitor::RemoveQueue("fmq_shm_" + sessionName + "_control_queue");
fair::mq::shmem::Monitor::RemoveQueue("fmq_" + sessionName + "_cq");
return 0;
}

192
fairmq/tools/Network.cxx Normal file
View File

@@ -0,0 +1,192 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/tools/Network.h>
#ifndef _GNU_SOURCE
#define _GNU_SOURCE // To get defns of NI_MAXSERV and NI_MAXHOST
#endif
#include "FairMQLogger.h"
#include <sys/socket.h>
#include <sys/types.h>
#include <netdb.h>
#include <ifaddrs.h>
#include <stdio.h>
#include <boost/algorithm/string.hpp> // trim
#include <boost/asio.hpp>
#include <map>
#include <string>
#include <iostream>
#include <array>
#include <exception>
#include <algorithm>
using namespace std;
namespace fair
{
namespace mq
{
namespace tools
{
// returns a map with network interface names as keys and their IP addresses as values
int getHostIPs(map<string, string>& addressMap)
{
struct ifaddrs *ifaddr, *ifa;
int s;
char host[NI_MAXHOST];
if (getifaddrs(&ifaddr) == -1)
{
perror("getifaddrs");
return -1;
}
for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next)
{
if (ifa->ifa_addr == NULL)
{
continue;
}
if (ifa->ifa_addr->sa_family == AF_INET)
{
s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
if (s != 0)
{
cout << "getnameinfo() failed: " << gai_strerror(s) << endl;
return -1;
}
addressMap.insert(pair<string, string>(ifa->ifa_name, host));
}
}
freeifaddrs(ifaddr);
return 0;
}
// get IP address of a given interface name
string getInterfaceIP(const string& interface)
{
map<string, string> IPs;
getHostIPs(IPs);
if (IPs.count(interface))
{
return IPs[interface];
}
else
{
LOG(error) << "Could not find provided network interface: \"" << interface << "\"!, exiting.";
return "";
}
}
// get name of the default route interface
string getDefaultRouteNetworkInterface()
{
array<char, 128> buffer;
string interfaceName;
#ifdef __APPLE__ // MacOS
unique_ptr<FILE, decltype(pclose) *> file(popen("route -n get default | grep interface | cut -d \":\" -f 2", "r"), pclose);
#else // Linux
unique_ptr<FILE, decltype(pclose) *> file(popen("ip route | grep default | cut -d \" \" -f 5 | head -n 1", "r"), pclose);
#endif
if (!file)
{
LOG(error) << "Could not detect default route network interface name - popen() failed!";
return "";
}
while (!feof(file.get()))
{
if (fgets(buffer.data(), 128, file.get()) != NULL)
{
interfaceName += buffer.data();
}
}
boost::algorithm::trim(interfaceName);
if (interfaceName == "")
{
LOG(error) << "Could not detect default route network interface name";
}
else
{
LOG(debug) << "Detected network interface name for the default route: " << interfaceName;
}
return interfaceName;
}
string getIpFromHostname(const string& hostname)
{
try {
namespace bai = boost::asio::ip;
boost::asio::io_service ios;
bai::tcp::resolver resolver(ios);
bai::tcp::resolver::query query(hostname, "");
bai::tcp::resolver::iterator end;
auto it = find_if(static_cast<bai::basic_resolver_iterator<bai::tcp>>(resolver.resolve(query)), end, [](const bai::tcp::endpoint& ep) {
return ep.address().is_v4();
});
if (it != end) {
stringstream ss;
ss << static_cast<bai::tcp::endpoint>(*it).address();
return ss.str();
}
LOG(warn) << "could not find ipv4 address for hostname '" << hostname << "'";
return "";
} catch (exception& e) {
LOG(error) << "could not resolve hostname '" << hostname << "', reason: " << e.what();
return "";
}
}
string getIpFromHostname(const string& hostname, boost::asio::io_service& ios)
{
try {
namespace bai = boost::asio::ip;
bai::tcp::resolver resolver(ios);
bai::tcp::resolver::query query(hostname, "");
bai::tcp::resolver::iterator end;
auto it = find_if(static_cast<bai::basic_resolver_iterator<bai::tcp>>(resolver.resolve(query)), end, [](const bai::tcp::endpoint& ep) {
return ep.address().is_v4();
});
if (it != end) {
stringstream ss;
ss << static_cast<bai::tcp::endpoint>(*it).address();
return ss.str();
}
LOG(warn) << "could not find ipv4 address for hostname '" << hostname << "'";
return "";
} catch (exception& e) {
LOG(error) << "could not resolve hostname '" << hostname << "', reason: " << e.what();
return "";
}
}
} /* namespace tools */
} /* namespace mq */
} /* namespace fair */

View File

@@ -9,26 +9,20 @@
#ifndef FAIR_MQ_TOOLS_NETWORK_H
#define FAIR_MQ_TOOLS_NETWORK_H
#ifndef _GNU_SOURCE
#define _GNU_SOURCE // To get defns of NI_MAXSERV and NI_MAXHOST
#endif
#include "FairMQLogger.h"
#include <sys/socket.h>
#include <sys/types.h>
#include <netdb.h>
#include <ifaddrs.h>
#include <stdio.h>
#include <boost/algorithm/string.hpp> // trim
#include <boost/asio.hpp>
#include <map>
#include <string>
#include <iostream>
#include <array>
#include <exception>
// forward declarations
namespace boost
{
namespace asio
{
class io_context;
typedef class io_context io_service;
} // namespace asio
} // namespace boost
namespace fair
{
@@ -38,150 +32,17 @@ namespace tools
{
// returns a map with network interface names as keys and their IP addresses as values
inline int getHostIPs(std::map<std::string, std::string>& addressMap)
{
struct ifaddrs *ifaddr, *ifa;
int s;
char host[NI_MAXHOST];
if (getifaddrs(&ifaddr) == -1)
{
perror("getifaddrs");
return -1;
}
for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next)
{
if (ifa->ifa_addr == NULL)
{
continue;
}
if (ifa->ifa_addr->sa_family == AF_INET)
{
s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
if (s != 0)
{
std::cout << "getnameinfo() failed: " << gai_strerror(s) << std::endl;
return -1;
}
addressMap.insert(std::pair<std::string, std::string>(ifa->ifa_name, host));
}
}
freeifaddrs(ifaddr);
return 0;
}
int getHostIPs(std::map<std::string, std::string>& addressMap);
// get IP address of a given interface name
inline std::string getInterfaceIP(std::string interface)
{
std::map<std::string, std::string> IPs;
getHostIPs(IPs);
if (IPs.count(interface))
{
return IPs[interface];
}
else
{
LOG(error) << "Could not find provided network interface: \"" << interface << "\"!, exiting.";
return "";
}
}
std::string getInterfaceIP(const std::string& interface);
// get name of the default route interface
inline std::string getDefaultRouteNetworkInterface()
{
std::array<char, 128> buffer;
std::string interfaceName;
std::string getDefaultRouteNetworkInterface();
#ifdef __APPLE__ // MacOS
std::unique_ptr<FILE, decltype(pclose) *> file(popen("route -n get default | grep interface | cut -d \":\" -f 2", "r"), pclose);
#else // Linux
std::unique_ptr<FILE, decltype(pclose) *> file(popen("ip route | grep default | cut -d \" \" -f 5 | head -n 1", "r"), pclose);
#endif
std::string getIpFromHostname(const std::string& hostname);
if (!file)
{
LOG(error) << "Could not detect default route network interface name - popen() failed!";
return "";
}
while (!feof(file.get()))
{
if (fgets(buffer.data(), 128, file.get()) != NULL)
{
interfaceName += buffer.data();
}
}
boost::algorithm::trim(interfaceName);
if (interfaceName == "")
{
LOG(error) << "Could not detect default route network interface name";
}
else
{
LOG(debug) << "Detected network interface name for the default route: " << interfaceName;
}
return interfaceName;
}
inline std::string getIpFromHostname(const std::string& hostname)
{
try {
boost::asio::io_service ios;
boost::asio::ip::tcp::resolver resolver(ios);
boost::asio::ip::tcp::resolver::query query(hostname, "");
boost::asio::ip::tcp::resolver::iterator end;
auto it = std::find_if(resolver.resolve(query), end, [](const boost::asio::ip::tcp::endpoint& ep) {
return ep.address().is_v4();
});
if (it != end) {
std::stringstream ss;
ss << static_cast<boost::asio::ip::tcp::endpoint>(*it).address();
return ss.str();
}
LOG(warn) << "could not find ipv4 address for hostname '" << hostname << "'";
return "";
} catch (std::exception& e) {
LOG(error) << "could not resolve hostname '" << hostname << "', reason: " << e.what();
return "";
}
}
inline std::string getIpFromHostname(const std::string& hostname, boost::asio::io_service& ios)
{
try {
boost::asio::ip::tcp::resolver resolver(ios);
boost::asio::ip::tcp::resolver::query query(hostname, "");
boost::asio::ip::tcp::resolver::iterator end;
auto it = std::find_if(resolver.resolve(query), end, [](const boost::asio::ip::tcp::endpoint& ep) {
return ep.address().is_v4();
});
if (it != end) {
std::stringstream ss;
ss << static_cast<boost::asio::ip::tcp::endpoint>(*it).address();
return ss.str();
}
LOG(warn) << "could not find ipv4 address for hostname '" << hostname << "'";
return "";
} catch (std::exception& e) {
LOG(error) << "could not resolve hostname '" << hostname << "', reason: " << e.what();
return "";
}
}
std::string getIpFromHostname(const std::string& hostname, boost::asio::io_service& ios);
} /* namespace tools */
} /* namespace mq */

73
fairmq/tools/Process.cxx Normal file
View File

@@ -0,0 +1,73 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/tools/Process.h>
#include <boost/process.hpp>
#include <iostream>
#include <sstream>
using namespace std;
namespace fair
{
namespace mq
{
namespace tools
{
/**
* Execute given command in forked process and capture stdout output
* and exit code.
*
* @param[in] cmd Command to execute
* @param[in] log_prefix How to prefix each captured output line with
* @return Captured stdout output and exit code
*/
execute_result execute(string cmd, string prefix)
{
execute_result result;
stringstream out;
// print full line thread-safe
stringstream printCmd;
printCmd << prefix << cmd << "\n";
cout << printCmd.str() << flush;
out << prefix << cmd << endl;
// Execute command and capture stdout, add prefix line by line
boost::process::ipstream stdout;
boost::process::child c(cmd, boost::process::std_out > stdout);
string line;
while (getline(stdout, line))
{
// print full line thread-safe
stringstream printLine;
printLine << prefix << line << "\n";
cout << printLine.str() << flush;
out << prefix << line << "\n";
}
c.wait();
// Capture exit code
result.exit_code = c.exit_code();
out << prefix << " Exit code: " << result.exit_code << endl;
result.console_out = out.str();
// Return result
return result;
}
} /* namespace tools */
} /* namespace mq */
} /* namespace fair */

View File

@@ -9,8 +9,6 @@
#ifndef FAIR_MQ_TOOLS_PROCESS_H
#define FAIR_MQ_TOOLS_PROCESS_H
#include <boost/process.hpp>
#include <string>
namespace fair
@@ -37,43 +35,7 @@ struct execute_result
* @param[in] log_prefix How to prefix each captured output line with
* @return Captured stdout output and exit code
*/
inline execute_result execute(std::string cmd, std::string prefix = "")
{
execute_result result;
std::stringstream out;
// print full line thread-safe
std::stringstream printCmd;
printCmd << prefix << cmd << "\n";
std::cout << printCmd.str() << std::flush;
out << prefix << cmd << std::endl;
// Execute command and capture stdout, add prefix line by line
boost::process::ipstream stdout;
boost::process::child c(cmd, boost::process::std_out > stdout);
std::string line;
while (getline(stdout, line))
{
// print full line thread-safe
std::stringstream printLine;
printLine << prefix << line << "\n";
std::cout << printLine.str() << std::flush;
out << prefix << line << "\n";
}
c.wait();
// Capture exit code
result.exit_code = c.exit_code();
out << prefix << " Exit code: " << result.exit_code << std::endl;
result.console_out = out.str();
// Return result
return result;
}
execute_result execute(std::string cmd, std::string prefix = "");
} /* namespace tools */
} /* namespace mq */

44
fairmq/tools/Unique.cxx Normal file
View File

@@ -0,0 +1,44 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/tools/Unique.h>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/functional/hash.hpp>
using namespace std;
namespace fair
{
namespace mq
{
namespace tools
{
// generates UUID string
string Uuid()
{
boost::uuids::random_generator gen;
boost::uuids::uuid u = gen();
return boost::uuids::to_string(u);
}
// generates UUID and returns its hash
size_t UuidHash()
{
boost::uuids::random_generator gen;
boost::hash<boost::uuids::uuid> uuid_hasher;
boost::uuids::uuid u = gen();
return uuid_hasher(u);
}
} /* namespace tools */
} /* namespace mq */
} /* namespace fair */

View File

@@ -9,11 +9,6 @@
#ifndef FAIR_MQ_TOOLS_UNIQUE_H
#define FAIR_MQ_TOOLS_UNIQUE_H
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/functional/hash.hpp>
#include <string>
namespace fair
@@ -24,21 +19,10 @@ namespace tools
{
// generates UUID string
inline std::string Uuid()
{
boost::uuids::random_generator gen;
boost::uuids::uuid u = gen();
return boost::uuids::to_string(u);
}
std::string Uuid();
// generates UUID and returns its hash
inline std::size_t UuidHash()
{
boost::uuids::random_generator gen;
boost::hash<boost::uuids::uuid> uuid_hasher;
boost::uuids::uuid u = gen();
return uuid_hasher(u);
}
std::size_t UuidHash();
} /* namespace tools */
} /* namespace mq */

View File

@@ -18,9 +18,11 @@
#include <fairmq/Tools.h>
#include "FairMQUnmanagedRegionZMQ.h"
#include <cstring>
using namespace std;
FairMQ::Transport FairMQMessageZMQ::fTransportType = FairMQ::Transport::ZMQ;
fair::mq::Transport FairMQMessageZMQ::fTransportType = fair::mq::Transport::ZMQ;
FairMQMessageZMQ::FairMQMessageZMQ()
: fUsedSizeModified(false)
@@ -190,7 +192,7 @@ void FairMQMessageZMQ::ApplyUsedSize()
}
}
FairMQ::Transport FairMQMessageZMQ::GetType() const
fair::mq::Transport FairMQMessageZMQ::GetType() const
{
return fTransportType;
}

View File

@@ -46,7 +46,7 @@ class FairMQMessageZMQ : public FairMQMessage
bool SetUsedSize(const size_t size) override;
void ApplyUsedSize();
FairMQ::Transport GetType() const override;
fair::mq::Transport GetType() const override;
void Copy(const FairMQMessagePtr& msg) override;
void Copy(const FairMQMessage& msg) override;
@@ -58,7 +58,7 @@ class FairMQMessageZMQ : public FairMQMessage
size_t fUsedSize;
std::unique_ptr<zmq_msg_t> fMsg;
std::unique_ptr<zmq_msg_t> fViewMsg; // view on a subset of fMsg (treating it as user buffer)
static FairMQ::Transport fTransportType;
static fair::mq::Transport fTransportType;
zmq_msg_t* GetMessage() const;
void CloseMessage();

View File

@@ -12,6 +12,8 @@
#include <zmq.h>
#include <cassert>
using namespace std;
atomic<bool> FairMQSocketZMQ::fInterrupted(false);

View File

@@ -11,7 +11,7 @@
using namespace std;
FairMQ::Transport FairMQTransportFactoryZMQ::fTransportType = FairMQ::Transport::ZMQ;
fair::mq::Transport FairMQTransportFactoryZMQ::fTransportType = fair::mq::Transport::ZMQ;
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const FairMQProgOptions* config)
: FairMQTransportFactory(id)
@@ -100,7 +100,7 @@ FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(size, callback));
}
FairMQ::Transport FairMQTransportFactoryZMQ::GetType() const
fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const
{
return fTransportType;
}

View File

@@ -48,13 +48,13 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override;
FairMQ::Transport GetType() const override;
fair::mq::Transport GetType() const override;
void Interrupt() override { FairMQSocketZMQ::Interrupt(); }
void Resume() override { FairMQSocketZMQ::Resume(); }
private:
static FairMQ::Transport fTransportType;
static fair::mq::Transport fTransportType;
void* fContext;
};

View File

@@ -33,8 +33,10 @@ add_testhelper(runTestDevice
set(MQ_CONFIG "${CMAKE_BINARY_DIR}/test/testsuite_FairMQ.IOPatterns_config.json")
set(RUN_TEST_DEVICE "${CMAKE_BINARY_DIR}/test/testhelper_runTestDevice")
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/protocols/config.json.in ${MQ_CONFIG})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/protocols/runner.cxx.in ${CMAKE_CURRENT_BINARY_DIR}/protocols/runner.cxx)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/TestEnvironment.h.in ${CMAKE_CURRENT_BINARY_DIR}/TestEnvironment.h)
add_testsuite(FairMQ.Protocols
SOURCES
@@ -50,6 +52,7 @@ add_testsuite(FairMQ.Protocols
LINKS FairMQ
DEPENDS testhelper_runTestDevice
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}/protocols
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 30
RUN_SERIAL ON
)
@@ -61,6 +64,7 @@ add_testsuite(FairMQ.Parts
LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}/parts
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 5
)
@@ -71,6 +75,7 @@ add_testsuite(FairMQ.MessageResize
LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}/message_resize
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 5
)
@@ -85,6 +90,7 @@ add_testsuite(FairMQ.Device
LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}/device
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 5
RUN_SERIAL ON
)
@@ -128,6 +134,7 @@ add_testsuite(FairMQ.Plugins
plugins/_plugin_manager.cxx
LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_BINARY_DIR}
DEPENDS FairMQPlugin_test_dummy FairMQPlugin_test_dummy2
TIMEOUT 10
)
@@ -138,6 +145,7 @@ add_testsuite(FairMQ.PluginsPrelinked
plugins/_plugin_manager_prelink.cxx
LINKS FairMQ FairMQPlugin_test_dummy FairMQPlugin_test_dummy2
INCLUDES ${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 10
)
@@ -149,6 +157,7 @@ add_testsuite(FairMQ.PluginServices
plugin_services/Fixture.h
LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 10
)
@@ -158,6 +167,7 @@ add_testsuite(FairMQ.EventManager
event_manager/_event_manager.cxx
LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 10
)
@@ -167,5 +177,6 @@ add_testsuite(FairMQ.StateMachine
state_machine/_state_machine.cxx
LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 10
)

14
test/TestEnvironment.h.in Normal file
View File

@@ -0,0 +1,14 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_TEST_ENVIRONMENT_H
#define FAIR_MQ_TEST_ENVIRONMENT_H
#define FAIRMQ_TEST_ENVIRONMENT "@FAIRMQ_BIN_DIR@"
#endif /* FAIR_MQ_TEST_ENVIRONMENT_H */

View File

@@ -13,6 +13,7 @@
#include <FairMQLogger.h>
#include <string>
#include <thread>
namespace fair
{

View File

@@ -13,6 +13,7 @@
#include <FairMQLogger.h>
#include <string>
#include <thread>
namespace fair
{

View File

@@ -6,11 +6,15 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <TestEnvironment.h>
#include <gtest/gtest.h>
#include <stdlib.h>
auto main(int argc, char** argv) -> int
{
::testing::InitGoogleTest(&argc, argv);
::testing::FLAGS_gtest_death_test_style = "threadsafe";
setenv("FAIRMQ_PATH", FAIRMQ_TEST_ENVIRONMENT, 0);
return RUN_ALL_TESTS();
}

View File

@@ -6,11 +6,14 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <TestEnvironment.h>
#include <gtest/gtest.h>
auto main(int argc, char** argv) -> int
{
::testing::InitGoogleTest(&argc, argv);
::testing::FLAGS_gtest_death_test_style = "threadsafe";
setenv("FAIRMQ_PATH", FAIRMQ_TEST_ENVIRONMENT, 0);
return RUN_ALL_TESTS();
}

View File

@@ -8,6 +8,8 @@
#include <FairMQDevice.h>
#include <thread>
namespace fair
{
namespace mq

View File

@@ -8,6 +8,7 @@
#include <FairMQDevice.h>
#include <string>
#include <thread>
namespace fair
{

View File

@@ -9,6 +9,7 @@
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <options/FairMQProgOptions.h>
#include <thread>
namespace fair
{

View File

@@ -7,6 +7,7 @@
********************************************************************************/
#include <FairMQDevice.h>
#include <thread>
namespace fair
{

View File

@@ -8,6 +8,7 @@
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <thread>
namespace fair
{

View File

@@ -7,6 +7,7 @@
********************************************************************************/
#include <FairMQDevice.h>
#include <thread>
namespace fair
{

View File

@@ -8,6 +8,7 @@
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <thread>
namespace fair
{

View File

@@ -8,6 +8,7 @@
#include <FairMQDevice.h>
#include <FairMQLogger.h>
#include <thread>
namespace fair
{

View File

@@ -6,11 +6,14 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <TestEnvironment.h>
#include <gtest/gtest.h>
auto main(int argc, char** argv) -> int
{
::testing::InitGoogleTest(&argc, argv);
::testing::FLAGS_gtest_death_test_style = "threadsafe";
setenv("FAIRMQ_PATH", FAIRMQ_TEST_ENVIRONMENT, 0);
return RUN_ALL_TESTS();
}

View File

@@ -6,11 +6,14 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <TestEnvironment.h>
#include <gtest/gtest.h>
int main(int argc, char** argv)
{
::testing::InitGoogleTest(&argc, argv);
::testing::FLAGS_gtest_death_test_style = "threadsafe";
setenv("FAIRMQ_PATH", FAIRMQ_TEST_ENVIRONMENT, 0);
return RUN_ALL_TESTS();
}

View File

@@ -6,11 +6,14 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <TestEnvironment.h>
#include <gtest/gtest.h>
int main(int argc, char** argv)
{
::testing::InitGoogleTest(&argc, argv);
::testing::FLAGS_gtest_death_test_style = "threadsafe";
setenv("FAIRMQ_PATH", FAIRMQ_TEST_ENVIRONMENT, 0);
return RUN_ALL_TESTS();
}

View File

@@ -7,6 +7,7 @@
********************************************************************************/
#include "runner.h"
#include <TestEnvironment.h>
#include <gtest/gtest.h>
@@ -33,5 +34,6 @@ auto main(int argc, char** argv) -> int
{
::testing::InitGoogleTest(&argc, argv);
::testing::FLAGS_gtest_death_test_style = "threadsafe";
setenv("FAIRMQ_PATH", FAIRMQ_TEST_ENVIRONMENT, 0);
return RUN_ALL_TESTS();
}

View File

@@ -6,11 +6,14 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <TestEnvironment.h>
#include <gtest/gtest.h>
auto main(int argc, char** argv) -> int
{
::testing::InitGoogleTest(&argc, argv);
::testing::FLAGS_gtest_death_test_style = "threadsafe";
setenv("FAIRMQ_PATH", FAIRMQ_TEST_ENVIRONMENT, 0);
return RUN_ALL_TESTS();
}