Compare commits

..

18 Commits

Author SHA1 Message Date
Dennis Klein
8867e15d8c build: ABI version is defined to be equal to the API version 2022-05-30 09:15:25 +02:00
Dennis Klein
6b2c3c4486 fix(tidy): Regex 2022-05-28 15:14:25 +02:00
Dennis Klein
adf743750f fix(tidy): Only emit diagnostic if source location is valid 2022-05-28 15:14:25 +02:00
Dennis Klein
f3385173b2 fix(zeromq): Leaking monitor socket messages 2022-05-28 15:14:25 +02:00
Dennis Klein
f798f18b95 fix: Remove long obsolete hotfix version component 2022-05-28 15:14:25 +02:00
Dennis Klein
d1dd9d7824 ci: Update macOS builds 2022-05-28 15:14:25 +02:00
Dennis Klein
df742272ed build: Bump bundled GTest to @a1cc8c55 2022-05-28 15:14:25 +02:00
Dennis Klein
58b78995b4 build(fairmq-tidy): Add missing EXPERIMENTAL tag 2022-05-28 15:14:25 +02:00
Dennis Klein
efd6523112 feat(Parts)!: Refine and tweak
* Optimize appending another Parts container
* Remove redundant/verbose comments
* Change r-value args to move-only types into l-value args for
  readability
* BREAKING CHANGE: Remove `AtRef(int)` and `AddPart(Message*)` member functions
* Add various const overloads
* Add `Empty()` and `Clear()` member functions
* Add `noexcept` where applicable
2022-05-28 15:14:25 +02:00
Dennis Klein
6fc2839d02 build: Use kebab-case library names in install tree 2022-05-28 15:14:25 +02:00
Dennis Klein
c585b0d486 feat(plugins): Allow kebab-case plugin names, e.g. libfairmq-plugin-pmix
Camel+snake-case plugin names are still allowed! e.g. `libFairMQPlugin_pmix`
2022-05-28 15:14:25 +02:00
Dennis Klein
933ed8de6e test: Increase robustness of the test suite for high -j 2022-05-28 15:14:25 +02:00
Dennis Klein
08d64ad463 test(channel): Increase sleep time
The logic of the GetNumberOfConnectedPeers test case relies on sleeping
a certain time. We have observed the 10ms sleep time to sometimes be too
short. Increasing it to 100ms should improve test stability.
2022-05-28 15:14:25 +02:00
Dennis Klein
56a7b6e9cb build!: Create a single library again
BREAKING CHANGE: Removes exported targets FairMQ::Tools and
FairMQ::StateMachine. However, it is unlikely those were used
by anyone.
2022-05-28 15:14:25 +02:00
Dennis Klein
8d575a23a5 fix: Use namespaced typenames/headers 2022-05-28 15:14:25 +02:00
Dennis Klein
18f50871ef feat: Deprecate non-namespaced headers
For more details see https://github.com/FairRootGroup/FairMQ/discussions/423
2022-05-28 15:14:25 +02:00
Dennis Klein
c81b03ff29 feat: Deprecate non-namespaced typenames
For more details see https://github.com/FairRootGroup/FairMQ/discussions/423
2022-05-28 15:14:25 +02:00
Dennis Klein
a9ffa2a8af feat!: Remove deprecated components sdk, sdk_commands, dds_plugin
BREAKING CHANGE: Components have been moved to ODC project, see
https://github.com/FairRootGroup/FairMQ/discussions/392 for details.
2022-05-28 15:14:25 +02:00
165 changed files with 5032 additions and 3577 deletions

View File

@@ -1,5 +0,0 @@
{
"image": "ghcr.io/fairrootgroup/fairmq-dev/fedora-38:latest",
"features": {
}
}

View File

@@ -1,12 +0,0 @@
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
target-branch: "dev"
schedule:
interval: "monthly"
- package-ecosystem: "gitsubmodule"
directory: "/"
target-branch: "dev"
schedule:
interval: "monthly"

View File

@@ -1,29 +0,0 @@
# SPDX-FileCopyrightText: 2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH, Darmstadt, Germany
#
# SPDX-License-Identifier: CC0-1.0
name: Check AUTHORS and CONTRIBUTORS in metadata
on:
push:
paths:
- AUTHORS
- CONTRIBUTORS
- codemeta.json
- .zenodo.json
pull_request:
paths:
- AUTHORS
- CONTRIBUTORS
- codemeta.json
- .zenodo.json
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Try updating metadata
run: python meta_update.py
- name: Check for Updates
run: git diff --exit-code

View File

@@ -1,21 +0,0 @@
name: validate codemeta
on:
push:
paths:
- codemeta.json
- .github/workflows/codemeta_validate.yaml
pull_request:
paths:
- codemeta.json
- .github/workflows/codemeta_validate.yaml
jobs:
build:
runs-on: ubuntu-latest
container:
image: gitlab-registry.in2p3.fr/escape2020/wp3/eossr:v1.0
steps:
- uses: actions/checkout@v4
- name: validate codemeta
run: eossr-metadata-validator codemeta.json

View File

@@ -1,16 +0,0 @@
name: fair-software
on: push
jobs:
verify:
name: "fair-software"
runs-on: ubuntu-latest
if: github.repository == 'FairRootGroup/FairMQ'
steps:
- uses: fair-software/howfairis-github-action@0.2.1
name: Measure compliance with fair-software.eu recommendations
env:
PYCHARM_HOSTED: "Trick colorama into displaying colored output"
with:
MY_REPO_URL: "https://github.com/${{ github.repository }}"

2
.gitignore vendored
View File

@@ -4,5 +4,3 @@ install
.vscode .vscode
/compile_commands.json /compile_commands.json
.cache .cache
.spack-env
spack.lock

View File

@@ -1,88 +0,0 @@
{
"creators": [
{
"orcid": "0000-0002-8071-4497",
"name": "Al-Turany, Mohammad"
},
{
"orcid": "0000-0003-3787-1910",
"name": "Klein, Dennis"
},
{
"name": "Kollegger, Thorsten"
},
{
"orcid": "0000-0002-6249-155X",
"name": "Rybalchenko, Alexey"
},
{
"name": "Winckler, Nicolas"
}
],
"contributors": [
{
"type": "Other",
"name": "Aphecetche, Laurent"
},
{
"type": "Other",
"name": "Binet, Sebastien"
},
{
"type": "Other",
"name": "Eulisse, Giulio"
},
{
"type": "Other",
"name": "Karabowicz, Radoslaw"
},
{
"type": "Other",
"name": "Kretz, Matthias"
},
{
"type": "Other",
"name": "Krzewicki, Mikolaj"
},
{
"type": "Other",
"name": "Lebedev, Andrey"
},
{
"type": "Other",
"name": "Mrnjavac, Teo"
},
{
"type": "Other",
"name": "Neskovic, Gvozden"
},
{
"type": "Other",
"name": "Richter, Matthias"
},
{
"type": "Other",
"orcid": "0000-0002-5321-8404",
"name": "Tacke, Christian"
},
{
"type": "Other",
"name": "Uhlig, Florian"
},
{
"type": "Other",
"name": "Wenzel, Sandro"
}
],
"description": "<p>C++ Message Queuing Library and Framework</p>",
"related_identifiers": [
{
"identifier": "https://github.com/FairRootGroup/FairMQ/",
"relation": "isSupplementTo",
"resource_type": "software",
"scheme": "url"
}
],
"title": "FairMQ",
"license": "LGPL-3.0-only"
}

View File

@@ -1,5 +1,5 @@
Al-Turany, Mohammad [https://orcid.org/0000-0002-8071-4497] Al-Turany, Mohammad
Klein, Dennis [https://orcid.org/0000-0003-3787-1910] Klein, Dennis
Kollegger, Thorsten Kollegger, Thorsten
Rybalchenko, Alexey [https://orcid.org/0000-0002-6249-155X] Rybalchenko, Alexey
Winckler, Nicolas Winckler, Nicolas

View File

@@ -1,5 +1,5 @@
################################################################################ ################################################################################
# Copyright (C) 2018-2024 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
@@ -8,7 +8,8 @@
# Project ###################################################################### # Project ######################################################################
cmake_minimum_required(VERSION 3.15...3.30 FATAL_ERROR) cmake_minimum_required(VERSION 3.15 FATAL_ERROR)
cmake_policy(VERSION 3.15...3.22)
list(PREPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake) list(PREPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)
include(GitHelper) include(GitHelper)
@@ -26,6 +27,10 @@ fairmq_build_option(BUILD_FAIRMQ "Build FairMQ library and devices."
DEFAULT ON) DEFAULT ON)
fairmq_build_option(BUILD_TESTING "Build tests." fairmq_build_option(BUILD_TESTING "Build tests."
DEFAULT OFF REQUIRES "BUILD_FAIRMQ") DEFAULT OFF REQUIRES "BUILD_FAIRMQ")
fairmq_build_option(BUILD_OFI_TRANSPORT "Build experimental OFI transport."
DEFAULT OFF REQUIRES "BUILD_FAIRMQ")
fairmq_build_option(BUILD_PMIX_PLUGIN "Build PMIx plugin."
DEFAULT OFF REQUIRES "BUILD_FAIRMQ")
fairmq_build_option(BUILD_EXAMPLES "Build FairMQ examples." fairmq_build_option(BUILD_EXAMPLES "Build FairMQ examples."
DEFAULT ON REQUIRES "BUILD_FAIRMQ") DEFAULT ON REQUIRES "BUILD_FAIRMQ")
fairmq_build_option(BUILD_TIDY_TOOL "Build the fairmq-tidy tool." fairmq_build_option(BUILD_TIDY_TOOL "Build the fairmq-tidy tool."
@@ -81,6 +86,12 @@ endif()
if(BUILD_TESTING) if(BUILD_TESTING)
list(APPEND PROJECT_PACKAGE_COMPONENTS tests) list(APPEND PROJECT_PACKAGE_COMPONENTS tests)
endif() endif()
if(BUILD_PMIX_PLUGIN)
list(APPEND PROJECT_PACKAGE_COMPONENTS pmix_plugin)
endif()
if(BUILD_OFI_TRANSPORT)
list(APPEND PROJECT_PACKAGE_COMPONENTS ofi_transport)
endif()
if(BUILD_EXAMPLES) if(BUILD_EXAMPLES)
list(APPEND PROJECT_PACKAGE_COMPONENTS examples) list(APPEND PROJECT_PACKAGE_COMPONENTS examples)
endif() endif()
@@ -126,6 +137,5 @@ fairmq_summary_components()
fairmq_summary_static_analysis() fairmq_summary_static_analysis()
fairmq_summary_install_prefix() fairmq_summary_install_prefix()
fairmq_summary_debug_mode() fairmq_summary_debug_mode()
fairmq_summary_compile_definitions()
message(STATUS " ") message(STATUS " ")
################################################################################ ################################################################################

View File

@@ -3,11 +3,11 @@ Binet, Sebastien
Eulisse, Giulio Eulisse, Giulio
Karabowicz, Radoslaw Karabowicz, Radoslaw
Kretz, Matthias <kretz@kde.org> Kretz, Matthias <kretz@kde.org>
Krzewicki, Mikolaj Krzewicki, Mikolaj
Lebedev, Andrey Lebedev, Andrey
Mrnjavac, Teo <teo.m@cern.ch> Mrnjavac, Teo <teo.m@cern.ch>
Neskovic, Gvozden Neskovic, Gvozden
Richter, Matthias Richter, Matthias
Tacke, Christian [https://orcid.org/0000-0002-5321-8404] Tacke, Christian
Uhlig, Florian Uhlig, Florian
Wenzel, Sandro Wenzel, Sandro

View File

@@ -1,5 +1,5 @@
################################################################################ ################################################################################
# Copyright (C) 2021-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
@@ -42,6 +42,12 @@ endif()
ctest_start(Continuous) ctest_start(Continuous)
list(APPEND options "-DDISABLE_COLOR=ON" "-DBUILD_EXAMPLES=ON" "-DBUILD_TESTING=ON") list(APPEND options "-DDISABLE_COLOR=ON" "-DBUILD_EXAMPLES=ON" "-DBUILD_TESTING=ON")
if(HAS_PMIX)
list(APPEND options "-DBUILD_PMIX_PLUGIN=ON")
endif()
if(HAS_ASIO AND HAS_ASIOFI)
list(APPEND options "-DBUILD_OFI_TRANSPORT=ON")
endif()
if(RUN_STATIC_ANALYSIS) if(RUN_STATIC_ANALYSIS)
list(APPEND options "-DRUN_STATIC_ANALYSIS=ON") list(APPEND options "-DRUN_STATIC_ANALYSIS=ON")
endif() endif()
@@ -81,7 +87,7 @@ ctest_submit()
if(NOT RUN_STATIC_ANALYSIS) if(NOT RUN_STATIC_ANALYSIS)
ctest_test(BUILD "${CTEST_BINARY_DIRECTORY}" ctest_test(BUILD "${CTEST_BINARY_DIRECTORY}"
PARALLEL_LEVEL ${NCPUS} PARALLEL_LEVEL 1
SCHEDULE_RANDOM ON SCHEDULE_RANDOM ON
RETURN_VALUE _ctest_test_ret_val) RETURN_VALUE _ctest_test_ret_val)

48
Jenkinsfile vendored
View File

@@ -4,21 +4,20 @@ def jobMatrix(String type, List specs) {
def nodes = [:] def nodes = [:]
for (spec in specs) { for (spec in specs) {
def job = "" def job = ""
def selector = "slurm" def selector = ""
def os = "" def os = ""
def ver = "" def ver = ""
if (type == 'build') { if (type == 'build') {
job = "${spec.os}-${spec.ver}-${spec.arch}-${spec.compiler}" job = "${spec.os}-${spec.ver}-${spec.arch}-${spec.compiler}"
if (spec.os =~ /^macos/) { selector = "${spec.os}-${spec.ver}-${spec.arch}"
selector = "${spec.os}-${spec.ver}-${spec.arch}"
}
os = spec.os os = spec.os
ver = spec.ver ver = spec.ver
} else { // == 'check' } else { // == 'check'
job = "${spec.name}" job = "${spec.name}"
selector = 'fedora-35-x86_64'
os = 'fedora' os = 'fedora'
ver = '36' ver = '35'
} }
def label = "${job}" def label = "${job}"
@@ -37,14 +36,14 @@ def jobMatrix(String type, List specs) {
sh "echo \"export LABEL=\\\"\${JOB_BASE_NAME} ${label}\\\"\" >> ${jobscript}" sh "echo \"export LABEL=\\\"\${JOB_BASE_NAME} ${label}\\\"\" >> ${jobscript}"
if (selector =~ /^macos/) { if (selector =~ /^macos/) {
sh """\ sh """\
echo \"export DDS_ROOT=\\\"\\\$(brew --prefix dds)\\\"\" >> ${jobscript}
echo \"export PATH=\\\"\\\$(brew --prefix dds)/bin:\\\$PATH\\\"\" >> ${jobscript}
echo \"${ctestcmd}\" >> ${jobscript} echo \"${ctestcmd}\" >> ${jobscript}
""" """
sh "cat ${jobscript}" sh "cat ${jobscript}"
sh "bash ${jobscript}" sh "bash ${jobscript}"
} else { // selector == "slurm" } else {
def imageurl = "oras://ghcr.io/fairrootgroup/fairmq-dev/${os}-${ver}-sif:latest" def containercmd = "singularity exec --net --ipc --uts --pid -B/shared ${env.SINGULARITY_CONTAINER_ROOT}/fairmq/${os}.${ver}.sif bash -l -c \\\"${ctestcmd} ${extra}\\\""
def execopts = "--ipc --uts --pid -B/shared"
def containercmd = "singularity exec ${execopts} ${imageurl} bash -l -c \\\"${ctestcmd} ${extra}\\\""
sh """\ sh """\
echo \"echo \\\"*** Job started at .......: \\\$(date -R)\\\"\" >> ${jobscript} echo \"echo \\\"*** Job started at .......: \\\$(date -R)\\\"\" >> ${jobscript}
echo \"echo \\\"*** Job ID ...............: \\\${SLURM_JOB_ID}\\\"\" >> ${jobscript} echo \"echo \\\"*** Job ID ...............: \\\${SLURM_JOB_ID}\\\"\" >> ${jobscript}
@@ -68,6 +67,12 @@ def jobMatrix(String type, List specs) {
deleteDir() deleteDir()
githubNotify(context: "${label}", description: 'Success', status: 'SUCCESS') githubNotify(context: "${label}", description: 'Success', status: 'SUCCESS')
} catch (e) { } catch (e) {
def tarball = "${type}_${job}_dds_logs.tar.gz"
if (fileExists("build/test/.DDS")) {
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test .DDS/"
archiveArtifacts tarball
}
deleteDir() deleteDir()
githubNotify(context: "${label}", description: 'Error', status: 'ERROR') githubNotify(context: "${label}", description: 'Error', status: 'ERROR')
throw e throw e
@@ -84,24 +89,19 @@ pipeline{
stage("CI") { stage("CI") {
steps{ steps{
script { script {
def all = '-DHAS_ASIO=ON -DHAS_ASIOFI=ON -DHAS_PMIX=ON'
def builds = jobMatrix('build', [ def builds = jobMatrix('build', [
[os: 'ubuntu', ver: '20.04', arch: 'x86_64', compiler: 'gcc-9'], [os: 'ubuntu', ver: '20.04', arch: 'x86_64', compiler: 'gcc-9', extra: all],
[os: 'ubuntu', ver: '22.04', arch: 'x86_64', compiler: 'gcc-11'], [os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10', extra: all],
[os: 'ubuntu', ver: '24.04', arch: 'x86_64', compiler: 'gcc-13'], [os: 'fedora', ver: '33', arch: 'x86_64', compiler: 'gcc-10', extra: all],
[os: 'fedora', ver: '33', arch: 'x86_64', compiler: 'gcc-10'], [os: 'fedora', ver: '34', arch: 'x86_64', compiler: 'gcc-11', extra: all],
[os: 'fedora', ver: '34', arch: 'x86_64', compiler: 'gcc-11'], [os: 'fedora', ver: '35', arch: 'x86_64', compiler: 'gcc-11', extra: all],
[os: 'fedora', ver: '35', arch: 'x86_64', compiler: 'gcc-11'], [os: 'macos', ver: '12', arch: 'x86_64', compiler: 'apple-clang-13', extra: '-DHAS_ASIO=ON'],
[os: 'fedora', ver: '36', arch: 'x86_64', compiler: 'gcc-12'], [os: 'macos', ver: '12', arch: 'arm64', compiler: 'apple-clang-13', extra: '-DHAS_ASIO=ON'],
[os: 'fedora', ver: '37', arch: 'x86_64', compiler: 'gcc-12'],
[os: 'fedora', ver: '38', arch: 'x86_64', compiler: 'gcc-13'],
[os: 'fedora', ver: '39', arch: 'x86_64', compiler: 'gcc-13'],
[os: 'fedora', ver: '40', arch: 'x86_64', compiler: 'gcc-14'],
[os: 'macos', ver: '14', arch: 'x86_64', compiler: 'apple-clang-16'],
[os: 'macos', ver: '15', arch: 'x86_64', compiler: 'apple-clang-16'],
[os: 'macos', ver: '15', arch: 'arm64', compiler: 'apple-clang-16'],
]) ])
def all_debug = "-DCMAKE_BUILD_TYPE=Debug" def all_debug = "${all} -DCMAKE_BUILD_TYPE=Debug"
def checks = jobMatrix('check', [ def checks = jobMatrix('check', [
[name: 'static-analyzers', extra: "${all_debug} -DRUN_STATIC_ANALYSIS=ON"], [name: 'static-analyzers', extra: "${all_debug} -DRUN_STATIC_ANALYSIS=ON"],

View File

@@ -1,11 +1,5 @@
<!-- {#mainpage} --> <!-- {#mainpage} -->
# FairMQ # FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT)
[![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT)
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.1689985.svg)](https://doi.org/10.5281/zenodo.1689985)
[![OpenSSF Best Practices](https://bestpractices.coreinfrastructure.org/projects/6915/badge)](https://bestpractices.coreinfrastructure.org/projects/6915)
[![fair-software.eu](https://img.shields.io/badge/fair--software.eu-%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8B%20%20%E2%97%8F%20%20%E2%97%8F-yellow)](https://github.com/FairRootGroup/FairMQ/actions/workflows/fair-software.yml)
[![Spack package](https://repology.org/badge/version-for-repo/spack/fairmq.svg)](https://repology.org/project/fairmq/versions)
C++ Message Queuing Library and Framework C++ Message Queuing Library and Framework
@@ -24,8 +18,7 @@ FairMQ is designed to help implementing large-scale data processing workflows ne
The core of FairMQ provides an abstract asynchronous message passing API with scalability protocols The core of FairMQ provides an abstract asynchronous message passing API with scalability protocols
inspired by [ZeroMQ](https://github.com/zeromq/libzmq) (e.g. PUSH/PULL, PUB/SUB). inspired by [ZeroMQ](https://github.com/zeromq/libzmq) (e.g. PUSH/PULL, PUB/SUB).
FairMQ provides multiple implementations for its API (so-called "transports", FairMQ provides multiple implementations for its API (so-called "transports",
e.g. `zeromq` and `shmem` (latest release of the `ofi` transport in v1.4.56, removed since v1.5+)) to cover e.g. `zeromq`, `shmem` and `ofi` (in development)) to cover a variety of use cases
a variety of use cases
(e.g. inter-thread, inter-process, inter-node communication) and machines (e.g. Ethernet, Infiniband). (e.g. inter-thread, inter-process, inter-node communication) and machines (e.g. Ethernet, Infiniband).
In addition to this core functionality FairMQ provides a framework for creating "devices" - actors which In addition to this core functionality FairMQ provides a framework for creating "devices" - actors which
are communicating through message passing. FairMQ does not only allow the user to use different transport are communicating through message passing. FairMQ does not only allow the user to use different transport
@@ -45,10 +38,10 @@ Recommended:
```bash ```bash
git clone https://github.com/FairRootGroup/FairMQ fairmq_source git clone https://github.com/FairRootGroup/FairMQ fairmq_source
cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Release [-DBUILD_TESTING=ON] cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=fairmq_install
cmake --build fairmq_build cmake --build fairmq_build
[ctest --test-dir fairmq_build --output-on-failure --schedule-random -j<ncpus>] # needs -DBUILD_TESTING=ON cmake --build fairmq_build --target test
cmake --install fairmq_build --prefix $(pwd)/fairmq_install cmake --build fairmq_build --target install
``` ```
Please consult the [manpages of your CMake version](https://cmake.org/cmake/help/latest/manual/cmake.1.html) for more options. Please consult the [manpages of your CMake version](https://cmake.org/cmake/help/latest/manual/cmake.1.html) for more options.
@@ -56,25 +49,6 @@ Please consult the [manpages of your CMake version](https://cmake.org/cmake/help
If dependencies are not installed in standard system directories, you can hint the installation location via If dependencies are not installed in standard system directories, you can hint the installation location via
`-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...` (`*_ROOT` variables can also be environment variables). `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...` (`*_ROOT` variables can also be environment variables).
## Installation via Spack
Prerequisite: [Spack](https://spack.readthedocs.io/en/latest/getting_started.html)
```bash
spack info fairmq # inspect build options
spack install fairmq # build latest packaged version with default options
```
Build FairMQ's dependencies via Spack for development:
```bash
git clone -b dev https://github.com/FairRootGroup/FairMQ fairmq_source
spack --env fairmq_source install # installs deps declared in fairmq_source/spack.yaml
spack env activate fairmq_source # sets $CMAKE_PREFIX_PATH which is used by CMake to find FairMQ's deps
cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Debug -DBUILD_TESTING=ON
# develop, compile, test
spack env deactivate # at end of dev session, or simply close the shell
```
## Usage ## Usage
FairMQ ships as a CMake package, so in your `CMakeLists.txt` you can discover it like this: FairMQ ships as a CMake package, so in your `CMakeLists.txt` you can discover it like this:
@@ -94,14 +68,25 @@ If FairMQ is not installed in system directories, you can hint the installation:
list(PREPEND CMAKE_PREFIX_PATH /path/to/fairmq_install) list(PREPEND CMAKE_PREFIX_PATH /path/to/fairmq_install)
``` ```
Optionally, you can require certain FairMQ package components and a minimum version:
```cmake
find_package(FairMQ 1.4.50 COMPONENTS ofi_transport)
```
When building FairMQ, CMake will print a summary table of all available package components.
## Dependencies ## Dependencies
* [asio](https://github.com/chriskohlhoff/asio)
* [asiofi](https://github.com/FairRootGroup/asiofi)
* [Boost](https://www.boost.org/) * [Boost](https://www.boost.org/)
* [CMake](https://cmake.org/) * [CMake](https://cmake.org/)
* [Doxygen](http://www.doxygen.org/) * [Doxygen](http://www.doxygen.org/)
* [FairCMakeModules](https://github.com/FairRootGroup/FairCMakeModules) (optionally bundled) * [FairCMakeModules](https://github.com/FairRootGroup/FairCMakeModules) (optionally bundled)
* [FairLogger](https://github.com/FairRootGroup/FairLogger) * [FairLogger](https://github.com/FairRootGroup/FairLogger)
* [GTest](https://github.com/google/googletest) (optionally bundled) * [GTest](https://github.com/google/googletest) (optionally bundled)
* [PMIx](https://pmix.org/)
* [ZeroMQ](http://zeromq.org/) * [ZeroMQ](http://zeromq.org/)
Which dependencies are required depends on which components are built. Which dependencies are required depends on which components are built.
@@ -115,8 +100,9 @@ On command line:
* `-DDISABLE_COLOR=ON` disables coloured console output. * `-DDISABLE_COLOR=ON` disables coloured console output.
* `-DBUILD_TESTING=OFF` disables building of tests. * `-DBUILD_TESTING=OFF` disables building of tests.
* `-DBUILD_EXAMPLES=OFF` disables building of examples. * `-DBUILD_EXAMPLES=OFF` disables building of examples.
* `-DBUILD_OFI_TRANSPORT=ON` enables building of the experimental OFI transport.
* `-DBUILD_PMIX_PLUGIN=ON` enables building of the PMIx plugin.
* `-DBUILD_DOCS=ON` enables building of API docs. * `-DBUILD_DOCS=ON` enables building of API docs.
* `-DFAIRMQ_CHANNEL_DEFAULT_AUTOBIND=OFF` disable channel `autoBind` by default
* You can hint non-system installations for dependent packages, see the #installation-from-source section above * You can hint non-system installations for dependent packages, see the #installation-from-source section above
After the `find_package(FairMQ)` call the following CMake variables are defined: After the `find_package(FairMQ)` call the following CMake variables are defined:
@@ -173,4 +159,4 @@ After the `find_package(FairMQ)` call the following CMake variables are defined:
1. [Usage](docs/Plugins.md#71-usage) 1. [Usage](docs/Plugins.md#71-usage)
2. [Development](docs/Plugins.md#72-development) 2. [Development](docs/Plugins.md#72-development)
3. [Provided Plugins](docs/Plugins.md#73-provided-plugins) 3. [Provided Plugins](docs/Plugins.md#73-provided-plugins)
1. [PMIx](docs/Plugins.md#731-pmix) 2. [PMIx](docs/Plugins.md#731-pmix)

View File

@@ -1,5 +1,5 @@
################################################################################ ################################################################################
# Copyright (C) 2018-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
@@ -18,6 +18,15 @@ if(BUILD_FAIRMQ)
set(Threads_PREFIX "<system>") set(Threads_PREFIX "<system>")
endif() endif()
if(BUILD_OFI_TRANSPORT)
find_package2(PRIVATE asiofi REQUIRED VERSION 0.5)
find_package2(PRIVATE OFI REQUIRED)
endif()
if(BUILD_PMIX_PLUGIN)
find_package2(PRIVATE PMIx REQUIRED VERSION 2.1.4)
endif()
if(BUILD_FAIRMQ OR BUILD_TIDY_TOOL) if(BUILD_FAIRMQ OR BUILD_TIDY_TOOL)
find_package2(PUBLIC FairLogger REQUIRED VERSION 1.6.0) find_package2(PUBLIC FairLogger REQUIRED VERSION 1.6.0)
find_package2(PUBLIC Boost REQUIRED VERSION 1.66 find_package2(PUBLIC Boost REQUIRED VERSION 1.66
@@ -25,6 +34,14 @@ if(BUILD_FAIRMQ OR BUILD_TIDY_TOOL)
) )
endif() endif()
if(BUILD_OFI_TRANSPORT)
set(__old ${CMAKE_FIND_PACKAGE_PREFER_CONFIG})
set(CMAKE_FIND_PACKAGE_PREFER_CONFIG ON)
find_package2(PUBLIC asio REQUIRED VERSION 1.18)
set(CMAKE_FIND_PACKAGE_PREFER_CONFIG ${__old})
unset(__old)
endif()
if(BUILD_FAIRMQ) if(BUILD_FAIRMQ)
find_package2(PRIVATE ZeroMQ REQUIRED VERSION 4.1.4) find_package2(PRIVATE ZeroMQ REQUIRED VERSION 4.1.4)
if(NOT PicoSHA2_BUNDLED) if(NOT PicoSHA2_BUNDLED)
@@ -41,7 +58,7 @@ if(BUILD_TESTING)
endif() endif()
find_package2(BUNDLED GTest REQUIRED) find_package2(BUNDLED GTest REQUIRED)
if(GTest_BUNDLED) if(GTest_BUNDLED)
set(GTest_VERSION "Dec 26 2024 @7d76a23") set(GTest_VERSION "Apr 8 2022 @a1cc8c55")
set(GTest_PREFIX "<bundled>") set(GTest_PREFIX "<bundled>")
endif() endif()
endif() endif()
@@ -70,6 +87,10 @@ if(PROJECT_PACKAGE_DEPENDENCIES)
if(NOT FairLogger_PREFIX AND FairLogger_ROOT) if(NOT FairLogger_PREFIX AND FairLogger_ROOT)
set(FairLogger_PREFIX ${FairLogger_ROOT}) set(FairLogger_PREFIX ${FairLogger_ROOT})
endif() endif()
elseif(${dep} STREQUAL asiofi)
if(NOT asiofi_PREFIX AND asiofi_ROOT)
set(asiofi_PREFIX ${asiofi_ROOT})
endif()
elseif(${dep} STREQUAL Boost) elseif(${dep} STREQUAL Boost)
if(TARGET Boost::headers) if(TARGET Boost::headers)
get_target_property(boost_include Boost::headers INTERFACE_INCLUDE_DIRECTORIES) get_target_property(boost_include Boost::headers INTERFACE_INCLUDE_DIRECTORIES)

View File

@@ -1,5 +1,5 @@
################################################################################ ################################################################################
# Copyright (C) 2018-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
@@ -34,6 +34,8 @@ endfunction()
# Configure/Install CMake package # Configure/Install CMake package
macro(install_cmake_package) macro(install_cmake_package)
list(SORT PROJECT_PACKAGE_DEPENDENCIES)
list(SORT PROJECT_INTERFACE_PACKAGE_DEPENDENCIES)
include(CMakePackageConfigHelpers) include(CMakePackageConfigHelpers)
set(PACKAGE_INSTALL_DESTINATION set(PACKAGE_INSTALL_DESTINATION
${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}-${PROJECT_VERSION} ${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}-${PROJECT_VERSION}

View File

@@ -1,5 +1,5 @@
################################################################################ ################################################################################
# Copyright (C) 2018-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
@@ -114,8 +114,8 @@ endif()
set(CMAKE_CONFIGURATION_TYPES "Debug" "Release" "RelWithDebInfo") set(CMAKE_CONFIGURATION_TYPES "Debug" "Release" "RelWithDebInfo")
set(_warnings "-Wshadow -Wall -Wextra -Wpedantic") set(_warnings "-Wshadow -Wall -Wextra -Wpedantic")
set(CMAKE_CXX_FLAGS_DEBUG "-Og -g ${_warnings} ${_sanitizers}") set(CMAKE_CXX_FLAGS_DEBUG "-Og -g ${_warnings} ${_sanitizers}")
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG") set(CMAKE_CXX_FLAGS_RELEASE "-O2 -DNDEBUG")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O3 -g ${_warnings} -DNDEBUG ${_sanitizers}") set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g ${_warnings} -DNDEBUG ${_sanitizers}")
unset(_warnings) unset(_warnings)
unset(_sanitizers) unset(_sanitizers)
@@ -168,22 +168,9 @@ if(CMAKE_GENERATOR STREQUAL Ninja AND ENABLE_CCACHE)
endif() endif()
endif() endif()
if(NOT DEFINED FAIRMQ_HAS_STD_FILESYSTEM) if( CMAKE_CXX_COMPILER_ID STREQUAL "GNU"
if( ( CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 9)
AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 9) set(FAIRMQ_HAS_STD_FILESYSTEM 0)
OR ( CMAKE_CXX_COMPILER_ID STREQUAL "Clang" else()
AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 9)) set(FAIRMQ_HAS_STD_FILESYSTEM 1)
set(FAIRMQ_HAS_STD_FILESYSTEM 0)
else()
set(FAIRMQ_HAS_STD_FILESYSTEM 1)
endif()
endif()
if(NOT DEFINED FAIRMQ_HAS_STD_PMR)
if(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
# Clang (to be more precise: libc++) currently does not implement <memory_resource>
set(FAIRMQ_HAS_STD_PMR 0)
else()
set(FAIRMQ_HAS_STD_PMR 1)
endif()
endif() endif()

View File

@@ -1,5 +1,5 @@
################################################################################ ################################################################################
# Copyright (C) 2018-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
@@ -27,6 +27,18 @@ macro(fairmq_summary_components)
set(tests_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_TESTING=ON${CR})") set(tests_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_TESTING=ON${CR})")
endif() endif()
message(STATUS " ${BWhite}tests${CR} ${tests_summary}") message(STATUS " ${BWhite}tests${CR} ${tests_summary}")
if(BUILD_OFI_TRANSPORT)
set(ofi_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_OFI_TRANSPORT=OFF${CR})")
else()
set(ofi_summary "${BRed} NO${CR} EXPERIMENTAL (default, enable with ${BMagenta}-DBUILD_OFI_TRANSPORT=ON${CR})")
endif()
message(STATUS " ${BWhite}ofi_transport${CR} ${ofi_summary}")
if(BUILD_PMIX_PLUGIN)
set(pmix_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_PMIX_PLUGIN=OFF${CR})")
else()
set(pmix_summary "${BRed} NO${CR} EXPERIMENTAL (default, enable with ${BMagenta}-DBUILD_PMIX_PLUGIN=ON${CR})")
endif()
message(STATUS " ${BWhite}pmix_plugin${CR} ${pmix_summary}")
if(BUILD_EXAMPLES) if(BUILD_EXAMPLES)
set(examples_summary "${BGreen}YES${CR} (default, disable with ${BMagenta}-DBUILD_EXAMPLES=OFF${CR})") set(examples_summary "${BGreen}YES${CR} (default, disable with ${BMagenta}-DBUILD_EXAMPLES=OFF${CR})")
else() else()
@@ -93,13 +105,3 @@ macro(fairmq_summary_debug_mode)
message(STATUS " ${Cyan}DEBUG MODE${CR} ${BRed}${FAIRMQ_DEBUG_MODE}${CR} (enable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=ON${CR})") message(STATUS " ${Cyan}DEBUG MODE${CR} ${BRed}${FAIRMQ_DEBUG_MODE}${CR} (enable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=ON${CR})")
endif() endif()
endmacro() endmacro()
macro(fairmq_summary_compile_definitions)
message(STATUS " ")
message(STATUS " ${Cyan}COMPILE DEFINITION VALUE${CR}")
message(STATUS " ${BWhite}FAIRMQ_HAS_STD_FILESYSTEM${CR} ${FAIRMQ_HAS_STD_FILESYSTEM} (overridable with ${BMagenta}-DFAIRMQ_HAS_STD_FILESYSTEM=0|1${CR})")
message(STATUS " ${BWhite}FAIRMQ_HAS_STD_PMR${CR} ${FAIRMQ_HAS_STD_PMR} (overridable with ${BMagenta}-DFAIRMQ_HAS_STD_PMR=0|1${CR})")
if(DEFINED FAIRMQ_CHANNEL_DEFAULT_AUTOBIND)
message(STATUS " ${BWhite}FAIRMQ_CHANNEL_DEFAULT_AUTOBIND${CR} ${FAIRMQ_CHANNEL_DEFAULT_AUTOBIND}")
endif()
endmacro()

74
cmake/FindPMIx.cmake Normal file
View File

@@ -0,0 +1,74 @@
################################################################################
# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
# The "lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix" part in all
# the PATH_SUFFIXES is here to be able to find Debian's
# libpmix-dev package. It installs everything below
# /usr/lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix
find_path(PMIx_INCLUDE_DIR
NAMES pmix.h
HINTS ${PMIX_ROOT} $ENV{PMIX_ROOT}
PATH_SUFFIXES include lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix/include
)
find_path(PMIx_LIBRARY_DIR
NAMES libpmix.dylib libpmix.so
HINTS ${PMIX_ROOT} $ENV{PMIX_ROOT}
PATH_SUFFIXES lib lib64 lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix/lib
)
find_library(PMIx_LIBRARY_SHARED
NAMES libpmix.dylib libpmix.so
HINTS ${PMIX_ROOT} $ENV{PMIX_ROOT}
PATH_SUFFIXES lib lib64 lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix/lib
)
find_file(PMIx_VERSION_FILE
NAMES pmix_version.h
HINTS ${PMIX_ROOT} $ENV{PMIX_ROOT}
PATH_SUFFIXES include lib/${CMAKE_LIBRARY_ARCHITECTURE}/pmix/include
)
file(READ "${PMIx_VERSION_FILE}" __version_raw)
string(REGEX MATCH "#define PMIX_VERSION_MAJOR ([0-9]?)L?"
__version_major_raw "${__version_raw}"
)
set(PMIx_VERSION_MAJOR "${CMAKE_MATCH_1}")
string(REGEX MATCH "#define PMIX_VERSION_MINOR ([0-9]?)L?"
__version_minor_raw "${__version_raw}"
)
set(PMIx_VERSION_MINOR "${CMAKE_MATCH_1}")
string(REGEX MATCH "#define PMIX_VERSION_RELEASE ([0-9]?)L?"
__version_patch_raw "${__version_raw}"
)
set(PMIx_VERSION_PATCH "${CMAKE_MATCH_1}")
set(PMIx_VERSION "${PMIx_VERSION_MAJOR}.${PMIx_VERSION_MINOR}.${PMIx_VERSION_PATCH}")
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(PMIx
REQUIRED_VARS
PMIx_INCLUDE_DIR
PMIx_LIBRARY_DIR
PMIx_LIBRARY_SHARED
VERSION_VAR PMIx_VERSION
)
if(NOT TARGET PMIx::libpmix AND PMIx_FOUND)
add_library(PMIx::libpmix SHARED IMPORTED)
set_target_properties(PMIx::libpmix PROPERTIES
IMPORTED_LOCATION ${PMIx_LIBRARY_SHARED}
INTERFACE_INCLUDE_DIRECTORIES ${PMIx_INCLUDE_DIR}
)
endif()

View File

@@ -1,5 +1,5 @@
################################################################################ ################################################################################
# Copyright (C) 2017-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
@@ -20,7 +20,6 @@
# [DEPENDS dep1 [dep2 ...]] # [DEPENDS dep1 [dep2 ...]]
# [LINKS linklib1 [linklib2 ...] # [LINKS linklib1 [linklib2 ...]
# [INCLUDES dir1 [dir2 ...] # [INCLUDES dir1 [dir2 ...]
# [ENVIRONMENT var1=op:val1[;var2=op:val2 ...]]
# [TIMEOUT seconds] # [TIMEOUT seconds]
# [RUN_SERIAL ON/OFF]) # [RUN_SERIAL ON/OFF])
# #
@@ -57,7 +56,7 @@ function(add_testsuite suitename)
cmake_parse_arguments(testsuite cmake_parse_arguments(testsuite
"" ""
"TIMEOUT;RUN_SERIAL" "TIMEOUT;RUN_SERIAL"
"SOURCES;LINKS;DEPENDS;INCLUDES;DEFINITIONS;ENVIRONMENT" "SOURCES;LINKS;DEPENDS;INCLUDES;DEFINITIONS"
${ARGN} ${ARGN}
) )
@@ -78,9 +77,6 @@ function(add_testsuite suitename)
if(testsuite_DEFINITIONS) if(testsuite_DEFINITIONS)
target_compile_definitions("${target}" PUBLIC ${testsuite_DEFINITIONS}) target_compile_definitions("${target}" PUBLIC ${testsuite_DEFINITIONS})
endif() endif()
if(testsuite_ENVIRONMENT AND CMAKE_VERSION VERSION_GREATER_EQUAL 3.22)
set(env "ENVIRONMENT_MODIFICATION" ${testsuite_ENVIRONMENT})
endif()
if(BUILD_TIDY_TOOL AND RUN_FAIRMQ_TIDY) if(BUILD_TIDY_TOOL AND RUN_FAIRMQ_TIDY)
fairmq_target_tidy(TARGET ${target}) fairmq_target_tidy(TARGET ${target})
endif() endif()
@@ -101,7 +97,6 @@ function(add_testsuite suitename)
TEST_PREFIX ${suitename}. TEST_PREFIX ${suitename}.
PROPERTIES RUN_SERIAL ${testsuite_RUN_SERIAL} PROPERTIES RUN_SERIAL ${testsuite_RUN_SERIAL}
TIMEOUT ${testsuite_TIMEOUT} TIMEOUT ${testsuite_TIMEOUT}
${env}
) )
list(APPEND ALL_TEST_TARGETS ${target}) list(APPEND ALL_TEST_TARGETS ${target})

View File

@@ -6,29 +6,17 @@
"license": "./COPYRIGHT", "license": "./COPYRIGHT",
"datePublished": "2018-04-15", "datePublished": "2018-04-15",
"developmentStatus": "active", "developmentStatus": "active",
"softwareVersion": "master",
"releaseNotes": "https://github.com/FairRootGroup/FairMQ/releases",
"codeRepository": "https://github.com/FairRootGroup/FairMQ/", "codeRepository": "https://github.com/FairRootGroup/FairMQ/",
"readme": "https://github.com/FairRootGroup/FairMQ/#readme",
"issueTracker": "https://github.com/FairRootGroup/FairMQ/issues", "issueTracker": "https://github.com/FairRootGroup/FairMQ/issues",
"identifier": "https://doi.org/10.5281/zenodo.1689985", "identifier": "https://doi.org/10.5281/zenodo.1689985",
"maintainer": [
{
"@type": "ResearchOrganisation",
"@id": "https://ror.org/02k8cbn47",
"name": "GSI Helmholtz Centre for Heavy Ion Research"
}
],
"author": [ "author": [
{ {
"@type": "Person", "@type": "Person",
"@id": "https://orcid.org/0000-0002-8071-4497",
"givenName": "Mohammad", "givenName": "Mohammad",
"familyName": "Al-Turany" "familyName": "Al-Turany"
}, },
{ {
"@type": "Person", "@type": "Person",
"@id": "https://orcid.org/0000-0003-3787-1910",
"givenName": "Dennis", "givenName": "Dennis",
"familyName": "Klein" "familyName": "Klein"
}, },
@@ -39,7 +27,6 @@
}, },
{ {
"@type": "Person", "@type": "Person",
"@id": "https://orcid.org/0000-0002-6249-155X",
"givenName": "Alexey", "givenName": "Alexey",
"familyName": "Rybalchenko" "familyName": "Rybalchenko"
}, },
@@ -104,7 +91,6 @@
}, },
{ {
"@type": "Person", "@type": "Person",
"@id": "https://orcid.org/0000-0002-5321-8404",
"givenName": "Christian", "givenName": "Christian",
"familyName": "Tacke" "familyName": "Tacke"
}, },

83
codemeta_update.py Executable file
View File

@@ -0,0 +1,83 @@
#! /usr/bin/env python3
# Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
#
# SPDX-License-Identifier: LGPL-3.0-or-later
import argparse
import json
import re
from collections import OrderedDict
class CodeMetaManipulator(object):
def load(self, filename='codemeta.json'):
with open(filename, 'rb') as fp:
self.data = json.load(fp, object_pairs_hook=OrderedDict)
@staticmethod
def _dict_entry_cmp(dict1, dict2, field):
if (field in dict1) and (field in dict2):
return dict1[field] == dict2[field]
else:
return False
@classmethod
def find_person_entry(cls, person_list, matchdict):
for entry in person_list:
if cls._dict_entry_cmp(entry, matchdict, 'email'):
return entry
if cls._dict_entry_cmp(entry, matchdict, 'familyName') \
and cls._dict_entry_cmp(entry, matchdict, 'givenName'):
return entry
return None
@staticmethod
def update_person_entry(entry, matchdict):
if entry is None:
entry = OrderedDict()
entry['@type'] = 'Person'
for field in ('givenName', 'familyName', 'email'):
val = matchdict.get(field, None)
if val is not None:
entry[field] = val
return entry
def handle_person_list_file(self, filename, cm_field_name):
fp = open(filename, 'r', encoding='utf8')
findregex = re.compile(r'^(?P<familyName>[-\w\s]*[-\w]),\s*'
r'(?P<givenName>[-\w\s]*[-\w])\s*'
r'(?:<(?P<email>\S+@\S+)>)?$')
person_list = self.data.setdefault(cm_field_name, [])
for line in fp:
line = line.strip()
m = findregex.match(line)
if m is None:
raise RuntimeError("Could not analyze line %r" % line)
found_entry = self.find_person_entry(person_list, m.groupdict())
entry = self.update_person_entry(found_entry, m.groupdict())
if found_entry is None:
person_list.append(entry)
def save(self, filename='codemeta.json'):
with open('codemeta.json', 'w', encoding='utf8') as fp:
json.dump(self.data, fp, indent=2)
fp.write('\n')
def main():
parser = argparse.ArgumentParser(description='Update codemeta.json')
parser.add_argument('--set-version', dest='newversion')
args = parser.parse_args()
cm = CodeMetaManipulator()
cm.load()
if args.newversion is not None:
cm.data['softwareVersion'] = args.newversion
cm.handle_person_list_file('AUTHORS', 'author')
cm.handle_person_list_file('CONTRIBUTORS', 'contributor')
cm.save()
if __name__ == '__main__':
main()

View File

@@ -28,6 +28,7 @@ Here is an overview of the device/channel options and when they are applied:
| `init-timeout` | at the end of `fair::mq::State::InitializingDevice` | | `init-timeout` | at the end of `fair::mq::State::InitializingDevice` |
| `shm-segment-size` | at the end of `fair::mq::State::InitializingDevice` | | `shm-segment-size` | at the end of `fair::mq::State::InitializingDevice` |
| `shm-monitor` | at the end of `fair::mq::State::InitializingDevice` | | `shm-monitor` | at the end of `fair::mq::State::InitializingDevice` |
| `ofi-size-hint` | at the end of `fair::mq::State::InitializingDevice` |
| `rate` | at the end of `fair::mq::State::InitializingDevice` | | `rate` | at the end of `fair::mq::State::InitializingDevice` |
| `session` | at the end of `fair::mq::State::InitializingDevice` | | `session` | at the end of `fair::mq::State::InitializingDevice` |
| `chan.*` | at the end of `fair::mq::State::InitializingDevice` (channel addresses can be also applied during `fair::mq::State::Binding`/`fair::mq::State::Connecting`) | | `chan.*` | at the end of `fair::mq::State::InitializingDevice` (channel addresses can be also applied during `fair::mq::State::Binding`/`fair::mq::State::Connecting`) |

View File

@@ -56,6 +56,8 @@ A more complete example which may serve as a start including example CMake code
### 7.3.1 PMIx ### 7.3.1 PMIx
The [PMIx](https://pmix.org/) plugin enables launching a FairMQ topology with any PMIx capable launcher, e.g. the [Open Run-Time Environment (ORTE) of OpenMPI](https://www.open-mpi.org/doc/v4.0/man1/mpirun.1.php) or the [Slurm workload manager](https://slurm.schedmd.com/srun.html). This experimental plugin has been last released in v1.4.56 and is removed in v1.5+. For now there are no plans to pick up development of it again. The [PMIx](https://pmix.org/) plugin enables launching a FairMQ topology with any PMIx capable launcher, e.g. the [Open Run-Time Environment (ORTE) of OpenMPI](https://www.open-mpi.org/doc/v4.0/man1/mpirun.1.php) or the [Slurm workload manager](https://slurm.schedmd.com/srun.html). This plugin is not (yet) very mature and serves as a proof of concept at the moment.
TODO example usage
← [Back](../README.md) ← [Back](../README.md)

View File

@@ -1,9 +1,51 @@
################################################################################ ################################################################################
# Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" # # copied verbatim in the file "LICENSE" #
################################################################################ ################################################################################
add_example(NAME 1-1 DEVICE sampler sink) add_executable(fairmq-ex-1-1-sampler sampler.cxx)
target_link_libraries(fairmq-ex-1-1-sampler PRIVATE FairMQ)
add_executable(fairmq-ex-1-1-sink sink.cxx)
target_link_libraries(fairmq-ex-1-1-sink PRIVATE FairMQ)
add_custom_target(Example11 DEPENDS fairmq-ex-1-1-sampler fairmq-ex-1-1-sink)
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh)
# test
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-1-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh)
add_test(NAME Example.1-1.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh zeromq)
set_tests_properties(Example.1-1.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received: ")
add_test(NAME Example.1-1.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-1.sh shmem)
set_tests_properties(Example.1-1.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received: ")
# install
install(
TARGETS
fairmq-ex-1-1-sampler
fairmq-ex-1-1-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-1.sh
)

View File

@@ -1,7 +1,5 @@
#!/bin/bash #!/bin/bash
set -e
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq" transport="zeromq"
@@ -15,7 +13,7 @@ chan="data"
chanAddr="/tmp/fmq_$session""_""$chan""_""$transport" chanAddr="/tmp/fmq_$session""_""$chan""_""$transport"
# setup a trap to kill everything if the test fails/timeouts # setup a trap to kill everything if the test fails/timeouts
trap 'set +e; kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; rm $chanAddr; exit 0' TERM trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; rm $chanAddr' TERM
SAMPLER="fairmq-ex-1-1-sampler" SAMPLER="fairmq-ex-1-1-sampler"
SAMPLER+=" --id sampler1" SAMPLER+=" --id sampler1"
@@ -24,7 +22,6 @@ SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh" SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --session $session" SAMPLER+=" --session $session"
SAMPLER+=" --shm-segment-size 100000000" SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --shm-monitor true"
SAMPLER+=" --control static --color false" SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1" SAMPLER+=" --max-iterations 1"
SAMPLER+=" --channel-config name=$chan,type=push,method=bind,address=ipc://$chanAddr,rateLogging=0" SAMPLER+=" --channel-config name=$chan,type=push,method=bind,address=ipc://$chanAddr,rateLogging=0"
@@ -37,7 +34,6 @@ SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh" SINK+=" --verbosity veryhigh"
SINK+=" --session $session" SINK+=" --session $session"
SINK+=" --shm-segment-size 100000000" SINK+=" --shm-segment-size 100000000"
SINK+=" --shm-monitor true"
SINK+=" --control static --color false" SINK+=" --control static --color false"
SINK+=" --max-iterations 1" SINK+=" --max-iterations 1"
SINK+=" --channel-config name=$chan,type=pull,method=connect,address=ipc://$chanAddr,rateLogging=0" SINK+=" --channel-config name=$chan,type=pull,method=connect,address=ipc://$chanAddr,rateLogging=0"
@@ -48,6 +44,4 @@ SINK_PID=$!
wait $SAMPLER_PID wait $SAMPLER_PID
wait $SINK_PID wait $SINK_PID
set +e
rm $chanAddr rm $chanAddr
exit 0

View File

@@ -1,9 +1,64 @@
################################################################################ ################################################################################
# Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" # # copied verbatim in the file "LICENSE" #
################################################################################ ################################################################################
add_example(NAME 1-n-1 DEVICE sampler processor sink CONFIG) add_executable(fairmq-ex-1-n-1-sampler sampler.cxx)
target_link_libraries(fairmq-ex-1-n-1-sampler PRIVATE FairMQ)
add_executable(fairmq-ex-1-n-1-processor processor.cxx)
target_link_libraries(fairmq-ex-1-n-1-processor PRIVATE FairMQ)
add_executable(fairmq-ex-1-n-1-sink sink.cxx)
target_link_libraries(fairmq-ex-1-n-1-sink PRIVATE FairMQ)
add_custom_target(Example1N1 DEPENDS fairmq-ex-1-n-1-sampler fairmq-ex-1-n-1-processor fairmq-ex-1-n-1-sink)
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(EX_CONF_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-n-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ex-1-n-1.json ${CMAKE_CURRENT_BINARY_DIR}/ex-1-n-1.json)
# test
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-1-n-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh)
add_test(NAME Example.1-n-1.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh zeromq)
set_tests_properties(Example.1-n-1.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received: ")
add_test(NAME Example.1-n-1.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-1-n-1.sh shmem)
set_tests_properties(Example.1-n-1.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received: ")
# install
install(
TARGETS
fairmq-ex-1-n-1-sampler
fairmq-ex-1-n-1-processor
fairmq-ex-1-n-1-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
set(EX_CONF_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_DATADIR})
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-n-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-n-1.sh
)
install(
FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-1-n-1.json
DESTINATION ${PROJECT_INSTALL_DATADIR}
)

View File

@@ -1,7 +1,5 @@
#!/bin/bash #!/bin/bash
set -e
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq" transport="zeromq"
@@ -17,7 +15,7 @@ chan1Addr="/tmp/fmq_$session""_""$chan1""_""$transport"
chan2Addr="/tmp/fmq_$session""_""$chan2""_""$transport" chan2Addr="/tmp/fmq_$session""_""$chan2""_""$transport"
# setup a trap to kill everything if the test fails/timeouts # setup a trap to kill everything if the test fails/timeouts
trap 'set +e; kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $PROCESSOR1_PID; kill -TERM $PROCESSOR2_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $PROCESSOR1_PID; wait $PROCESSOR2_PID; rm $chan1Addr; rm $chan2Addr; exit 0' TERM trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $PROCESSOR1_PID; kill -TERM $PROCESSOR2_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $PROCESSOR1_PID; wait $PROCESSOR2_PID; rm $chan1Addr; rm $chan2Addr' TERM
SAMPLER="fairmq-ex-1-n-1-sampler" SAMPLER="fairmq-ex-1-n-1-sampler"
SAMPLER+=" --id sampler1" SAMPLER+=" --id sampler1"
@@ -26,7 +24,6 @@ SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --session $session" SAMPLER+=" --session $session"
SAMPLER+=" --severity debug" SAMPLER+=" --severity debug"
SAMPLER+=" --shm-segment-size 100000000" SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --shm-monitor true"
SAMPLER+=" --control static --color false" SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 2" SAMPLER+=" --max-iterations 2"
SAMPLER+=" --channel-config name=$chan1,type=push,method=bind,address=ipc://$chan1Addr,rateLogging=0" SAMPLER+=" --channel-config name=$chan1,type=push,method=bind,address=ipc://$chan1Addr,rateLogging=0"
@@ -40,7 +37,6 @@ PROCESSOR1+=" --verbosity veryhigh"
PROCESSOR1+=" --session $session" PROCESSOR1+=" --session $session"
PROCESSOR1+=" --severity debug" PROCESSOR1+=" --severity debug"
PROCESSOR1+=" --shm-segment-size 100000000" PROCESSOR1+=" --shm-segment-size 100000000"
PROCESSOR1+=" --shm-monitor true"
PROCESSOR1+=" --control static --color false" PROCESSOR1+=" --control static --color false"
PROCESSOR1+=" --channel-config name=$chan1,type=pull,method=connect,address=ipc://$chan1Addr,rateLogging=0" PROCESSOR1+=" --channel-config name=$chan1,type=pull,method=connect,address=ipc://$chan1Addr,rateLogging=0"
PROCESSOR1+=" name=$chan2,type=push,method=connect,address=ipc://$chan2Addr,rateLogging=0" PROCESSOR1+=" name=$chan2,type=push,method=connect,address=ipc://$chan2Addr,rateLogging=0"
@@ -54,7 +50,6 @@ PROCESSOR2+=" --verbosity veryhigh"
PROCESSOR2+=" --session $session" PROCESSOR2+=" --session $session"
PROCESSOR2+=" --severity debug" PROCESSOR2+=" --severity debug"
PROCESSOR2+=" --shm-segment-size 100000000" PROCESSOR2+=" --shm-segment-size 100000000"
PROCESSOR2+=" --shm-monitor true"
PROCESSOR2+=" --control static --color false" PROCESSOR2+=" --control static --color false"
PROCESSOR2+=" --channel-config name=$chan1,type=pull,method=connect,address=ipc://$chan1Addr,rateLogging=0" PROCESSOR2+=" --channel-config name=$chan1,type=pull,method=connect,address=ipc://$chan1Addr,rateLogging=0"
PROCESSOR2+=" name=$chan2,type=push,method=connect,address=ipc://$chan2Addr,rateLogging=0" PROCESSOR2+=" name=$chan2,type=push,method=connect,address=ipc://$chan2Addr,rateLogging=0"
@@ -68,7 +63,6 @@ SINK+=" --verbosity veryhigh"
SINK+=" --session $session" SINK+=" --session $session"
SINK+=" --severity debug" SINK+=" --severity debug"
SINK+=" --shm-segment-size 100000000" SINK+=" --shm-segment-size 100000000"
SINK+=" --shm-monitor true"
SINK+=" --control static --color false" SINK+=" --control static --color false"
SINK+=" --max-iterations 2" SINK+=" --max-iterations 2"
SINK+=" --channel-config name=$chan2,type=pull,method=bind,address=ipc://$chan2Addr,rateLogging=0" SINK+=" --channel-config name=$chan2,type=pull,method=bind,address=ipc://$chan2Addr,rateLogging=0"
@@ -87,6 +81,4 @@ kill -SIGINT $PROCESSOR2_PID
wait $PROCESSOR1_PID wait $PROCESSOR1_PID
wait $PROCESSOR2_PID wait $PROCESSOR2_PID
set +e
rm $chan1Addr; rm $chan2Addr rm $chan1Addr; rm $chan2Addr
exit 0

View File

@@ -1,145 +1,15 @@
################################################################################ ################################################################################
# Copyright (C) 2018-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" # # copied verbatim in the file "LICENSE" #
################################################################################ ################################################################################
set(exe_prefix "fairmq-ex")
set(script_prefix "fairmq-start-ex")
set(test_script_prefix "test-ex")
set(testsuite "Example")
set(transports "zeromq" "shmem")
function(add_example)
cmake_parse_arguments(PARSE_ARGV 0 ARG
"CONFIG;NO_TRANSPORT;NO_TEST"
"NAME"
"DEVICE;VARIANT;TRANSPORT;SCRIPT"
)
if(ARG_UNPARSED_ARGUMENTS)
message(FATAL_ERROR "Unexpected unparsed arguments: ${ARG_UNPARSED_ARGUMENTS}")
endif()
if(ARG_NAME)
set(name ${ARG_NAME})
else()
message(FATAL_ERROR "NAME arg is required")
endif()
if(ENABLE_SANITIZER_LEAK AND CMAKE_VERSION VERSION_GREATER_EQUAL 3.22)
get_filename_component(lsan_supps "${CMAKE_SOURCE_DIR}/test/leak_sanitizer_suppressions.txt" ABSOLUTE)
set(lsan_options "LSAN_OPTIONS=set:suppressions=${lsan_supps}")
endif()
if(ARG_DEVICE)
set(exe_targets)
foreach(device IN LISTS ARG_DEVICE)
set(exe "${exe_prefix}-${name}-${device}")
list(APPEND exe_targets ${exe})
add_executable(${exe} "${device}.cxx")
target_link_libraries(${exe} PRIVATE FairMQ)
endforeach()
endif()
if(ARG_TRANSPORT)
set(transports ${ARG_TRANSPORT})
endif()
if(ARG_SCRIPT)
set(scripts ${ARG_SCRIPT})
else()
set(scripts ${ARG_NAME})
endif()
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
if(ARG_CONFIG)
set(EX_CONF_DIR ${CMAKE_CURRENT_BINARY_DIR})
endif()
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
foreach(script IN LISTS scripts)
set(script_file "${script_prefix}-${script}.sh")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}" @ONLY)
endforeach()
if(ARG_CONFIG)
set(config "ex-${name}.json")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${config}" "${CMAKE_CURRENT_BINARY_DIR}/${config}")
endif()
# test
if(NOT ARG_NO_TEST)
set(test_script "${test_script_prefix}-${name}.sh")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${test_script}.in" "${CMAKE_CURRENT_BINARY_DIR}/${test_script}")
if(ARG_NO_TRANSPORT)
set(test "${testsuite}.${name}.${transport}")
add_test(NAME ${test} COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${test_script} ${transport})
set_tests_properties(${test} PROPERTIES TIMEOUT "30")
if(lsan_options)
set_tests_properties(${test} PROPERTIES ENVIRONMENT_MODIFICATION ${lsan_options})
endif()
else()
foreach(transport IN LISTS transports)
if(ARG_VARIANT)
foreach(variant IN LISTS ARG_VARIANT)
set(test "${testsuite}.${name}.${variant}.${transport}")
add_test(NAME ${test} COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${test_script} ${transport} ${variant})
set_tests_properties(${test} PROPERTIES TIMEOUT "30")
if(lsan_options)
set_tests_properties(${test} PROPERTIES ENVIRONMENT_MODIFICATION ${lsan_options})
endif()
endforeach()
else()
set(test "${testsuite}.${name}.${transport}")
add_test(NAME ${test} COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${test_script} ${transport})
set_tests_properties(${test} PROPERTIES TIMEOUT "30")
if(lsan_options)
set_tests_properties(${test} PROPERTIES ENVIRONMENT_MODIFICATION ${lsan_options})
endif()
endif()
endforeach()
endif()
endif()
# install
install(
TARGETS ${exe_targets}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
if(ARG_CONFIG)
set(EX_CONF_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_DATADIR})
endif()
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
foreach(script IN LISTS scripts)
set(script_file "${script_prefix}-${script}.sh")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install" @ONLY)
install(
PROGRAMS "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install"
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME ${script_file}
)
endforeach()
if(ARG_CONFIG)
install(
FILES ${CMAKE_CURRENT_BINARY_DIR}/${config}
DESTINATION ${PROJECT_INSTALL_DATADIR}
)
endif()
endfunction()
add_subdirectory(1-1) add_subdirectory(1-1)
add_subdirectory(1-n-1) add_subdirectory(1-n-1)
add_subdirectory(builtin-devices) add_subdirectory(builtin-devices)
add_subdirectory(copypush) add_subdirectory(copypush)
add_subdirectory(custom-controller)
add_subdirectory(dds) add_subdirectory(dds)
add_subdirectory(multipart) add_subdirectory(multipart)
add_subdirectory(multiple-channels) add_subdirectory(multiple-channels)

View File

@@ -1,11 +1,40 @@
################################################################################ ################################################################################
# Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" # # copied verbatim in the file "LICENSE" #
################################################################################ ################################################################################
add_example(NAME builtin-devices set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
VARIANT single_msg multipart set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-builtin-devices.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh)
# test
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-builtin-devices.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh)
add_test(NAME Example.BuiltinDevices.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh zeromq)
set_tests_properties(Example.BuiltinDevices.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached")
add_test(NAME Example.BuiltinDevices.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh shmem)
set_tests_properties(Example.BuiltinDevices.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached")
add_test(NAME Example.BuiltinDevices.multipart.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh zeromq true 2)
set_tests_properties(Example.BuiltinDevices.multipart.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached")
add_test(NAME Example.BuiltinDevices.multipart.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-builtin-devices.sh shmem true 2)
set_tests_properties(Example.BuiltinDevices.multipart.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Configured maximum number of iterations reached")
# install
# configure run script with different executable paths for build and for install directories
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-builtin-devices.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-builtin-devices.sh
) )

View File

@@ -1,7 +1,5 @@
#!/bin/bash #!/bin/bash
set -e
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq" transport="zeromq"
@@ -12,9 +10,12 @@ if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1 transport=$1
fi fi
if [[ $2 =~ ^multipart$ ]]; then if [[ $2 =~ ^[a-z]+$ ]]; then
multipart="true" multipart=$2
numParts=2 fi
if [[ $3 =~ ^[0-9]+$ ]]; then
numParts=$3
fi fi
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)" session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
@@ -32,7 +33,7 @@ chan4Addr="/tmp/fmq_$session""_""$chan4""_""$transport"
chan5Addr="/tmp/fmq_$session""_""$chan5""_""$transport" chan5Addr="/tmp/fmq_$session""_""$chan5""_""$transport"
# setup a trap to kill everything if the test fails/timeouts # setup a trap to kill everything if the test fails/timeouts
trap 'set +e; kill -TERM $SAMPLER_PID; kill -TERM $SPLITTER_PID; kill -TERM $PROXY1_PID; kill -TERM $PROXY2_PID; kill -TERM $MERGER_PID; kill -TERM $MULTIPLIER_PID; kill -TERM $SINK_PID; rm $chan1Addr; rm $chan2Addr1; rm $chan2Addr2; rm $chan3Addr1; rm $chan3Addr2; rm $chan4Addr; rm $chan5Addr; exit 0' TERM trap 'kill -TERM $SAMPLER_PID; kill -TERM $SPLITTER_PID; kill -TERM $PROXY1_PID; kill -TERM $PROXY2_PID; kill -TERM $MERGER_PID; kill -TERM $MULTIPLIER_PID; kill -TERM $SINK_PID; rm $chan1Addr; rm $chan2Addr1; rm $chan2Addr2; rm $chan3Addr1; rm $chan3Addr2; rm $chan4Addr; rm $chan5Addr' TERM
SAMPLER="fairmq-bsampler" SAMPLER="fairmq-bsampler"
SAMPLER+=" --id bsampler1" SAMPLER+=" --id bsampler1"
@@ -42,7 +43,6 @@ SAMPLER+=" --color false"
SAMPLER+=" --control static" SAMPLER+=" --control static"
SAMPLER+=" --verbosity veryhigh" SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --shm-segment-size 100000000" SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --shm-monitor true"
SAMPLER+=" --severity debug" SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size 100000" SAMPLER+=" --msg-size 100000"
SAMPLER+=" --multipart $multipart" SAMPLER+=" --multipart $multipart"
@@ -62,7 +62,6 @@ SPLITTER+=" --color false"
SPLITTER+=" --control static" SPLITTER+=" --control static"
SPLITTER+=" --verbosity veryhigh" SPLITTER+=" --verbosity veryhigh"
SPLITTER+=" --shm-segment-size 100000000" SPLITTER+=" --shm-segment-size 100000000"
SPLITTER+=" --shm-monitor true"
SPLITTER+=" --multipart $multipart" SPLITTER+=" --multipart $multipart"
SPLITTER+=" --in-channel $chan1" SPLITTER+=" --in-channel $chan1"
SPLITTER+=" --out-channel $chan2" SPLITTER+=" --out-channel $chan2"
@@ -79,7 +78,6 @@ PROXY1+=" --color false"
PROXY1+=" --control static" PROXY1+=" --control static"
PROXY1+=" --verbosity veryhigh" PROXY1+=" --verbosity veryhigh"
PROXY1+=" --shm-segment-size 100000000" PROXY1+=" --shm-segment-size 100000000"
PROXY1+=" --shm-monitor true"
PROXY1+=" --multipart $multipart" PROXY1+=" --multipart $multipart"
PROXY1+=" --in-channel $chan2" PROXY1+=" --in-channel $chan2"
PROXY1+=" --out-channel $chan3" PROXY1+=" --out-channel $chan3"
@@ -96,7 +94,6 @@ PROXY2+=" --color false"
PROXY2+=" --control static" PROXY2+=" --control static"
PROXY2+=" --verbosity veryhigh" PROXY2+=" --verbosity veryhigh"
PROXY2+=" --shm-segment-size 100000000" PROXY2+=" --shm-segment-size 100000000"
PROXY2+=" --shm-monitor true"
PROXY2+=" --multipart $multipart" PROXY2+=" --multipart $multipart"
PROXY2+=" --in-channel $chan2" PROXY2+=" --in-channel $chan2"
PROXY2+=" --out-channel $chan3" PROXY2+=" --out-channel $chan3"
@@ -113,7 +110,6 @@ MERGER+=" --color false"
MERGER+=" --control static" MERGER+=" --control static"
MERGER+=" --verbosity veryhigh" MERGER+=" --verbosity veryhigh"
MERGER+=" --shm-segment-size 100000000" MERGER+=" --shm-segment-size 100000000"
MERGER+=" --shm-monitor true"
MERGER+=" --multipart $multipart" MERGER+=" --multipart $multipart"
MERGER+=" --in-channel $chan3" MERGER+=" --in-channel $chan3"
MERGER+=" --out-channel $chan4" MERGER+=" --out-channel $chan4"
@@ -130,7 +126,6 @@ MULTIPLIER+=" --color false"
MULTIPLIER+=" --control static" MULTIPLIER+=" --control static"
MULTIPLIER+=" --verbosity veryhigh" MULTIPLIER+=" --verbosity veryhigh"
MULTIPLIER+=" --shm-segment-size 100000000" MULTIPLIER+=" --shm-segment-size 100000000"
MULTIPLIER+=" --shm-monitor true"
MULTIPLIER+=" --multipart $multipart" MULTIPLIER+=" --multipart $multipart"
MULTIPLIER+=" --in-channel $chan4" MULTIPLIER+=" --in-channel $chan4"
MULTIPLIER+=" --out-channel $chan5" MULTIPLIER+=" --out-channel $chan5"
@@ -148,7 +143,6 @@ SINK+=" --control static"
SINK+=" --verbosity veryhigh" SINK+=" --verbosity veryhigh"
SINK+=" --severity debug" SINK+=" --severity debug"
SINK+=" --multipart $multipart" SINK+=" --multipart $multipart"
SINK+=" --shm-monitor true"
SINK+=" --max-iterations 2" SINK+=" --max-iterations 2"
SINK+=" --in-channel $chan5" SINK+=" --in-channel $chan5"
SINK+=" --channel-config name=$chan5,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=ipc://$chan5Addr" SINK+=" --channel-config name=$chan5,type=pull,method=bind,sndBufSize=50,rcvBufSize=50,address=ipc://$chan5Addr"
@@ -171,6 +165,4 @@ wait $PROXY2_PID
wait $MERGER_PID wait $MERGER_PID
wait $MULTIPLIER_PID wait $MULTIPLIER_PID
set +e
rm $chan1Addr; rm $chan2Addr1; rm $chan2Addr2; rm $chan3Addr1; rm $chan3Addr2; rm $chan4Addr; rm $chan5Addr rm $chan1Addr; rm $chan2Addr1; rm $chan2Addr2; rm $chan3Addr1; rm $chan3Addr2; rm $chan4Addr; rm $chan5Addr
exit 0

View File

@@ -1,11 +1,52 @@
################################################################################ ################################################################################
# Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" # # copied verbatim in the file "LICENSE" #
################################################################################ ################################################################################
add_example(NAME copypush
DEVICE sampler sink add_executable(fairmq-ex-copypush-sampler sampler.cxx)
target_link_libraries(fairmq-ex-copypush-sampler PRIVATE FairMQ)
add_executable(fairmq-ex-copypush-sink sink.cxx)
target_link_libraries(fairmq-ex-copypush-sink PRIVATE FairMQ)
add_custom_target(ExampleCopyPush DEPENDS fairmq-ex-copypush-sampler fairmq-ex-copypush-sink)
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-copypush.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh)
# test
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-copypush.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh)
add_test(NAME Example.CopyPush.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh zeromq)
set_tests_properties(Example.CopyPush.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received message: ")
add_test(NAME Example.CopyPush.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-copypush.sh shmem)
set_tests_properties(Example.CopyPush.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received message: ")
# install
install(
TARGETS
fairmq-ex-copypush-sampler
fairmq-ex-copypush-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-copypush.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-copypush.sh
) )

View File

@@ -1,7 +1,5 @@
#!/bin/bash #!/bin/bash
set -e
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq" transport="zeromq"
@@ -16,7 +14,7 @@ chanAddr1="/tmp/fmq_$session""_""$chan""_1""_""$transport"
chanAddr2="/tmp/fmq_$session""_""$chan""_2""_""$transport" chanAddr2="/tmp/fmq_$session""_""$chan""_2""_""$transport"
# setup a trap to kill everything if the test fails/timeouts # setup a trap to kill everything if the test fails/timeouts
trap 'set +e; kill -TERM $SAMPLER_PID; kill -TERM $SINK1_PID; kill -TERM $SINK2_PID; wait $SAMPLER_PID; wait $SINK1_PID; wait $SINK2_PID; rm $chanAddr1; rm $chanAddr2; exit 0' TERM trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK1_PID; kill -TERM $SINK2_PID; wait $SAMPLER_PID; wait $SINK1_PID; wait $SINK2_PID; rm $chanAddr1; rm $chanAddr2' TERM
SAMPLER="fairmq-ex-copypush-sampler" SAMPLER="fairmq-ex-copypush-sampler"
SAMPLER+=" --id sampler1" SAMPLER+=" --id sampler1"
@@ -24,7 +22,6 @@ SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh" SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --severity debug" SAMPLER+=" --severity debug"
SAMPLER+=" --shm-segment-size 100000000" SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --shm-monitor true"
SAMPLER+=" --session $session" SAMPLER+=" --session $session"
SAMPLER+=" --control static --color false" SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1" SAMPLER+=" --max-iterations 1"
@@ -38,7 +35,6 @@ SINK1+=" --transport $transport"
SINK1+=" --verbosity veryhigh" SINK1+=" --verbosity veryhigh"
SINK1+=" --severity debug" SINK1+=" --severity debug"
SINK1+=" --shm-segment-size 100000000" SINK1+=" --shm-segment-size 100000000"
SINK1+=" --shm-monitor true"
SINK1+=" --session $session" SINK1+=" --session $session"
SINK1+=" --control static --color false" SINK1+=" --control static --color false"
SINK1+=" --max-iterations 1" SINK1+=" --max-iterations 1"
@@ -52,7 +48,6 @@ SINK2+=" --transport $transport"
SINK2+=" --verbosity veryhigh" SINK2+=" --verbosity veryhigh"
SINK2+=" --severity debug" SINK2+=" --severity debug"
SINK2+=" --shm-segment-size 100000000" SINK2+=" --shm-segment-size 100000000"
SINK2+=" --shm-monitor true"
SINK2+=" --session $session" SINK2+=" --session $session"
SINK2+=" --control static --color false" SINK2+=" --control static --color false"
SINK2+=" --max-iterations 1" SINK2+=" --max-iterations 1"
@@ -65,6 +60,4 @@ wait $SAMPLER_PID
wait $SINK1_PID wait $SINK1_PID
wait $SINK2_PID wait $SINK2_PID
set +e
rm $chanAddr1; rm $chanAddr2 rm $chanAddr1; rm $chanAddr2
exit 0

View File

@@ -1,20 +0,0 @@
################################################################################
# Copyright (C) 2022-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
set(name "custom-controller")
set(exe "${exe_prefix}-${name}")
add_executable(${exe} main.cxx)
target_link_libraries(${exe} PRIVATE FairMQ)
set_target_properties(${exe} PROPERTIES ENABLE_EXPORTS ON)
set(test "${testsuite}.${name}")
add_test(NAME ${test} COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${exe})
set_tests_properties(${test} PROPERTIES TIMEOUT 30)
if(lsan_options)
set_tests_properties(${test} PROPERTIES ENVIRONMENT_MODIFICATION ${lsan_options})
endif()

View File

@@ -1,97 +0,0 @@
/********************************************************************************
* Copyright (C) 2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_EXAMPLE_CUSTOM_CONTROLLER_PLUGIN
#define FAIR_MQ_EXAMPLE_CUSTOM_CONTROLLER_PLUGIN
#include <fairmq/Plugin.h>
#include <utility> // for std::forward
namespace example {
struct MyController : fair::mq::Plugin // NOLINT
{
template<typename... Args>
MyController(Args&&... args)
: Plugin(std::forward<Args>(args)...)
{
TakeDeviceControl();
SubscribeToDeviceStateChange([this](auto state) {
auto shutdown = GetProperty<bool>("please-shut-me-down", false);
try {
switch (state) {
case DeviceState::Idle: {
ChangeDeviceState(shutdown ? DeviceStateTransition::End
: DeviceStateTransition::InitDevice);
break;
}
case DeviceState::InitializingDevice: {
ChangeDeviceState(DeviceStateTransition::CompleteInit);
break;
}
case DeviceState::Initialized: {
ChangeDeviceState(DeviceStateTransition::Bind);
break;
}
case DeviceState::Bound: {
ChangeDeviceState(DeviceStateTransition::Connect);
break;
}
case DeviceState::DeviceReady: {
ChangeDeviceState(shutdown ? DeviceStateTransition::ResetDevice
: DeviceStateTransition::InitTask);
break;
}
case DeviceState::Ready: {
ChangeDeviceState(shutdown ? DeviceStateTransition::ResetTask
: DeviceStateTransition::Run);
break;
}
case DeviceState::Running: {
ChangeDeviceState(DeviceStateTransition::Stop);
break;
}
case DeviceState::Exiting: {
ReleaseDeviceControl();
break;
}
default:
break;
}
} catch (fair::mq::PluginServices::DeviceControlError const&) {
// this means we do not have device control
}
});
}
~MyController() override { ReleaseDeviceControl(); }
};
// auto MyControllerProgramOptions() -> fair::mq::Plugin::ProgOptions
// {
// auto plugin_options = boost::program_options::options_description{"MyController Plugin"};
// plugin_options.add_options()
// ("custom-dummy-option", boost::program_options::value<std::string>(), "Cool custom option.")
// ("custom-dummy-option2", boost::program_options::value<std::string>(), "Another one.");
// return plugin_options;
// }
} // namespace example
REGISTER_FAIRMQ_PLUGIN(example::MyController, // Class name
mycontroller, // Plugin name (string, lower case chars only)
(fair::mq::Plugin::Version{0, 42, 0}), // Version
"Mr. Dummy <dummy@test.net>", // Maintainer
"https://git.test.net/mycontroller.git", // Homepage
// example::MyControllerProgramOptions // Free
// function which declares custom
// program options for the plugin
fair::mq::Plugin::NoProgramOptions)
#endif /* FAIR_MQ_EXAMPLE_CUSTOM_CONTROLLER_PLUGIN */

View File

@@ -1,21 +0,0 @@
/********************************************************************************
* Copyright (C) 2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_EXAMPLE_CUSTOM_CONTROLLER_DEVICE
#define FAIR_MQ_EXAMPLE_CUSTOM_CONTROLLER_DEVICE
#include <fairmq/Device.h>
namespace example {
using MyDevice = fair::mq::Device;
} // namespace example
#endif /* FAIR_MQ_EXAMPLE_CUSTOM_CONTROLLER_DEVICE */

View File

@@ -1,46 +0,0 @@
/********************************************************************************
* Copyright (C) 2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "MyController.h"
#include "MyDevice.h"
#include <chrono> // for std::chrono_literals
#include <fairlogger/Logger.h>
#include <fairmq/DeviceRunner.h>
#include <memory> // for std::make_unique
int main(int argc, char* argv[])
{
using namespace fair::mq;
using namespace std::chrono_literals;
DeviceRunner runner(argc, argv);
runner.AddHook<hooks::LoadPlugins>([](DeviceRunner& r) {
r.fPluginManager.LoadPlugin("s:mycontroller");
// 's:' stands for static because the plugin is compiled into the executable
// 'mycontroller' is the plugin name passed as second arg to REGISTER_FAIRMQ_PLUGIN
});
fair::Logger::SetConsoleSeverity(fair::Severity::debug);
runner.AddHook<hooks::InstantiateDevice>([](DeviceRunner& r) {
r.fConfig.SetProperty<int>("catch-signals", 0);
r.fConfig.SetProperty<bool>("please-shut-me-down", false);
r.fDevice = std::make_unique<example::MyDevice>(r.fConfig);
r.fDevice->SubscribeToStateChange("example", [&r](auto state) {
if (state == State::Running) {
r.fDevice->WaitFor(3s);
r.fConfig.SetProperty<bool>("please-shut-me-down", true);
}
});
});
return runner.RunWithExceptionHandlers();
}

View File

@@ -1,11 +1,56 @@
################################################################################ ################################################################################
# Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" # # copied verbatim in the file "LICENSE" #
################################################################################ ################################################################################
add_example(NAME multipart add_executable(fairmq-ex-multipart-sampler sampler.cxx)
DEVICE sampler sink target_link_libraries(fairmq-ex-multipart-sampler PRIVATE FairMQ)
add_executable(fairmq-ex-multipart-sink sink.cxx)
target_link_libraries(fairmq-ex-multipart-sink PRIVATE FairMQ)
add_custom_target(ExampleMultipart DEPENDS fairmq-ex-multipart-sampler fairmq-ex-multipart-sink)
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh)
# test
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh)
add_test(NAME Example.Multipart.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh zeromq)
set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received message with 7 parts")
add_test(NAME Example.Multipart.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh shmem)
set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received message with 7 parts")
if(BUILD_OFI_TRANSPORT)
add_test(NAME Example.Multipart.ofi COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh ofi)
set_tests_properties(Example.Multipart.ofi PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received message with 7 parts")
endif()
# install
install(
TARGETS
fairmq-ex-multipart-sampler
fairmq-ex-multipart-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multipart.sh
) )

View File

@@ -24,10 +24,6 @@ struct Sink : fair::mq::Device
{ {
LOG(info) << "Received message with " << parts.Size() << " parts"; LOG(info) << "Received message with " << parts.Size() << " parts";
if (parts.Size() != 7) {
throw std::runtime_error("Number of received parts != 7");
}
example_multipart::Header header; example_multipart::Header header;
header.stopFlag = (static_cast<example_multipart::Header*>(parts.At(0)->GetData()))->stopFlag; header.stopFlag = (static_cast<example_multipart::Header*>(parts.At(0)->GetData()))->stopFlag;

View File

@@ -1,7 +1,5 @@
#!/bin/bash #!/bin/bash
set -e
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq" transport="zeromq"
@@ -14,10 +12,14 @@ session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
chan="data" chan="data"
chanAddr="" chanAddr=""
chanIpcFile="/tmp/fmq_$session""_""$chan""_""$transport" chanIpcFile="/tmp/fmq_$session""_""$chan""_""$transport"
chanAddr="ipc://""$chanIpcFile" if [ $transport = "ofi" ]; then
chanAddr="tcp://127.0.0.1:5656"
else
chanAddr="ipc://""$chanIpcFile"
fi
# setup a trap to kill everything if the test fails/timeouts # setup a trap to kill everything if the test fails/timeouts
trap 'set +e; kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; rm $chanIpcFile; exit 0' TERM trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; rm $chanIpcFile' TERM
SAMPLER="fairmq-ex-multipart-sampler" SAMPLER="fairmq-ex-multipart-sampler"
SAMPLER+=" --id sampler1" SAMPLER+=" --id sampler1"
@@ -25,7 +27,6 @@ SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh" SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --session $session" SAMPLER+=" --session $session"
SAMPLER+=" --shm-segment-size 100000000" SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --shm-monitor true"
SAMPLER+=" --max-iterations 1" SAMPLER+=" --max-iterations 1"
SAMPLER+=" --control static --color false" SAMPLER+=" --control static --color false"
SAMPLER+=" --channel-config name=$chan,type=pair,method=connect,rateLogging=0,address=$chanAddr,linger=1000" SAMPLER+=" --channel-config name=$chan,type=pair,method=connect,rateLogging=0,address=$chanAddr,linger=1000"
@@ -38,7 +39,6 @@ SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh" SINK+=" --verbosity veryhigh"
SINK+=" --session $session" SINK+=" --session $session"
SINK+=" --shm-segment-size 100000000" SINK+=" --shm-segment-size 100000000"
SINK+=" --shm-monitor true"
SINK+=" --control static --color false" SINK+=" --control static --color false"
SINK+=" --channel-config name=$chan,type=pair,method=bind,rateLogging=0,address=$chanAddr" SINK+=" --channel-config name=$chan,type=pair,method=bind,rateLogging=0,address=$chanAddr"
@CMAKE_CURRENT_BINARY_DIR@/$SINK & @CMAKE_CURRENT_BINARY_DIR@/$SINK &
@@ -47,6 +47,4 @@ SINK_PID=$!
wait $SAMPLER_PID wait $SAMPLER_PID
wait $SINK_PID wait $SINK_PID
set +e
rm $chanIpcFile rm $chanIpcFile
exit 0

View File

@@ -1,12 +1,52 @@
################################################################################ ################################################################################
# Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" # # copied verbatim in the file "LICENSE" #
################################################################################ ################################################################################
add_example(NAME multiple-channels add_executable(fairmq-ex-multiple-channels-sampler sampler.cxx)
DEVICE sampler broadcaster sink target_link_libraries(fairmq-ex-multiple-channels-sampler PRIVATE FairMQ)
TRANSPORT zeromq
add_executable(fairmq-ex-multiple-channels-broadcaster broadcaster.cxx)
target_link_libraries(fairmq-ex-multiple-channels-broadcaster PRIVATE FairMQ)
add_executable(fairmq-ex-multiple-channels-sink sink.cxx)
target_link_libraries(fairmq-ex-multiple-channels-sink PRIVATE FairMQ)
add_custom_target(ExampleMultipleChannels DEPENDS fairmq-ex-multiple-channels-sampler fairmq-ex-multiple-channels-broadcaster fairmq-ex-multiple-channels-sink)
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-channels.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh)
# test
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multiple-channels.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-channels.sh)
add_test(NAME Example.MultipleChannels.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-channels.sh zeromq)
set_tests_properties(Example.MultipleChannels.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received messages from both sources.")
# install
install(
TARGETS
fairmq-ex-multiple-channels-sampler
fairmq-ex-multiple-channels-broadcaster
fairmq-ex-multiple-channels-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-channels.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-channels.sh
) )

View File

@@ -1,7 +1,5 @@
#!/bin/bash #!/bin/bash
set -e
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq" transport="zeromq"
@@ -17,7 +15,7 @@ dataChanAddr="/tmp/fmq_$session""_""$dataChan""_""$transport"
broadcastChanAddr="/tmp/fmq_$session""_""$broadcastChan""_""$transport" broadcastChanAddr="/tmp/fmq_$session""_""$broadcastChan""_""$transport"
# setup a trap to kill everything if the test fails/timeouts # setup a trap to kill everything if the test fails/timeouts
trap 'set +e; kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $BROADCASTER_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $BROADCASTER_PID; rm $dataChanAddr; rm $broadcastChanAddr; exit 0' TERM trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; kill -TERM $BROADCASTER_PID; wait $SAMPLER_PID; wait $SINK_PID; wait $BROADCASTER_PID; rm $dataChanAddr; rm $broadcastChanAddr' TERM
SINK="fairmq-ex-multiple-channels-sink" SINK="fairmq-ex-multiple-channels-sink"
SINK+=" --id sink1" SINK+=" --id sink1"
@@ -25,7 +23,6 @@ SINK+=" --session $session"
SINK+=" --transport $transport" SINK+=" --transport $transport"
SINK+=" --verbosity veryhigh --severity debug" SINK+=" --verbosity veryhigh --severity debug"
SINK+=" --shm-segment-size 100000000" SINK+=" --shm-segment-size 100000000"
SINK+=" --shm-monitor true"
SINK+=" --max-iterations 1" SINK+=" --max-iterations 1"
SINK+=" --control static --color false" SINK+=" --control static --color false"
SINK+=" --channel-config name=$dataChan,type=pull,method=connect,rateLogging=0,address=ipc://$dataChanAddr" SINK+=" --channel-config name=$dataChan,type=pull,method=connect,rateLogging=0,address=ipc://$dataChanAddr"
@@ -41,7 +38,6 @@ SAMPLER+=" --session $session"
SAMPLER+=" --transport $transport" SAMPLER+=" --transport $transport"
SAMPLER+=" --verbosity veryhigh --severity debug" SAMPLER+=" --verbosity veryhigh --severity debug"
SAMPLER+=" --shm-segment-size 100000000" SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --shm-monitor true"
SAMPLER+=" --max-iterations 1" SAMPLER+=" --max-iterations 1"
SAMPLER+=" --control static --color false" SAMPLER+=" --control static --color false"
SAMPLER+=" --channel-config name=$dataChan,type=push,method=bind,rateLogging=0,address=ipc://$dataChanAddr" SAMPLER+=" --channel-config name=$dataChan,type=push,method=bind,rateLogging=0,address=ipc://$dataChanAddr"
@@ -55,7 +51,6 @@ BROADCASTER+=" --session $session"
BROADCASTER+=" --transport $transport" BROADCASTER+=" --transport $transport"
BROADCASTER+=" --verbosity veryhigh --severity debug" BROADCASTER+=" --verbosity veryhigh --severity debug"
BROADCASTER+=" --shm-segment-size 100000000" BROADCASTER+=" --shm-segment-size 100000000"
BROADCASTER+=" --shm-monitor true"
BROADCASTER+=" --control static --color false" BROADCASTER+=" --control static --color false"
BROADCASTER+=" --channel-config name=$broadcastChan,type=pub,method=bind,rateLogging=0,address=ipc://$broadcastChanAddr" BROADCASTER+=" --channel-config name=$broadcastChan,type=pub,method=bind,rateLogging=0,address=ipc://$broadcastChanAddr"
@CMAKE_CURRENT_BINARY_DIR@/$BROADCASTER & @CMAKE_CURRENT_BINARY_DIR@/$BROADCASTER &
@@ -70,6 +65,4 @@ kill -SIGINT $BROADCASTER_PID
# wait for broadcaster to finish # wait for broadcaster to finish
wait $BROADCASTER_PID wait $BROADCASTER_PID
set +e
rm $dataChanAddr; rm $broadcastChanAddr rm $dataChanAddr; rm $broadcastChanAddr
exit 0

View File

@@ -1,12 +1,51 @@
################################################################################ ################################################################################
# Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" # # copied verbatim in the file "LICENSE" #
################################################################################ ################################################################################
add_example(NAME multiple-transports add_executable(fairmq-ex-multiple-transports-sampler1 sampler1.cxx)
DEVICE sampler1 sampler2 sink target_link_libraries(fairmq-ex-multiple-transports-sampler1 PRIVATE FairMQ)
NO_TRANSPORT
add_executable(fairmq-ex-multiple-transports-sampler2 sampler2.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sampler2 PRIVATE FairMQ)
add_executable(fairmq-ex-multiple-transports-sink sink.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sink PRIVATE FairMQ)
add_custom_target(ExampleMultipleTransports DEPENDS fairmq-ex-multiple-transports-sampler1 fairmq-ex-multiple-transports-sampler2 fairmq-ex-multiple-transports-sink)
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-transports.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh)
# test
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multiple-transports.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-transports.sh)
add_test(NAME Example.MultipleTransports COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multiple-transports.sh)
set_tests_properties(Example.MultipleTransports PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received messages from both sources.")
# install
install(
TARGETS
fairmq-ex-multiple-transports-sampler1
fairmq-ex-multiple-transports-sampler2
fairmq-ex-multiple-transports-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for install directories
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-transports.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-transports.sh
) )

View File

@@ -9,8 +9,6 @@
#include <fairmq/Device.h> #include <fairmq/Device.h>
#include <fairmq/runDevice.h> #include <fairmq/runDevice.h>
#include <stdexcept>
namespace bpo = boost::program_options; namespace bpo = boost::program_options;
struct Sink : fair::mq::Device struct Sink : fair::mq::Device
@@ -34,7 +32,7 @@ struct Sink : fair::mq::Device
// Creates a message using the transport of channel ack // Creates a message using the transport of channel ack
fair::mq::MessagePtr ack(NewMessageFor("ack", 0)); fair::mq::MessagePtr ack(NewMessageFor("ack", 0));
if (Send(ack, "ack") < 0) { if (Send(ack, "ack") < 0) {
throw std::runtime_error("could not send an ack"); return false;
} }
// return true if want to be called again (otherwise go to IDLE state) // return true if want to be called again (otherwise go to IDLE state)

View File

@@ -1,7 +1,5 @@
#!/bin/bash #!/bin/bash
set -e
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)" session="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
@@ -12,13 +10,12 @@ chan1Addr="/tmp/fmq_$session""_""$chan1"
chan2Addr="/tmp/fmq_$session""_""$chan2" chan2Addr="/tmp/fmq_$session""_""$chan2"
ackChanAddr="/tmp/fmq_$session""_""$ackChan" ackChanAddr="/tmp/fmq_$session""_""$ackChan"
trap 'set +e; kill -TERM $SAMPLER1_PID; kill -TERM $SAMPLER2_PID; kill -TERM $SINK_PID; wait $SAMPLER1_PID; wait $SAMPLER2_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/fairmq/fairmq-shmmonitor --cleanup --session $SESSION; rm $chan1Addr; rm $chan2Addr; rm $ackChanAddr; exit 0' TERM trap 'kill -TERM $SAMPLER1_PID; kill -TERM $SAMPLER2_PID; kill -TERM $SINK_PID; wait $SAMPLER1_PID; wait $SAMPLER2_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/fairmq/fairmq-shmmonitor --cleanup --session $SESSION; rm $chan1Addr; rm $chan2Addr; rm $ackChanAddr' TERM
SINK="fairmq-ex-multiple-transports-sink" SINK="fairmq-ex-multiple-transports-sink"
SINK+=" --id sink1" SINK+=" --id sink1"
SINK+=" --verbosity veryhigh --severity debug" SINK+=" --verbosity veryhigh --severity debug"
SINK+=" --shm-segment-size 100000000" SINK+=" --shm-segment-size 100000000"
SINK+=" --shm-monitor true"
SINK+=" --session $session" SINK+=" --session $session"
SINK+=" --max-iterations 1" SINK+=" --max-iterations 1"
SINK+=" --control static --color false" SINK+=" --control static --color false"
@@ -34,7 +31,6 @@ SAMPLER1+=" --id sampler1"
SAMPLER1+=" --session $session" SAMPLER1+=" --session $session"
SAMPLER1+=" --verbosity veryhigh --severity debug" SAMPLER1+=" --verbosity veryhigh --severity debug"
SAMPLER1+=" --shm-segment-size 100000000" SAMPLER1+=" --shm-segment-size 100000000"
SAMPLER1+=" --shm-monitor true"
SAMPLER1+=" --max-iterations 1" SAMPLER1+=" --max-iterations 1"
SAMPLER1+=" --control static --color false" SAMPLER1+=" --control static --color false"
SAMPLER1+=" --transport shmem" SAMPLER1+=" --transport shmem"
@@ -48,7 +44,6 @@ SAMPLER2+=" --id sampler2"
SAMPLER2+=" --session $session" SAMPLER2+=" --session $session"
SAMPLER2+=" --verbosity veryhigh --severity debug" SAMPLER2+=" --verbosity veryhigh --severity debug"
SAMPLER2+=" --shm-segment-size 100000000" SAMPLER2+=" --shm-segment-size 100000000"
SAMPLER2+=" --shm-monitor true"
SAMPLER2+=" --max-iterations 1" SAMPLER2+=" --max-iterations 1"
SAMPLER2+=" --control static --color false" SAMPLER2+=" --control static --color false"
SAMPLER2+=" --transport zeromq" SAMPLER2+=" --transport zeromq"
@@ -60,6 +55,4 @@ wait $SAMPLER1_PID
wait $SAMPLER2_PID wait $SAMPLER2_PID
wait $SINK_PID wait $SINK_PID
set +e
rm $chan1Addr; rm $chan2Addr; rm $ackChanAddr rm $chan1Addr; rm $chan2Addr; rm $ackChanAddr
exit 0

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2020-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2020-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -54,7 +54,7 @@ struct Receiver : fair::mq::Device
fBuffer[h.id].start = chrono::steady_clock::now(); fBuffer[h.id].start = chrono::steady_clock::now();
} }
// if the received ID has not previously been discarded, store the data part in the buffer // if the received ID has not previously been discarded, store the data part in the buffer
fBuffer[h.id].parts.AddPart(std::move(parts.At(1))); fBuffer[h.id].parts.AddPart(move(parts.At(1)));
} else { } else {
// if received ID has been previously discarded. // if received ID has been previously discarded.
LOG(debug) << "Received part from an already discarded timeframe with id " << h.id; LOG(debug) << "Received part from an already discarded timeframe with id " << h.id;

View File

@@ -1,13 +1,59 @@
################################################################################ ################################################################################
# Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" # # copied verbatim in the file "LICENSE" #
################################################################################ ################################################################################
add_example(NAME readout add_executable(fairmq-ex-readout-readout readout.cxx)
DEVICE readout builder processor sender receiver target_link_libraries(fairmq-ex-readout-readout PRIVATE FairMQ)
SCRIPT readout readout-processing
NO_TEST add_executable(fairmq-ex-readout-builder builder.cxx)
target_link_libraries(fairmq-ex-readout-builder PRIVATE FairMQ)
add_executable(fairmq-ex-readout-processor processor.cxx)
target_link_libraries(fairmq-ex-readout-processor PRIVATE FairMQ)
add_executable(fairmq-ex-readout-sender sender.cxx)
target_link_libraries(fairmq-ex-readout-sender PRIVATE FairMQ)
add_executable(fairmq-ex-readout-receiver receiver.cxx)
target_link_libraries(fairmq-ex-readout-receiver PRIVATE FairMQ)
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh)
# install
install(
TARGETS
fairmq-ex-readout-readout
fairmq-ex-readout-builder
fairmq-ex-readout-processor
fairmq-ex-readout-sender
fairmq-ex-readout-receiver
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout.sh
)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout-processing.sh
) )

View File

@@ -7,19 +7,19 @@ This examples shows two possible topologies (out of many) for a node connected t
``` ```
|------------------------------- Readout Node ---------------------------| |- Processing Node -| |------------------------------- Readout Node ---------------------------| |- Processing Node -|
| Readout --> Builder --> Sender | --> | Receiver | | Readout --> Builder --> Sender | --> | Receiver |
| [# shared memory segment (unused in this topology) ##################] | zmq | | | [# shared memory segment (unused in this topology) ##################] | ofi | |
| [# shmem unmanaged region (readout writes here, others read) ########] | | | | [# shmem unmanaged region (readout writes here, others read) ########] | | |
|------------------------------------------------------------------------| |-------------------| |------------------------------------------------------------------------| |-------------------|
``` ```
The devices one the Readout Node communicate via shared memory transport. Readout device writes into shared memory unmanaged region. The data is then forwarded through Builder to Sender process, which sends it out via zeromq transport. The devices one the Readout Node communicate via shared memory transport. Readout device writes into shared memory unmanaged region. The data is then forwarded through Builder to Sender process, which sends it out via OFI transport.
## Setup with generating new data on the Readout node ## Setup with generating new data on the Readout node
``` ```
|------------------------------- Readout Node ---------------------------| |- Processing Node -| |------------------------------- Readout Node ---------------------------| |- Processing Node -|
| Readout --> Builder --> Processor --> Sender | --> | Receiver | | Readout --> Builder --> Processor --> Sender | --> | Receiver |
| [# shared memory segment (used between Proccessor and Sender) #######] | zmq | | | [# shared memory segment (used between Proccessor and Sender) #######] | ofi | |
| [# shmem unmanaged region (readout writes here, builder & proc read) ] | | | | [# shmem unmanaged region (readout writes here, builder & proc read) ] | | |
|------------------------------------------------------------------------| |-------------------| |------------------------------------------------------------------------| |-------------------|
``` ```

View File

@@ -31,10 +31,12 @@ SENDER="fairmq-ex-readout-sender"
SENDER+=" --id sender1" SENDER+=" --id sender1"
SENDER+=" --input-name ps" SENDER+=" --input-name ps"
SENDER+=" --channel-config name=ps,type=pair,method=bind,address=tcp://localhost:7779,transport=shmem" SENDER+=" --channel-config name=ps,type=pair,method=bind,address=tcp://localhost:7779,transport=shmem"
#SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7780,transport=ofi"
SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7780,transport=zeromq" SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7780,transport=zeromq"
xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SENDER & xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SENDER &
RECEIVER="fairmq-ex-readout-receiver" RECEIVER="fairmq-ex-readout-receiver"
RECEIVER+=" --id receiver1" RECEIVER+=" --id receiver1"
#RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7780,transport=ofi"
RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7780,transport=zeromq" RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7780,transport=zeromq"
xterm -geometry 80x23+1500+0 -hold -e @EX_BIN_DIR@/$RECEIVER & xterm -geometry 80x23+1500+0 -hold -e @EX_BIN_DIR@/$RECEIVER &

View File

@@ -25,10 +25,12 @@ SENDER="fairmq-ex-readout-sender"
SENDER+=" --id sender1" SENDER+=" --id sender1"
SENDER+=" --input-name bs" SENDER+=" --input-name bs"
SENDER+=" --channel-config name=bs,type=pair,method=bind,address=tcp://localhost:7778,transport=shmem" SENDER+=" --channel-config name=bs,type=pair,method=bind,address=tcp://localhost:7778,transport=shmem"
# SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7779,transport=ofi"
SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7779,transport=zeromq" SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7779,transport=zeromq"
xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SENDER & xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SENDER &
RECEIVER="fairmq-ex-readout-receiver" RECEIVER="fairmq-ex-readout-receiver"
RECEIVER+=" --id receiver1" RECEIVER+=" --id receiver1"
# RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7779,transport=ofi"
RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7779,transport=zeromq" RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7779,transport=zeromq"
xterm -geometry 80x23+1500+0 -hold -e @EX_BIN_DIR@/$RECEIVER & xterm -geometry 80x23+1500+0 -hold -e @EX_BIN_DIR@/$RECEIVER &

View File

@@ -1,12 +1,54 @@
################################################################################ ################################################################################
# Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" # # copied verbatim in the file "LICENSE" #
################################################################################ ################################################################################
add_example(NAME region add_executable(fairmq-ex-region-sampler sampler.cxx)
DEVICE sampler processor sink keep-alive target_link_libraries(fairmq-ex-region-sampler PRIVATE FairMQ)
SCRIPT region region-advanced region-advanced-external
add_executable(fairmq-ex-region-sink sink.cxx)
target_link_libraries(fairmq-ex-region-sink PRIVATE FairMQ)
add_executable(fairmq-ex-region-keep-alive keep-alive.cxx)
target_link_libraries(fairmq-ex-region-keep-alive PRIVATE FairMQ)
add_custom_target(ExampleRegion DEPENDS fairmq-ex-region-sampler fairmq-ex-region-sink fairmq-ex-region-keep-alive)
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-region.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-region.sh)
# test
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-region.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh)
add_test(NAME Example.Region.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh zeromq)
set_tests_properties(Example.Region.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received [0-9*] acks")
add_test(NAME Example.Region.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-region.sh shmem)
set_tests_properties(Example.Region.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received [0-9*] acks")
# install
install(
TARGETS
fairmq-ex-region-sampler
fairmq-ex-region-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-region.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-region.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-region.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-region.sh
) )

View File

@@ -1,95 +0,0 @@
#!/bin/bash
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport=${1:-shmem}
msgSize=${2:-1000000}
SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1"
# SAMPLER+=" --sampling-rate 10"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --transport $transport"
SAMPLER+=" --shmid 1"
SAMPLER+=" --shm-monitor false"
SAMPLER+=" --rc-segment-size 200000000"
SAMPLER+=" --external-region true"
SAMPLER+=" --shm-no-cleanup true"
SAMPLER+=" --chan-name data1"
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"
xterm -geometry 90x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
PROCESSOR1="fairmq-ex-region-processor"
PROCESSOR1+=" --id processor1"
PROCESSOR1+=" --severity debug"
PROCESSOR1+=" --transport $transport"
PROCESSOR1+=" --shmid 1"
PROCESSOR1+=" --shm-segment-id 1"
PROCESSOR1+=" --shm-monitor false"
PROCESSOR1+=" --shm-no-cleanup true"
PROCESSOR1+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR1+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
PROCESSOR1+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
xterm -geometry 90x40+550+40 -hold -e @EX_BIN_DIR@/$PROCESSOR1 &
PROCESSOR2="fairmq-ex-region-processor"
PROCESSOR2+=" --id processor2"
PROCESSOR2+=" --severity debug"
PROCESSOR2+=" --transport $transport"
PROCESSOR2+=" --shmid 1"
PROCESSOR2+=" --shm-segment-id 2"
PROCESSOR2+=" --shm-monitor false"
PROCESSOR2+=" --shm-no-cleanup true"
PROCESSOR2+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR2+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7788"
PROCESSOR2+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7789"
xterm -geometry 90x40+550+600 -hold -e @EX_BIN_DIR@/$PROCESSOR2 &
SINK1_1="fairmq-ex-region-sink"
SINK1_1+=" --id sink1_1"
SINK1_1+=" --severity debug"
SINK1_1+=" --chan-name data2"
SINK1_1+=" --transport $transport"
SINK1_1+=" --shmid 1"
SINK1_1+=" --shm-segment-id 1"
SINK1_1+=" --shm-monitor false"
SINK1_1+=" --shm-no-cleanup true"
SINK1_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
xterm -geometry 90x20+1100+0 -hold -e @EX_BIN_DIR@/$SINK1_1 &
SINK1_2="fairmq-ex-region-sink"
SINK1_2+=" --id sink1_2"
SINK1_2+=" --severity debug"
SINK1_2+=" --chan-name data3"
SINK1_2+=" --transport $transport"
SINK1_2+=" --shmid 1"
SINK1_2+=" --shm-segment-id 1"
SINK1_2+=" --shm-monitor false"
SINK1_2+=" --shm-no-cleanup true"
SINK1_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
xterm -geometry 90x20+1100+300 -hold -e @EX_BIN_DIR@/$SINK1_2 &
SINK2_1="fairmq-ex-region-sink"
SINK2_1+=" --id sink2_1"
SINK2_1+=" --severity debug"
SINK2_1+=" --chan-name data2"
SINK2_1+=" --transport $transport"
SINK2_1+=" --shmid 1"
SINK2_1+=" --shm-segment-id 2"
SINK2_1+=" --shm-monitor false"
SINK2_1+=" --shm-no-cleanup true"
SINK2_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7788"
xterm -geometry 90x20+1100+600 -hold -e @EX_BIN_DIR@/$SINK2_1 &
SINK2_2="fairmq-ex-region-sink"
SINK2_2+=" --id sink2_2"
SINK2_2+=" --severity debug"
SINK2_2+=" --chan-name data3"
SINK2_2+=" --transport $transport"
SINK2_2+=" --shmid 1"
SINK2_2+=" --shm-segment-id 2"
SINK2_2+=" --shm-monitor false"
SINK2_2+=" --shm-no-cleanup true"
SINK2_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7789"
xterm -geometry 90x20+1100+900 -hold -e @EX_BIN_DIR@/$SINK2_2 &

View File

@@ -1,80 +0,0 @@
#!/bin/bash
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport=${1:-shmem}
msgSize=${2:-1000000}
SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1"
# SAMPLER+=" --sampling-rate 10"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --transport $transport"
#SAMPLER+=" --rc-segment-size 0"
SAMPLER+=" --shm-monitor true"
SAMPLER+=" --chan-name data1"
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"
xterm -geometry 90x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
PROCESSOR1="fairmq-ex-region-processor"
PROCESSOR1+=" --id processor1"
PROCESSOR1+=" --severity debug"
PROCESSOR1+=" --transport $transport"
PROCESSOR1+=" --shm-segment-id 1"
PROCESSOR1+=" --shm-monitor true"
PROCESSOR1+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR1+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
PROCESSOR1+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
xterm -geometry 90x40+550+40 -hold -e @EX_BIN_DIR@/$PROCESSOR1 &
PROCESSOR2="fairmq-ex-region-processor"
PROCESSOR2+=" --id processor2"
PROCESSOR2+=" --severity debug"
PROCESSOR2+=" --transport $transport"
PROCESSOR2+=" --shm-segment-id 2"
PROCESSOR2+=" --shm-monitor true"
PROCESSOR2+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR2+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7788"
PROCESSOR2+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7789"
xterm -geometry 90x40+550+600 -hold -e @EX_BIN_DIR@/$PROCESSOR2 &
SINK1_1="fairmq-ex-region-sink"
SINK1_1+=" --id sink1_1"
SINK1_1+=" --severity debug"
SINK1_1+=" --chan-name data2"
SINK1_1+=" --transport $transport"
SINK1_1+=" --shm-segment-id 1"
SINK1_1+=" --shm-monitor true"
SINK1_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
xterm -geometry 90x20+1100+0 -hold -e @EX_BIN_DIR@/$SINK1_1 &
SINK1_2="fairmq-ex-region-sink"
SINK1_2+=" --id sink1_2"
SINK1_2+=" --severity debug"
SINK1_2+=" --chan-name data3"
SINK1_2+=" --transport $transport"
SINK1_2+=" --shm-segment-id 1"
SINK1_2+=" --shm-monitor true"
SINK1_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
xterm -geometry 90x20+1100+300 -hold -e @EX_BIN_DIR@/$SINK1_2 &
SINK2_1="fairmq-ex-region-sink"
SINK2_1+=" --id sink2_1"
SINK2_1+=" --severity debug"
SINK2_1+=" --chan-name data2"
SINK2_1+=" --transport $transport"
SINK2_1+=" --shm-segment-id 2"
SINK2_1+=" --shm-monitor true"
SINK2_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7788"
xterm -geometry 90x20+1100+600 -hold -e @EX_BIN_DIR@/$SINK2_1 &
SINK2_2="fairmq-ex-region-sink"
SINK2_2+=" --id sink2_2"
SINK2_2+=" --severity debug"
SINK2_2+=" --chan-name data3"
SINK2_2+=" --transport $transport"
SINK2_2+=" --shm-segment-id 2"
SINK2_2+=" --shm-monitor true"
SINK2_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7789"
xterm -geometry 90x20+1100+900 -hold -e @EX_BIN_DIR@/$SINK2_2 &

View File

@@ -2,8 +2,16 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport=${1:-shmem} transport="shmem"
msgSize=${2:-1000000} msgSize="1000000"
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
if [[ $2 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi
SAMPLER="fairmq-ex-region-sampler" SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1" SAMPLER+=" --id sampler1"
@@ -11,10 +19,6 @@ SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10" # SAMPLER+=" --rate 10"
SAMPLER+=" --transport $transport" SAMPLER+=" --transport $transport"
# SAMPLER+=" --external-region true"
# SAMPLER+=" --shm-no-cleaup true"
SAMPLER+=" --shm-monitor true"
# SAMPLER+=" --shmid 1"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992" SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
@@ -22,8 +26,5 @@ SINK="fairmq-ex-region-sink"
SINK+=" --id sink1" SINK+=" --id sink1"
SINK+=" --severity debug" SINK+=" --severity debug"
SINK+=" --transport $transport" SINK+=" --transport $transport"
# SINK+=" --shm-no-cleaup true"
SINK+=" --shm-monitor true"
# SINK+=" --shmid 1"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992" SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK & xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@@ -6,9 +6,10 @@
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
#include <fairmq/shmem/Common.h> #include <fairmq/shmem/Common.h>
#include <fairmq/shmem/Monitor.h>
#include <fairmq/shmem/Segment.h>
#include <fairmq/shmem/UnmanagedRegion.h> #include <fairmq/shmem/UnmanagedRegion.h>
#include <fairmq/shmem/Segment.h>
#include <fairmq/shmem/Monitor.h>
#include <fairmq/tools/Unique.h> #include <fairmq/tools/Unique.h>
#include <fairlogger/Logger.h> #include <fairlogger/Logger.h>
@@ -16,130 +17,75 @@
#include <boost/algorithm/string.hpp> #include <boost/algorithm/string.hpp>
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
#include <chrono>
#include <csignal> #include <csignal>
#include <chrono>
#include <map> #include <map>
#include <mutex>
#include <string> #include <string>
#include <thread> #include <thread>
using namespace std; using namespace std;
using namespace boost::program_options; using namespace boost::program_options;
namespace { namespace
volatile sig_atomic_t gStopping = 0; {
volatile sig_atomic_t gResetContent = 0; volatile sig_atomic_t gStopping = 0;
} // namespace }
void signalHandler(int /* signal */) { gStopping = 1; } void signalHandler(int /* signal */)
{
void resetContentHandler(int /* signal */) { gResetContent = 1; } gStopping = 1;
}
struct ShmManager struct ShmManager
{ {
ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions, bool zero = true) ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions)
: shmId(fair::mq::shmem::makeShmIdStr(_shmId)) : shmId(fair::mq::shmem::makeShmIdStr(_shmId))
{
LOG(info) << "Starting ShmManager for shmId: " << shmId;
LOG(info) << "Performing full reset...";
FullReset();
LOG(info) << "Done.";
LOG(info) << "Adding managed segments...";
AddSegments(_segments, zero);
LOG(info) << "Done.";
LOG(info) << "Adding unmanaged regions...";
AddRegions(_regions, zero);
LOG(info) << "Done.";
LOG(info) << "Shared memory is ready for use.";
}
void AddSegments(const vector<string>& _segments, bool zero)
{ {
for (const auto& s : _segments) { for (const auto& s : _segments) {
vector<string> conf; vector<string> segmentConf;
boost::algorithm::split(conf, s, boost::algorithm::is_any_of(",")); boost::algorithm::split(segmentConf, s, boost::algorithm::is_any_of(","));
if (conf.size() != 3) { if (segmentConf.size() != 2) {
LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size><numaid>."; LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId}); fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>,<numaid>."); throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>.");
} }
uint16_t id = stoi(conf.at(0)); uint16_t id = stoi(segmentConf.at(0));
uint64_t size = stoull(conf.at(1)); uint64_t size = stoull(segmentConf.at(1));
segmentCfgs.emplace_back(fair::mq::shmem::SegmentConfig{id, size, "rbtree_best_fit"});
auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit)); auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit));
fair::mq::shmem::Segment& segment = ret.first->second; fair::mq::shmem::Segment& segment = ret.first->second;
LOG(info) << "Created segment " << id << " of size " << segment.GetSize() LOG(info) << "Created segment " << id << " of size " << segment.GetSize() << ", starting at " << segment.GetData() << ". Locking...";
<< ", starting at " << segment.GetData() << ". Locking...";
segment.Lock(); segment.Lock();
LOG(info) << "Done."; LOG(info) << "Done.";
if (zero) { LOG(info) << "Zeroing...";
LOG(info) << "Zeroing..."; segment.Zero();
segment.Zero(); LOG(info) << "Done.";
LOG(info) << "Done.";
}
} }
}
void AddRegions(const vector<string>& _regions, bool zero)
{
for (const auto& r : _regions) { for (const auto& r : _regions) {
vector<string> conf; vector<string> regionConf;
boost::algorithm::split(conf, r, boost::algorithm::is_any_of(",")); boost::algorithm::split(regionConf, r, boost::algorithm::is_any_of(","));
if (conf.size() != 3) { if (regionConf.size() != 2) {
LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>,<numaid>."; LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId}); fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>,<numaid>."); throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>.");
} }
uint16_t id = stoi(conf.at(0)); uint16_t id = stoi(regionConf.at(0));
uint64_t size = stoull(conf.at(1)); uint64_t size = stoull(regionConf.at(1));
fair::mq::RegionConfig cfg; auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size));
cfg.id = id;
cfg.rcSegmentSize = 0;
cfg.size = size;
regionCfgs.push_back(cfg);
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, cfg));
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second); fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() << ", starting at " << region.GetData() << ". Locking...";
<< ", starting at " << region.GetData() << ". Locking...";
region.Lock(); region.Lock();
LOG(info) << "Done."; LOG(info) << "Done.";
if (zero) { LOG(info) << "Zeroing...";
LOG(info) << "Zeroing..."; region.Zero();
region.Zero(); LOG(info) << "Done.";
LOG(info) << "Done.";
}
} }
} }
bool CheckPresence()
{
std::lock_guard<std::mutex> lock(localMtx);
for (const auto& sc : segmentCfgs) {
if (!(fair::mq::shmem::Monitor::SegmentIsPresent(fair::mq::shmem::ShmId{shmId}, sc.id))) {
return false;
}
}
for (const auto& rc : regionCfgs) {
if (!(fair::mq::shmem::Monitor::RegionIsPresent(fair::mq::shmem::ShmId{shmId}, rc.id.value()))) {
return false;
}
}
return true;
}
void ResetContent() void ResetContent()
{ {
std::lock_guard<std::mutex> lock(localMtx); fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId});
fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId}, segmentCfgs, regionCfgs);
}
void FullReset()
{
segments.clear();
regions.clear();
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
} }
~ShmManager() ~ShmManager()
@@ -149,11 +95,8 @@ struct ShmManager
} }
std::string shmId; std::string shmId;
std::mutex localMtx;
map<uint16_t, fair::mq::shmem::Segment> segments; map<uint16_t, fair::mq::shmem::Segment> segments;
map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> regions; map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> regions;
std::vector<fair::mq::shmem::SegmentConfig> segmentCfgs;
std::vector<fair::mq::RegionConfig> regionCfgs;
}; };
int main(int argc, char** argv) int main(int argc, char** argv)
@@ -162,11 +105,8 @@ int main(int argc, char** argv)
signal(SIGINT, signalHandler); signal(SIGINT, signalHandler);
signal(SIGTERM, signalHandler); signal(SIGTERM, signalHandler);
signal(SIGUSR1, resetContentHandler);
try { try {
bool nozero = false;
bool checkPresence = true;
uint64_t shmId = 0; uint64_t shmId = 0;
vector<string> segments; vector<string> segments;
vector<string> regions; vector<string> regions;
@@ -174,10 +114,8 @@ int main(int argc, char** argv)
options_description desc("Options"); options_description desc("Options");
desc.add_options() desc.add_options()
("shmid", value<uint64_t>(&shmId)->required(), "Shm id") ("shmid", value<uint64_t>(&shmId)->required(), "Shm id")
("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size>,<numaid> <id>,<size>,<numaid> <id>,<size>,<numaid> ... (numaid: -2 disabled, -1 interleave, >=0 node)") ("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size> <id>,<size> <id>,<size> ...")
("regions", value<vector<string>>(&regions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size>,<numaid> <id>,<size>,<numaid> ...") ("regions", value<vector<string>>(&regions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size> <id>,<size> ...")
("nozero", value<bool>(&nozero)->default_value(false)->implicit_value(true), "Do not zero segments after initialization")
("check-presence", value<bool>(&checkPresence)->default_value(true)->implicit_value(true), "Check periodically if configured segments/regions are still present, and cleanup and leave if they are not")
("help,h", "Print help"); ("help,h", "Print help");
variables_map vm; variables_map vm;
@@ -190,35 +128,15 @@ int main(int argc, char** argv)
notify(vm); notify(vm);
ShmManager shmManager(shmId, segments, regions, !nozero); ShmManager shmManager(shmId, segments, regions);
std::thread resetContentThread([&shmManager]() { while (!gStopping) {
while (!gStopping) { std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if (gResetContent == 1) {
LOG(info) << "Resetting content for shmId " << shmManager.shmId;
shmManager.ResetContent();
gResetContent = 0;
LOG(info) << "Done resetting content for shmId " << shmManager.shmId;
}
}
});
if (checkPresence) {
while (!gStopping) {
if (shmManager.CheckPresence() == false) {
LOG(error) << "Failed to find segments, exiting.";
gStopping = true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
} }
resetContentThread.join();
LOG(info) << "stopping."; LOG(info) << "stopping.";
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception reached the top of main: " << e.what() << ", exiting"; LOG(error) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit";
return 2; return 2;
} }

View File

@@ -1,80 +0,0 @@
/********************************************************************************
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <memory>
namespace bpo = boost::program_options;
using namespace std;
using namespace fair::mq;
namespace {
struct Processor : Device
{
void InitTask() override
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
GetChannel("data1", 0).Transport()->SubscribeToRegionEvents([](RegionInfo info) {
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged") << ", id: " << info.id
<< ", ptr: " << info.ptr << ", size: " << info.size
<< ", flags: " << info.flags;
});
}
void Run() override
{
Channel& dataIn = GetChannel("data1", 0);
Channel& dataOut1 = GetChannel("data2", 0);
Channel& dataOut2 = GetChannel("data3", 0);
while (!NewStatePending()) {
fair::mq::Parts inParts;
dataIn.Receive(inParts);
fair::mq::Parts outParts1;
fair::mq::Parts outParts2;
for (const auto& inPart : inParts) {
outParts1.AddPart(NewMessage());
outParts1.fParts.back()->Copy(*inPart);
outParts2.AddPart(NewMessage());
outParts2.fParts.back()->Copy(*inPart);
}
dataOut1.Send(outParts1);
dataOut2.Send(outParts2);
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";
break;
}
}
}
void ResetTask() override
{
GetChannel("data1", 0).Transport()->UnsubscribeFromRegionEvents();
}
private:
uint64_t fMaxIterations = 0;
uint64_t fNumIterations = 0;
};
} // namespace
void addCustomOptions(bpo::options_description& options)
{
options.add_options()("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
unique_ptr<Device> getDevice(ProgOptions& /*config*/) { return make_unique<Processor>(); }

View File

@@ -8,7 +8,6 @@
#include <fairmq/Device.h> #include <fairmq/Device.h>
#include <fairmq/runDevice.h> #include <fairmq/runDevice.h>
#include <fairmq/tools/RateLimit.h>
#include <cstdint> #include <cstdint>
#include <mutex> #include <mutex>
@@ -20,15 +19,11 @@ struct Sampler : fair::mq::Device
{ {
void InitTask() override void InitTask() override
{ {
fExternalRegion = fConfig->GetProperty<bool>("external-region");
fMsgSize = fConfig->GetProperty<int>("msg-size"); fMsgSize = fConfig->GetProperty<int>("msg-size");
fLinger = fConfig->GetProperty<uint32_t>("region-linger"); fLinger = fConfig->GetProperty<uint32_t>("region-linger");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChanName = fConfig->GetProperty<std::string>("chan-name");
fSamplingRate = fConfig->GetProperty<float>("sampling-rate");
fRCSegmentSize = fConfig->GetProperty<uint64_t>("rc-segment-size");
GetChannel(fChanName, 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) { GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) {
LOG(info) << "Region event: " << info.event << ": " LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged") << (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id << ", id: " << info.id
@@ -39,109 +34,76 @@ struct Sampler : fair::mq::Device
fair::mq::RegionConfig regionCfg; fair::mq::RegionConfig regionCfg;
regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events
// options for testing with an externally-created -region regionCfg.lock = true; // mlock region after creation
if (fExternalRegion) { regionCfg.zero = true; // zero region content after creation
regionCfg.id = 1; fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
regionCfg.removeOnDestruction = false; 0, // ... and this sub-channel
} 10000000, // region size
regionCfg.lock = !fExternalRegion; // mlock region after creation [this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
regionCfg.zero = !fExternalRegion; // zero region content after creation std::lock_guard<std::mutex> lock(fMtx);
regionCfg.rcSegmentSize = fRCSegmentSize; // size of the corresponding reference count segment fNumUnackedMsgs -= blocks.size();
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor( if (fMaxIterations > 0) {
fChanName, // region is created using the transport of this channel... LOG(info) << "Received " << blocks.size() << " acks";
0, // ... and this sub-channel }
10000000, // region size }, regionCfg));
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
std::lock_guard<std::mutex> lock(fMtx);
fNumUnackedMsgs -= blocks.size();
if (fMaxIterations > 0) {
LOG(info) << "Received " << blocks.size() << " acks";
}
},
regionCfg
));
} }
void Run() override bool ConditionalRun() override
{ {
fair::mq::MessagePtr msg(NewMessageFor("data", // channel
0, // sub-channel
fRegion, // region
fRegion->GetData(), // ptr within region
fMsgSize, // offset from ptr
nullptr // hint
));
fair::mq::tools::RateLimiter rateLimiter(fSamplingRate); // static_cast<char*>(fRegion->GetData())[3] = 97;
// LOG(info) << "check: " << static_cast<char*>(fRegion->GetData())[3];
// std::this_thread::sleep_for(std::chrono::seconds(1));
while (!NewStatePending()) { std::lock_guard<std::mutex> lock(fMtx);
fair::mq::Parts parts; ++fNumUnackedMsgs;
// make 64 parts if (Send(msg, "data", 0) > 0) {
for (int i = 0; i < 64; ++i) { if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
parts.AddPart(NewMessageFor( LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
fChanName, // channel return false;
0, // sub-channel
fRegion, // region
fRegion->GetData(), // ptr within region
fMsgSize, // offset from ptr
nullptr // hint
));
}
std::lock_guard<std::mutex> lock(fMtx);
fNumUnackedMsgs += parts.Size();
if (Send(parts, fChanName, 0) > 0) {
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Stopping sending.";
break;
}
if (fSamplingRate > 0.001) {
rateLimiter.maybe_sleep();
}
} }
} }
// wait for all acks to arrive return true;
while (!NewStatePending()) {
{
std::lock_guard<std::mutex> lock(fMtx);
if (fNumUnackedMsgs == 0) {
break;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(25));
}
if (fNumUnackedMsgs != 0) {
LOG(info) << "Done, still not acknowledged: " << fNumUnackedMsgs;
} else {
LOG(info) << "All acknowledgements received.";
}
} }
void ResetTask() override void ResetTask() override
{ {
fRegion.reset(); fRegion.reset();
GetChannel(fChanName, 0).Transport()->UnsubscribeFromRegionEvents(); {
std::lock_guard<std::mutex> lock(fMtx);
if (fNumUnackedMsgs != 0) {
LOG(info) << "Done, still not acknowledged: " << fNumUnackedMsgs;
} else {
LOG(info) << "All acknowledgements received.";
}
}
GetChannel("data", 0).Transport()->UnsubscribeFromRegionEvents();
} }
private: private:
int fExternalRegion = false;
int fMsgSize = 10000; int fMsgSize = 10000;
uint32_t fLinger = 100; uint32_t fLinger = 100;
uint64_t fMaxIterations = 0; uint64_t fMaxIterations = 0;
uint64_t fNumIterations = 0; uint64_t fNumIterations = 0;
uint64_t fRCSegmentSize = 10000000;
fair::mq::UnmanagedRegionPtr fRegion = nullptr; fair::mq::UnmanagedRegionPtr fRegion = nullptr;
std::mutex fMtx; std::mutex fMtx;
uint64_t fNumUnackedMsgs = 0; uint64_t fNumUnackedMsgs = 0;
std::string fChanName;
float fSamplingRate = 0.;
}; };
void addCustomOptions(bpo::options_description& options) void addCustomOptions(bpo::options_description& options)
{ {
options.add_options() options.add_options()
("chan-name", bpo::value<std::string>()->default_value("data"), "name of the output channel")
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes") ("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("sampling-rate", bpo::value<float>()->default_value(0.), "Sampling rate (Hz).")
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions") ("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)") ("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process")
("rc-segment-size", bpo::value<uint64_t>()->default_value(10000000), "Size of the reference count segment for Unamanged Region");
} }
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/) std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)

View File

@@ -22,8 +22,7 @@ struct Sink : Device
{ {
// Get the fMaxIterations value from the command line options (via fConfig) // Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChanName = fConfig->GetProperty<std::string>("chan-name"); GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](RegionInfo info) {
GetChannel(fChanName, 0).Transport()->SubscribeToRegionEvents([](RegionInfo info) {
LOG(info) << "Region event: " << info.event << ": " LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged") << ", id: " << info.id << (info.managed ? "managed" : "unmanaged") << ", id: " << info.id
<< ", ptr: " << info.ptr << ", size: " << info.size << ", ptr: " << info.ptr << ", size: " << info.size
@@ -33,11 +32,15 @@ struct Sink : Device
void Run() override void Run() override
{ {
Channel& dataIn = GetChannel(fChanName, 0); Channel& dataInChannel = GetChannel("data", 0);
while (!NewStatePending()) { while (!NewStatePending()) {
fair::mq::Parts parts; auto msg(dataInChannel.Transport()->CreateMessage());
dataIn.Receive(parts); dataInChannel.Receive(msg);
// void* ptr = msg->GetData();
// char* cptr = static_cast<char*>(ptr);
// LOG(info) << "check: " << cptr[3];
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state."; LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";
@@ -48,22 +51,22 @@ struct Sink : Device
void ResetTask() override void ResetTask() override
{ {
GetChannel(fChanName, 0).Transport()->UnsubscribeFromRegionEvents(); GetChannel("data", 0).Transport()->UnsubscribeFromRegionEvents();
} }
private: private:
uint64_t fMaxIterations = 0; uint64_t fMaxIterations = 0;
uint64_t fNumIterations = 0; uint64_t fNumIterations = 0;
std::string fChanName;
}; };
} // namespace } // namespace
void addCustomOptions(bpo::options_description& options) void addCustomOptions(bpo::options_description& options)
{ {
options.add_options() options.add_options()(
("chan-name", bpo::value<std::string>()->default_value("data"), "name of the input channel") "max-iterations",
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); bpo::value<uint64_t>()->default_value(0),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
} }
unique_ptr<Device> getDevice(ProgOptions& /*config*/) { return make_unique<Sink>(); } unique_ptr<Device> getDevice(ProgOptions& /*config*/) { return make_unique<Sink>(); }

View File

@@ -1,7 +1,5 @@
#!/bin/bash #!/bin/bash
set -e
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq" transport="zeromq"
@@ -16,7 +14,7 @@ chan="data"
chanAddr="/tmp/fmq_$session""_""$chan""_""$transport" chanAddr="/tmp/fmq_$session""_""$chan""_""$transport"
# setup a trap to kill everything if the test fails/timeouts # setup a trap to kill everything if the test fails/timeouts
trap 'set +e; kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/fairmq/fairmq-shmmonitor --cleanup --session $SESSION; rm $chanAddr; exit 0' TERM trap 'kill -TERM $SAMPLER_PID; kill -TERM $SINK_PID; wait $SAMPLER_PID; wait $SINK_PID; @CMAKE_BINARY_DIR@/fairmq/fairmq-shmmonitor --cleanup --session $SESSION; rm $chanAddr' TERM
SAMPLER="fairmq-ex-region-sampler" SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1" SAMPLER+=" --id sampler1"
@@ -24,7 +22,6 @@ SAMPLER+=" --transport $transport"
SAMPLER+=" --severity debug" SAMPLER+=" --severity debug"
SAMPLER+=" --session $session" SAMPLER+=" --session $session"
SAMPLER+=" --shm-segment-size 100000000" SAMPLER+=" --shm-segment-size 100000000"
SAMPLER+=" --shm-monitor true"
SAMPLER+=" --verbosity veryhigh" SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --control static --color false" SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1" SAMPLER+=" --max-iterations 1"
@@ -40,7 +37,6 @@ SINK+=" --transport $transport"
SINK+=" --severity debug" SINK+=" --severity debug"
SINK+=" --session $session" SINK+=" --session $session"
SINK+=" --shm-segment-size 100000000" SINK+=" --shm-segment-size 100000000"
SINK+=" --shm-monitor true"
SINK+=" --verbosity veryhigh" SINK+=" --verbosity veryhigh"
SINK+=" --control static --color false" SINK+=" --control static --color false"
SINK+=" --max-iterations 1" SINK+=" --max-iterations 1"
@@ -52,6 +48,4 @@ SINK_PID=$!
wait $SAMPLER_PID wait $SAMPLER_PID
wait $SINK_PID wait $SINK_PID
set +e
rm $chanAddr rm $chanAddr
exit 0

View File

@@ -1,11 +1,52 @@
################################################################################ ################################################################################
# Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" # # copied verbatim in the file "LICENSE" #
################################################################################ ################################################################################
add_example(NAME req-rep add_executable(fairmq-ex-req-rep-client client.cxx)
DEVICE client server target_link_libraries(fairmq-ex-req-rep-client PRIVATE FairMQ)
add_executable(fairmq-ex-req-rep-server server.cxx)
target_link_libraries(fairmq-ex-req-rep-server PRIVATE FairMQ)
add_custom_target(ExampleReqRep DEPENDS fairmq-ex-req-rep-client fairmq-ex-req-rep-server)
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-req-rep.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-req-rep.sh)
# test
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-req-rep.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh)
add_test(NAME Example.ReqRep.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh zeromq)
set_tests_properties(Example.ReqRep.zeromq PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received reply from server: ")
add_test(NAME Example.ReqRep.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-req-rep.sh shmem)
set_tests_properties(Example.ReqRep.shmem PROPERTIES TIMEOUT "30" PASS_REGULAR_EXPRESSION "Received reply from server: ")
# install
install(
TARGETS
fairmq-ex-req-rep-client
fairmq-ex-req-rep-server
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-req-rep.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-req-rep.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-req-rep.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-req-rep.sh
) )

View File

@@ -1,7 +1,5 @@
#!/bin/bash #!/bin/bash
set -e
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="zeromq" transport="zeromq"
@@ -15,7 +13,7 @@ chan="data"
chanAddr="/tmp/fmq_$session""_""$chan""_""$transport" chanAddr="/tmp/fmq_$session""_""$chan""_""$transport"
# setup a trap to kill everything if the test fails/timeouts # setup a trap to kill everything if the test fails/timeouts
trap 'set +e; kill -TERM $CLIENT_PID; kill -TERM $SERVER_PID; wait $CLIENT_PID; wait $SERVER_PID; rm $chanAddr; exit 0' TERM trap 'kill -TERM $CLIENT_PID; kill -TERM $SERVER_PID; wait $CLIENT_PID; wait $SERVER_PID; rm $chanAddr' TERM
CLIENT="fairmq-ex-req-rep-client" CLIENT="fairmq-ex-req-rep-client"
CLIENT+=" --id client" CLIENT+=" --id client"
@@ -23,7 +21,6 @@ CLIENT+=" --transport $transport"
CLIENT+=" --verbosity veryhigh" CLIENT+=" --verbosity veryhigh"
CLIENT+=" --session $session" CLIENT+=" --session $session"
CLIENT+=" --shm-segment-size 100000000" CLIENT+=" --shm-segment-size 100000000"
CLIENT+=" --shm-monitor true"
CLIENT+=" --control static --color false" CLIENT+=" --control static --color false"
CLIENT+=" --max-iterations 1" CLIENT+=" --max-iterations 1"
CLIENT+=" --channel-config name=$chan,type=req,method=connect,rateLogging=0,address=ipc://$chanAddr" CLIENT+=" --channel-config name=$chan,type=req,method=connect,rateLogging=0,address=ipc://$chanAddr"
@@ -36,7 +33,6 @@ SERVER+=" --transport $transport"
SERVER+=" --verbosity veryhigh" SERVER+=" --verbosity veryhigh"
SERVER+=" --session $session" SERVER+=" --session $session"
SERVER+=" --shm-segment-size 100000000" SERVER+=" --shm-segment-size 100000000"
SERVER+=" --shm-monitor true"
SERVER+=" --control static --color false" SERVER+=" --control static --color false"
SERVER+=" --max-iterations 1" SERVER+=" --max-iterations 1"
SERVER+=" --channel-config name=$chan,type=rep,method=bind,rateLogging=0,address=ipc://$chanAddr" SERVER+=" --channel-config name=$chan,type=rep,method=bind,rateLogging=0,address=ipc://$chanAddr"
@@ -47,6 +43,4 @@ SERVER_PID=$!
wait $CLIENT_PID wait $CLIENT_PID
wait $SERVER_PID wait $SERVER_PID
set +e
rm $chanAddr rm $chanAddr
exit 0

View File

@@ -1,5 +1,5 @@
################################################################################ ################################################################################
# Copyright (C) 2012-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # # Copyright (C) 2012-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# # # #
# This software is distributed under the terms of the # # This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, # # GNU Lesser General Public Licence (LGPL) version 3, #
@@ -63,22 +63,14 @@ if(BUILD_FAIRMQ)
Tools.h Tools.h
TransportFactory.h TransportFactory.h
Transports.h Transports.h
TransportEnum.h
UnmanagedRegion.h UnmanagedRegion.h
options/FairMQProgOptions.h options/FairMQProgOptions.h
runDevice.h runDevice.h
runFairMQDevice.h runFairMQDevice.h
shmem/Common.h shmem/Common.h
shmem/Manager.h
shmem/Message.h
shmem/Monitor.h shmem/Monitor.h
shmem/Poller.h
shmem/Segment.h shmem/Segment.h
shmem/Socket.h
shmem/TransportFactory.h
shmem/UnmanagedRegion.h shmem/UnmanagedRegion.h
shmem/UnmanagedRegionImpl.h
tools/Compiler.h
tools/CppSTL.h tools/CppSTL.h
tools/Exceptions.h tools/Exceptions.h
tools/IO.h tools/IO.h
@@ -102,6 +94,12 @@ if(BUILD_FAIRMQ)
plugins/Builtin.h plugins/Builtin.h
plugins/config/Config.h plugins/config/Config.h
plugins/control/Control.h plugins/control/Control.h
shmem/Message.h
shmem/Poller.h
shmem/UnmanagedRegionImpl.h
shmem/Socket.h
shmem/TransportFactory.h
shmem/Manager.h
zeromq/Common.h zeromq/Common.h
zeromq/Context.h zeromq/Context.h
zeromq/Message.h zeromq/Message.h
@@ -109,9 +107,18 @@ if(BUILD_FAIRMQ)
zeromq/UnmanagedRegion.h zeromq/UnmanagedRegion.h
zeromq/Socket.h zeromq/Socket.h
zeromq/TransportFactory.h zeromq/TransportFactory.h
zeromq/ZMsg.h
) )
if(BUILD_OFI_TRANSPORT)
set(FAIRMQ_PRIVATE_HEADER_FILES ${FAIRMQ_PRIVATE_HEADER_FILES}
ofi/Context.h
ofi/ControlMessages.h
ofi/Message.h
ofi/Socket.h
ofi/TransportFactory.h
)
endif()
########################## ##########################
# libFairMQ source files # # libFairMQ source files #
########################## ##########################
@@ -119,7 +126,7 @@ if(BUILD_FAIRMQ)
Channel.cxx Channel.cxx
Device.cxx Device.cxx
DeviceRunner.cxx DeviceRunner.cxx
EventManager.cxx Error.cxx
JSONParser.cxx JSONParser.cxx
MemoryResources.cxx MemoryResources.cxx
Plugin.cxx Plugin.cxx
@@ -142,6 +149,14 @@ if(BUILD_FAIRMQ)
tools/Unique.cxx tools/Unique.cxx
) )
if(BUILD_OFI_TRANSPORT)
set(FAIRMQ_SOURCE_FILES ${FAIRMQ_SOURCE_FILES}
ofi/Context.cxx
ofi/Message.cxx
ofi/Socket.cxx
)
endif()
################### ###################
# configure files # # configure files #
@@ -165,26 +180,14 @@ if(BUILD_FAIRMQ)
############################ ############################
# preprocessor definitions # # preprocessor definitions #
############################ ############################
target_compile_definitions(${target} PUBLIC target_compile_definitions(${target} PUBLIC BOOST_ERROR_CODE_HEADER_ONLY)
BOOST_ERROR_CODE_HEADER_ONLY
BOOST_ASIO_HAS_HAS_STD_CHRONO
)
if(FAIRMQ_DEBUG_MODE) if(FAIRMQ_DEBUG_MODE)
target_compile_definitions(${target} PUBLIC FAIRMQ_DEBUG_MODE) target_compile_definitions(${target} PUBLIC FAIRMQ_DEBUG_MODE)
endif() endif()
target_compile_definitions(${target} PUBLIC if(BUILD_OFI_TRANSPORT)
FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM} target_compile_definitions(${target} PRIVATE BUILD_OFI_TRANSPORT)
FAIRMQ_HAS_STD_PMR=${FAIRMQ_HAS_STD_PMR}
)
if(DEFINED FAIRMQ_CHANNEL_DEFAULT_AUTOBIND)
# translate CMake boolean (TRUE, FALSE, 0, 1, OFF, ON) into C++ boolean literal (true, false)
if(FAIRMQ_CHANNEL_DEFAULT_AUTOBIND)
set(value "true")
else()
set(value "false")
endif()
target_compile_definitions(${target} PUBLIC FAIRMQ_CHANNEL_DEFAULT_AUTOBIND=${value})
endif() endif()
target_compile_definitions(${target} PUBLIC FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM})
####################### #######################
@@ -202,6 +205,13 @@ if(BUILD_FAIRMQ)
################## ##################
# link libraries # # link libraries #
################## ##################
if(BUILD_OFI_TRANSPORT)
set(OFI_DEPS
asio::asio
asiofi::asiofi
)
endif()
target_link_libraries(${target} target_link_libraries(${target}
INTERFACE # only consumers link against interface dependencies INTERFACE # only consumers link against interface dependencies
Boost::container Boost::container
@@ -212,13 +222,14 @@ if(BUILD_FAIRMQ)
$<$<PLATFORM_ID:Linux>:rt> $<$<PLATFORM_ID:Linux>:rt>
Boost::boost Boost::boost
Boost::program_options Boost::program_options
Boost::filesystem # still needed for Boost.DLL Boost::filesystem
Boost::regex Boost::regex
FairLogger::FairLogger FairLogger::FairLogger
PRIVATE # only libFairMQ links against private dependencies PRIVATE # only libFairMQ links against private dependencies
libzmq libzmq
PicoSHA2 PicoSHA2
${OFI_DEPS}
) )
set_target_properties(${target} PROPERTIES set_target_properties(${target} PROPERTIES
VERSION ${PROJECT_VERSION} VERSION ${PROJECT_VERSION}
@@ -332,3 +343,10 @@ if(BUILD_FAIRMQ)
) )
endforeach() endforeach()
endif() endif()
####################
# external plugins #
####################
if(BUILD_PMIX_PLUGIN)
add_subdirectory(plugins/PMIx)
endif()

View File

@@ -12,7 +12,6 @@
#include <fairmq/Channel.h> #include <fairmq/Channel.h>
#include <fairmq/Properties.h> #include <fairmq/Properties.h>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <fairmq/Transports.h>
#include <random> #include <random>
#include <regex> #include <regex>
#include <set> #include <set>
@@ -384,10 +383,4 @@ bool Channel::BindEndpoint(string& endpoint)
} }
} }
std::string Channel::GetTransportName() const { return TransportName(fTransportType); }
Transport Channel::GetTransportType() const { return fTransportType; }
void Channel::UpdateTransport(const std::string& transport) { fTransportType = TransportType(transport); Invalidate(); }
} // namespace fair::mq } // namespace fair::mq

View File

@@ -14,12 +14,11 @@
#include <fairmq/Properties.h> #include <fairmq/Properties.h>
#include <fairmq/Socket.h> #include <fairmq/Socket.h>
#include <fairmq/TransportFactory.h> #include <fairmq/TransportFactory.h>
#include <fairmq/TransportEnum.h> #include <fairmq/Transports.h>
#include <fairmq/UnmanagedRegion.h> #include <fairmq/UnmanagedRegion.h>
#include <cstdint> // int64_t #include <cstdint> // int64_t
#include <memory> // unique_ptr, shared_ptr #include <memory> // unique_ptr, shared_ptr
#include <ostream>
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
#include <utility> // std::move #include <utility> // std::move
@@ -145,11 +144,11 @@ class Channel
/// Get channel transport name ("default", "zeromq" or "shmem") /// Get channel transport name ("default", "zeromq" or "shmem")
/// @return Returns channel transport name (e.g. "default", "zeromq" or "shmem") /// @return Returns channel transport name (e.g. "default", "zeromq" or "shmem")
std::string GetTransportName() const; std::string GetTransportName() const { return TransportName(fTransportType); }
/// Get channel transport type /// Get channel transport type
/// @return Returns channel transport type /// @return Returns channel transport type
mq::Transport GetTransportType() const; mq::Transport GetTransportType() const { return fTransportType; }
/// Get socket send buffer size (in number of messages) /// Get socket send buffer size (in number of messages)
/// @return Returns socket send buffer size (in number of messages) /// @return Returns socket send buffer size (in number of messages)
@@ -221,7 +220,7 @@ class Channel
/// Set channel transport /// Set channel transport
/// @param transport transport string ("default", "zeromq" or "shmem") /// @param transport transport string ("default", "zeromq" or "shmem")
void UpdateTransport(const std::string& transport); void UpdateTransport(const std::string& transport) { fTransportType = TransportType(transport); Invalidate(); }
/// Set socket send buffer size /// Set socket send buffer size
/// @param sndBufSize Socket send buffer size (in number of messages) /// @param sndBufSize Socket send buffer size (in number of messages)
@@ -379,19 +378,7 @@ class Channel
static constexpr int DefaultRateLogging = 1; static constexpr int DefaultRateLogging = 1;
static constexpr int DefaultPortRangeMin = 22000; static constexpr int DefaultPortRangeMin = 22000;
static constexpr int DefaultPortRangeMax = 23000; static constexpr int DefaultPortRangeMax = 23000;
#ifdef FAIRMQ_CHANNEL_DEFAULT_AUTOBIND
static constexpr bool DefaultAutoBind = FAIRMQ_CHANNEL_DEFAULT_AUTOBIND;
#else
static constexpr bool DefaultAutoBind = true; static constexpr bool DefaultAutoBind = true;
#endif
friend std::ostream& operator<<(std::ostream& os, const Channel& ch)
{
return os << "name: " << ch.fName
<< ", type: " << ch.fType
<< ", method: " << ch.fMethod
<< ", address: " << ch.fAddress;
}
private: private:
std::shared_ptr<TransportFactory> fTransportFactory; std::shared_ptr<TransportFactory> fTransportFactory;
@@ -429,16 +416,16 @@ class Channel
msg.get() msg.get()
)); ));
msg.release(); msg.release();
msg = std::move(msgWrapper); msg = move(msgWrapper);
} else { } else {
MessagePtr newMsg(NewMessage()); MessagePtr newMsg(NewMessage());
msg = std::move(newMsg); msg = move(newMsg);
} }
} }
} }
void CheckSendCompatibility(Parts& parts) { CheckSendCompatibility(parts.fParts); } void CheckSendCompatibility(Parts& parts) { CheckSendCompatibility(parts.fParts); }
void CheckSendCompatibility(Parts::container & msgVec) void CheckSendCompatibility(std::vector<MessagePtr>& msgVec)
{ {
for (auto& msg : msgVec) { for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {
@@ -450,10 +437,10 @@ class Channel
msg.get() msg.get()
)); ));
msg.release(); msg.release();
msg = std::move(msgWrapper); msg = move(msgWrapper);
} else { } else {
MessagePtr newMsg(NewMessage()); MessagePtr newMsg(NewMessage());
msg = std::move(newMsg); msg = move(newMsg);
} }
} }
} }
@@ -463,18 +450,18 @@ class Channel
{ {
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {
MessagePtr newMsg(NewMessage()); MessagePtr newMsg(NewMessage());
msg = std::move(newMsg); msg = move(newMsg);
} }
} }
void CheckReceiveCompatibility(Parts& parts) { CheckReceiveCompatibility(parts.fParts); } void CheckReceiveCompatibility(Parts& parts) { CheckReceiveCompatibility(parts.fParts); }
void CheckReceiveCompatibility(Parts::container& msgVec) void CheckReceiveCompatibility(std::vector<MessagePtr>& msgVec)
{ {
for (auto& msg : msgVec) { for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {
MessagePtr newMsg(NewMessage()); MessagePtr newMsg(NewMessage());
msg = std::move(newMsg); msg = move(newMsg);
} }
} }
} }

View File

@@ -1,25 +1,19 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2012-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2012-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
// FairMQ #include <algorithm> // std::max, std::any_of
#include <boost/algorithm/string.hpp> // join/split
#include <chrono>
#include <fairmq/Device.h> #include <fairmq/Device.h>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <fairmq/Transports.h>
// boost
#include <boost/algorithm/string.hpp> // join/split
// std
#include <algorithm> // std::max, std::any_of
#include <chrono>
#include <iomanip> #include <iomanip>
#include <list> #include <list>
#include <memory> // std::make_unique #include <memory> // std::make_unique
#include <mutex> #include <mutex>
#include <thread> #include <thread>
@@ -78,9 +72,6 @@ Device::Device(ProgOptions& config, tools::Version version)
: Device(&config, version) : Device(&config, version)
{} {}
/// TODO: Remove this once Device::fChannels is no longer public
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
Device::Device(ProgOptions* config, tools::Version version) Device::Device(ProgOptions* config, tools::Version version)
: fTransportFactory(nullptr) : fTransportFactory(nullptr)
, fInternalConfig(config ? nullptr : make_unique<ProgOptions>()) , fInternalConfig(config ? nullptr : make_unique<ProgOptions>())
@@ -92,15 +83,18 @@ Device::Device(ProgOptions* config, tools::Version version)
, fVersion(version) , fVersion(version)
, fRate(DefaultRate) , fRate(DefaultRate)
, fInitializationTimeoutInS(DefaultInitTimeout) , fInitializationTimeoutInS(DefaultInitTimeout)
, fTransitioning(false)
{ {
SubscribeToNewTransition("device", [&](Transition transition) { SubscribeToNewTransition("device", [&](Transition transition) {
LOG(trace) << "device notified on new transition: " << transition; LOG(trace) << "device notified on new transition: " << transition;
InterruptTransports();
});
fStateMachine.PrepareState([&](State state) { switch (transition) {
LOG(trace) << "Resuming transports for " << state << " state"; case Transition::Stop:
ResumeTransports(); UnblockTransports();
break;
default:
break;
}
}); });
fStateMachine.HandleStates([&](State state) { fStateMachine.HandleStates([&](State state) {
@@ -141,7 +135,73 @@ Device::Device(ProgOptions* config, tools::Version version)
fStateMachine.Start(); fStateMachine.Start();
} }
#pragma GCC diagnostic pop
void Device::TransitionTo(State s)
{
{
lock_guard<mutex> lock(fTransitionMtx);
if (fTransitioning) {
LOG(debug) << "Attempting a transition with TransitionTo() while another one is already in progress";
throw OngoingTransition("Attempting a transition with TransitionTo() while another one is already in progress");
}
fTransitioning = true;
}
using mq::State;
StateQueue sq;
StateSubscription ss(tools::ToString(fId, ".TransitionTo"), fStateMachine, sq);
State currentState = GetCurrentState();
while (s != currentState) {
switch (currentState) {
case State::Idle:
if (s == State::Exiting) { ChangeState(Transition::End); }
else { ChangeState(Transition::InitDevice); }
break;
case State::InitializingDevice:
ChangeState(Transition::CompleteInit);
break;
case State::Initialized:
if (s == State::Exiting || s == State::Idle) { ChangeState(Transition::ResetDevice); }
else { ChangeState(Transition::Bind); }
break;
case State::Bound:
if (s == State::DeviceReady || s == State::Ready || s == State::Running) { ChangeState(Transition::Connect); }
else { ChangeState(Transition::ResetDevice); }
break;
case State::DeviceReady:
if (s == State::Running || s == State::Ready) { ChangeState(Transition::InitTask); }
else { ChangeState(Transition::ResetDevice); }
break;
case State::Ready:
if (s == State::Running) { ChangeState(Transition::Run); }
else { ChangeState(Transition::ResetTask); }
break;
case State::Running:
ChangeState(Transition::Stop);
break;
case State::Binding:
case State::Connecting:
case State::InitializingTask:
case State::ResettingDevice:
case State::ResettingTask:
LOG(debug) << "TransitionTo ignoring state: " << currentState << " (expected, automatic transition).";
break;
default:
LOG(debug) << "TransitionTo ignoring state: " << currentState;
break;
}
currentState = sq.WaitForNext();
}
{
lock_guard<mutex> lock(fTransitionMtx);
fTransitioning = false;
}
}
void Device::InitWrapper() void Device::InitWrapper()
{ {
@@ -166,7 +226,7 @@ void Device::InitWrapper()
unordered_map<string, int> infos = fConfig->GetChannelInfo(); unordered_map<string, int> infos = fConfig->GetChannelInfo();
for (const auto& info : infos) { for (const auto& info : infos) {
for (int i = 0; i < info.second; ++i) { for (int i = 0; i < info.second; ++i) {
GetChannels()[info.first].emplace_back(info.first, i, fConfig->GetPropertiesStartingWith(tools::ToString("chans.", info.first, ".", i, "."))); fChannels[info.first].emplace_back(info.first, i, fConfig->GetPropertiesStartingWith(tools::ToString("chans.", info.first, ".", i, ".")));
} }
} }
@@ -176,7 +236,8 @@ void Device::InitWrapper()
string networkInterface = fConfig->GetProperty<string>("network-interface", DefaultNetworkInterface); string networkInterface = fConfig->GetProperty<string>("network-interface", DefaultNetworkInterface);
// Fill the uninitialized channel containers // Fill the uninitialized channel containers
for (auto& channel : GetChannels()) { for (auto& channel : fChannels) {
int subChannelIndex = 0;
for (auto& subChannel : channel.second) { for (auto& subChannel : channel.second) {
// set channel transport // set channel transport
LOG(debug) << "Initializing transport for channel " << subChannel.fName << ": " << TransportNames.at(subChannel.fTransportType); LOG(debug) << "Initializing transport for channel " << subChannel.fName << ": " << TransportNames.at(subChannel.fTransportType);
@@ -208,10 +269,12 @@ void Device::InitWrapper()
LOG(error) << "Cannot update configuration. Socket method (bind/connect) for channel '" << subChannel.fName << "' not specified."; LOG(error) << "Cannot update configuration. Socket method (bind/connect) for channel '" << subChannel.fName << "' not specified.";
throw runtime_error(tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", subChannel.fName, " not specified.")); throw runtime_error(tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", subChannel.fName, " not specified."));
} }
subChannelIndex++;
} }
} }
// ChangeStateOrThrow(Transition::Auto); // ChangeState(Transition::Auto);
} }
void Device::BindWrapper() void Device::BindWrapper()
@@ -228,7 +291,7 @@ void Device::BindWrapper()
Bind(); Bind();
if (!NewStatePending()) { if (!NewStatePending()) {
ChangeStateOrThrow(Transition::Auto); ChangeState(Transition::Auto);
} }
} }
@@ -241,7 +304,7 @@ void Device::ConnectWrapper()
// first attempt // first attempt
AttachChannels(fUninitializedConnectingChannels); AttachChannels(fUninitializedConnectingChannels);
// if not all channels could be connected, update their address values from config and retry // if not all channels could be connected, update their address values from config and retry
while (!fUninitializedConnectingChannels.empty() && !NewStatePending()) { while (!fUninitializedConnectingChannels.empty()) {
this_thread::sleep_for(chrono::milliseconds(sleepTimeInMS)); this_thread::sleep_for(chrono::milliseconds(sleepTimeInMS));
for (auto& chan : fUninitializedConnectingChannels) { for (auto& chan : fUninitializedConnectingChannels) {
@@ -254,24 +317,20 @@ void Device::ConnectWrapper()
if (numAttempts++ > maxAttempts) { if (numAttempts++ > maxAttempts) {
LOG(error) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts"; LOG(error) << "could not connect all channels after " << fInitializationTimeoutInS << " attempts";
LOG(error) << "following channels are still invalid:";
for (auto& chan : fUninitializedConnectingChannels) {
LOG(error) << "channel: " << *chan;
}
throw runtime_error(tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts")); throw runtime_error(tools::ToString("could not connect all channels after ", fInitializationTimeoutInS, " attempts"));
} }
AttachChannels(fUninitializedConnectingChannels); AttachChannels(fUninitializedConnectingChannels);
} }
if (GetChannels().empty()) { if (fChannels.empty()) {
LOG(warn) << "No channels created after finishing initialization"; LOG(warn) << "No channels created after finishing initialization";
} }
Connect(); Connect();
if (!NewStatePending()) { if (!NewStatePending()) {
ChangeStateOrThrow(Transition::Auto); ChangeState(Transition::Auto);
} }
} }
@@ -286,7 +345,7 @@ void Device::AttachChannels(vector<Channel*>& chans)
// remove the channel from the uninitialized container // remove the channel from the uninitialized container
itr = chans.erase(itr); itr = chans.erase(itr);
} else { } else {
LOG(error) << "failed to attach channel " << (*itr)->fName << " (" << (*itr)->fMethod << " on " << (*itr)->fAddress << ")"; LOG(error) << "failed to attach channel " << (*itr)->fName << " (" << (*itr)->fMethod << ")";
++itr; ++itr;
} }
} else { } else {
@@ -373,7 +432,7 @@ void Device::InitTaskWrapper()
InitTask(); InitTask();
if (!NewStatePending()) { if (!NewStatePending()) {
ChangeStateOrThrow(Transition::Auto); ChangeState(Transition::Auto);
} }
} }
@@ -383,7 +442,7 @@ void Device::RunWrapper()
unique_ptr<thread> rateLogger; unique_ptr<thread> rateLogger;
// Check if rate logging thread is needed // Check if rate logging thread is needed
const bool rateLogging = any_of(GetChannels().cbegin(), GetChannels().cend(), [](auto ch) { const bool rateLogging = any_of(fChannels.cbegin(), fChannels.cend(), [](auto ch) {
return any_of(ch.second.cbegin(), ch.second.cend(), [](auto sub) { return sub.fRateLogging > 0; }); return any_of(ch.second.cbegin(), ch.second.cend(), [](auto sub) { return sub.fRateLogging > 0; });
}); });
@@ -394,9 +453,15 @@ void Device::RunWrapper()
if (rateLogging && rateLogger->joinable()) { rateLogger->join(); } if (rateLogging && rateLogger->joinable()) { rateLogger->join(); }
}); });
// notify transports to resume transfers
for (auto& t : fTransports) {
t.second->Resume();
}
// change to Error state in case of an exception, to release LogSocketRates // change to Error state in case of an exception, to release LogSocketRates
tools::CallOnDestruction cod([&](){ tools::CallOnDestruction cod([&](){
ChangeStateOrThrow(Transition::ErrorFound); ChangeState(Transition::ErrorFound);
}); });
PreRun(); PreRun();
@@ -404,7 +469,7 @@ void Device::RunWrapper()
// process either data callbacks or ConditionalRun/Run // process either data callbacks or ConditionalRun/Run
if (fDataCallbacks) { if (fDataCallbacks) {
// if only one input channel, do lightweight handling without additional polling. // if only one input channel, do lightweight handling without additional polling.
if (fInputChannelKeys.size() == 1 && GetChannels().at(fInputChannelKeys.at(0)).size() == 1) { if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1) {
HandleSingleChannelInput(); HandleSingleChannelInput();
} else {// otherwise do full handling with polling } else {// otherwise do full handling with polling
HandleMultipleChannelInput(); HandleMultipleChannelInput();
@@ -423,7 +488,8 @@ void Device::RunWrapper()
// if Run() exited and the state is still RUNNING, transition to READY. // if Run() exited and the state is still RUNNING, transition to READY.
if (!NewStatePending()) { if (!NewStatePending()) {
ChangeStateOrThrow(Transition::Stop); UnblockTransports();
ChangeState(Transition::Stop);
} }
PostRun(); PostRun();
@@ -451,7 +517,7 @@ void Device::HandleMultipleChannelInput()
// check if more than one transport is used // check if more than one transport is used
fMultitransportInputs.clear(); fMultitransportInputs.clear();
for (const auto& k : fInputChannelKeys) { for (const auto& k : fInputChannelKeys) {
mq::Transport t = GetChannel(k, 0).fTransportType; mq::Transport t = fChannels.at(k).at(0).fTransportType;
if (fMultitransportInputs.find(t) == fMultitransportInputs.end()) { if (fMultitransportInputs.find(t) == fMultitransportInputs.end()) {
fMultitransportInputs.insert(pair<mq::Transport, vector<string>>(t, vector<string>())); fMultitransportInputs.insert(pair<mq::Transport, vector<string>>(t, vector<string>()));
fMultitransportInputs.at(t).push_back(k); fMultitransportInputs.at(t).push_back(k);
@@ -461,13 +527,13 @@ void Device::HandleMultipleChannelInput()
} }
for (const auto& mi : fMsgInputs) { for (const auto& mi : fMsgInputs) {
for (auto& i : GetChannels().at(mi.first)) { for (auto& i : fChannels.at(mi.first)) {
i.fMultipart = false; i.fMultipart = false;
} }
} }
for (const auto& mi : fMultipartInputs) { for (const auto& mi : fMultipartInputs) {
for (auto& i : GetChannels().at(mi.first)) { for (auto& i : fChannels.at(mi.first)) {
i.fMultipart = true; i.fMultipart = true;
} }
} }
@@ -478,16 +544,16 @@ void Device::HandleMultipleChannelInput()
} else { // otherwise poll directly } else { // otherwise poll directly
bool proceed = true; bool proceed = true;
PollerPtr poller(GetChannel(fInputChannelKeys.at(0), 0).fTransportFactory->CreatePoller(GetChannels(), fInputChannelKeys)); PollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys));
while (!NewStatePending() && proceed) { while (!NewStatePending() && proceed) {
poller->Poll(200); poller->Poll(200);
// check which inputs are ready and call their data handlers if they are. // check which inputs are ready and call their data handlers if they are.
for (const auto& ch : fInputChannelKeys) { for (const auto& ch : fInputChannelKeys) {
for (unsigned int i = 0; i < GetChannels().at(ch).size(); ++i) { for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
if (poller->CheckInput(ch, i)) { if (poller->CheckInput(ch, i)) {
if (GetChannel(ch, i).fMultipart) { if (fChannels.at(ch).at(i).fMultipart) {
proceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i); proceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
} else { } else {
proceed = HandleMsgInput(ch, fMsgInputs.at(ch), i); proceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
@@ -524,13 +590,13 @@ void Device::HandleMultipleTransportInput()
void Device::PollForTransport(const TransportFactory* factory, const vector<string>& channelKeys) void Device::PollForTransport(const TransportFactory* factory, const vector<string>& channelKeys)
{ {
try { try {
PollerPtr poller(factory->CreatePoller(GetChannels(), channelKeys)); PollerPtr poller(factory->CreatePoller(fChannels, channelKeys));
while (!NewStatePending() && fMultitransportProceed) { while (!NewStatePending() && fMultitransportProceed) {
poller->Poll(500); poller->Poll(500);
for (const auto& ch : channelKeys) { for (const auto& ch : channelKeys) {
for (unsigned int i = 0; i < GetChannels().at(ch).size(); ++i) { for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
if (poller->CheckInput(ch, i)) { if (poller->CheckInput(ch, i)) {
lock_guard<mutex> lock(fMultitransportMutex); lock_guard<mutex> lock(fMultitransportMutex);
@@ -538,7 +604,7 @@ void Device::PollForTransport(const TransportFactory* factory, const vector<stri
break; break;
} }
if (GetChannel(ch, i).fMultipart) { if (fChannels.at(ch).at(i).fMultipart) {
fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i); fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
} else { } else {
fMultitransportProceed = HandleMsgInput(ch, fMsgInputs.at(ch), i); fMultitransportProceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
@@ -562,7 +628,7 @@ void Device::PollForTransport(const TransportFactory* factory, const vector<stri
bool Device::HandleMsgInput(const string& chName, const InputMsgCallback& callback, int i) bool Device::HandleMsgInput(const string& chName, const InputMsgCallback& callback, int i)
{ {
unique_ptr<Message> input(GetChannel(chName, i).fTransportFactory->CreateMessage()); unique_ptr<Message> input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage());
if (Receive(input, chName, i) >= 0) { if (Receive(input, chName, i) >= 0) {
return callback(input, i); return callback(input, i);
@@ -584,8 +650,6 @@ bool Device::HandleMultipartInput(const string& chName, const InputMultipartCall
shared_ptr<TransportFactory> Device::AddTransport(mq::Transport transport) shared_ptr<TransportFactory> Device::AddTransport(mq::Transport transport)
{ {
lock_guard<mutex> lock(fTransportMtx);
if (transport == mq::Transport::DEFAULT) { if (transport == mq::Transport::DEFAULT) {
transport = fDefaultTransportType; transport = fDefaultTransportType;
} }
@@ -619,7 +683,7 @@ void Device::LogSocketRates()
size_t chanNameLen = 0; size_t chanNameLen = 0;
// iterate over the channels map // iterate over the channels map
for (auto& channel : GetChannels()) { for (auto& channel : fChannels) {
// iterate over the channels vector // iterate over the channels vector
for (auto& subChannel : channel.second) { for (auto& subChannel : channel.second) {
if (subChannel.fRateLogging > 0) { if (subChannel.fRateLogging > 0) {
@@ -703,19 +767,10 @@ void Device::LogSocketRates()
} }
} }
void Device::InterruptTransports() void Device::UnblockTransports()
{ {
lock_guard<mutex> lock(fTransportMtx); for (auto& transport : fTransports) {
for (auto& [transportType, transport] : fTransports) { transport.second->Interrupt();
transport->Interrupt();
}
}
void Device::ResumeTransports()
{
lock_guard<mutex> lock(fTransportMtx);
for (auto& [transportType, transport] : fTransports) {
transport->Resume();
} }
} }
@@ -724,38 +779,31 @@ void Device::ResetTaskWrapper()
ResetTask(); ResetTask();
if (!NewStatePending()) { if (!NewStatePending()) {
ChangeStateOrThrow(Transition::Auto); ChangeState(Transition::Auto);
} }
} }
void Device::ResetWrapper() void Device::ResetWrapper()
{ {
{ for (auto& transport : fTransports) {
lock_guard<mutex> lock(fTransportMtx); transport.second->Reset();
for (auto& [transportType, transport] : fTransports) {
transport->Reset();
}
fTransports.clear();
} }
Reset(); Reset();
GetChannels().clear(); fChannels.clear();
fTransports.clear();
fTransportFactory.reset(); fTransportFactory.reset();
if (!NewStatePending()) { if (!NewStatePending()) {
ChangeStateOrThrow(Transition::Auto); ChangeState(Transition::Auto);
} }
} }
/// TODO: Remove this once Device::fChannels is no longer public
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
Device::~Device() Device::~Device()
{ {
UnsubscribeFromNewTransition("device"); UnsubscribeFromNewTransition("device");
fStateMachine.StopHandlingStates(); fStateMachine.StopHandlingStates();
LOG(debug) << "Shutting down device " << fId; LOG(debug) << "Shutting down device " << fId;
} }
#pragma GCC diagnostic pop
} // namespace fair::mq } // namespace fair::mq

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2021-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2021-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -9,9 +9,12 @@
#ifndef FAIR_MQ_DEVICE_H #ifndef FAIR_MQ_DEVICE_H
#define FAIR_MQ_DEVICE_H #define FAIR_MQ_DEVICE_H
// FairMQ #include <algorithm> // find
#include <atomic>
#include <chrono>
#include <cstddef>
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h> #include <fairmq/Channel.h>
#include <fairmq/Error.h>
#include <fairmq/Message.h> #include <fairmq/Message.h>
#include <fairmq/Parts.h> #include <fairmq/Parts.h>
#include <fairmq/ProgOptions.h> #include <fairmq/ProgOptions.h>
@@ -19,17 +22,8 @@
#include <fairmq/StateQueue.h> #include <fairmq/StateQueue.h>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <fairmq/TransportFactory.h> #include <fairmq/TransportFactory.h>
#include <fairmq/TransportEnum.h> #include <fairmq/Transports.h>
#include <fairmq/UnmanagedRegion.h> #include <fairmq/UnmanagedRegion.h>
// logger
#include <fairlogger/Logger.h>
// std
#include <algorithm> // find
#include <atomic>
#include <chrono>
#include <cstddef>
#include <functional> #include <functional>
#include <memory> // unique_ptr #include <memory> // unique_ptr
#include <mutex> #include <mutex>
@@ -234,7 +228,7 @@ class Device
} }
} }
return GetChannel(chans.at(0), 0).Transport()->CreatePoller(GetChannels(), chans); return GetChannel(chans.at(0), 0).Transport()->CreatePoller(fChannels, chans);
} }
PollerPtr NewPoller(const std::vector<Channel*>& channels) PollerPtr NewPoller(const std::vector<Channel*>& channels)
@@ -322,7 +316,7 @@ class Device
Channel& GetChannel(const std::string& channelName, const int index = 0) Channel& GetChannel(const std::string& channelName, const int index = 0)
try { try {
return GetChannels().at(channelName).at(index); return fChannels.at(channelName).at(index);
} catch (const std::out_of_range& oor) { } catch (const std::out_of_range& oor) {
LOG(error) << "GetChannel(): '" << channelName << "[" << index << "]' does not exist."; LOG(error) << "GetChannel(): '" << channelName << "[" << index << "]' does not exist.";
throw; throw;
@@ -330,7 +324,7 @@ class Device
size_t GetNumSubChannels(const std::string& channelName) size_t GetNumSubChannels(const std::string& channelName)
try { try {
return GetChannels().at(channelName).size(); return fChannels.at(channelName).size();
} catch (const std::out_of_range& oor) { } catch (const std::out_of_range& oor) {
LOG(error) << "GetNumSubChannels(): '" << channelName << "' does not exist."; LOG(error) << "GetNumSubChannels(): '" << channelName << "' does not exist.";
throw; throw;
@@ -341,7 +335,7 @@ class Device
/// @param index sub-channel /// @param index sub-channel
unsigned long GetNumberOfConnectedPeers(const std::string& channelName, int index = 0) unsigned long GetNumberOfConnectedPeers(const std::string& channelName, int index = 0)
{ {
return GetChannel(channelName, index).GetNumberOfConnectedPeers(); return fChannels.at(channelName).at(index).GetNumberOfConnectedPeers();
} }
virtual void RegisterChannelEndpoints() {} virtual void RegisterChannelEndpoints() {}
@@ -439,23 +433,7 @@ class Device
fTransports; ///< Container for transports fTransports; ///< Container for transports
public: public:
[[deprecated("Use GetChannels() instead.")]]
std::unordered_map<std::string, std::vector<Channel>> fChannels; ///< Device channels std::unordered_map<std::string, std::vector<Channel>> fChannels; ///< Device channels
std::unordered_map<std::string, std::vector<Channel>>& GetChannels()
{
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
return fChannels;
#pragma GCC diagnostic pop
}
std::unordered_map<std::string, std::vector<Channel>> const& GetChannels() const
{
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
return fChannels;
#pragma GCC diagnostic pop
}
std::unique_ptr<ProgOptions> fInternalConfig; ///< Internal program options configuration std::unique_ptr<ProgOptions> fInternalConfig; ///< Internal program options configuration
ProgOptions* fConfig; ///< Pointer to config (internal or external) ProgOptions* fConfig; ///< Pointer to config (internal or external)
@@ -499,49 +477,21 @@ class Device
public: public:
/// @brief Request a device state transition /// @brief Request a device state transition
/// @param transition state transition /// @param transition state transition
/// @return whether the transition was successfully scheduled
/// ///
/// The state transition may not happen immediately, but when the current state evaluates the /// The state transition may not happen immediately, but when the current state evaluates the
/// pending transition event and terminates. In other words, the device states are scheduled /// pending transition event and terminates. In other words, the device states are scheduled
/// cooperatively. /// cooperatively.
[[nodiscard]] bool ChangeState(const Transition transition) bool ChangeState(const Transition transition) { return fStateMachine.ChangeState(transition); }
{
return fStateMachine.ChangeState(transition);
}
/// @brief Request a device state transition /// @brief Request a device state transition
/// @param transition state transition /// @param transition state transition
/// @return whether the transition was successfully scheduled
/// ///
/// The state transition may not happen immediately, but when the current state evaluates the /// The state transition may not happen immediately, but when the current state evaluates the
/// pending transition event and terminates. In other words, the device states are scheduled /// pending transition event and terminates. In other words, the device states are scheduled
/// cooperatively. /// cooperatively.
[[nodiscard]] bool ChangeState(const std::string& transition) bool ChangeState(const std::string& transition)
{ {
return fStateMachine.ChangeState(GetTransition(transition)); return fStateMachine.ChangeState(GetTransition(transition));
} }
/// @brief Request a device state transition
/// @param transition state transition
/// @throws when the transition could not have been scheduled
///
/// Throwing version of Device::ChangeState().
void ChangeStateOrThrow(Transition transition)
{
if(!ChangeState(transition)) {
auto const err = MakeErrorCode(ErrorCode::DeviceChangeStateFailed);
throw std::system_error(err.value(),
err.category(),
tools::ToString("Invalid transition: ", transition));
}
}
/// @brief Request a device state transition
/// @param transition state transition
/// @throws when the transition could not have been scheduled
///
/// Throwing version of Device::ChangeState().
void ChangeStateOrThrow(std::string const& transition)
{
ChangeStateOrThrow(GetTransition(transition));
}
/// @brief waits for the next state (any) to occur /// @brief waits for the next state (any) to occur
State WaitForNextState() { return fStateQueue.WaitForNext(); } State WaitForNextState() { return fStateQueue.WaitForNext(); }
@@ -552,6 +502,8 @@ class Device
/// @param state state to wait for /// @param state state to wait for
void WaitForState(const std::string& state) { WaitForState(GetState(state)); } void WaitForState(const std::string& state) { WaitForState(GetState(state)); }
void TransitionTo(State state);
/// @brief Subscribe with a callback to state changes /// @brief Subscribe with a callback to state changes
/// @param key id to identify your subscription /// @param key id to identify your subscription
/// @param callback callback (called with the new state as the parameter) /// @param callback callback (called with the new state as the parameter)
@@ -636,10 +588,7 @@ class Device
void ResetWrapper(); void ResetWrapper();
/// Notifies transports to cease any blocking activity /// Notifies transports to cease any blocking activity
void InterruptTransports(); void UnblockTransports();
/// Notifies transports to resume any blocking activity
void ResumeTransports();
/// Shuts down the transports and the device /// Shuts down the transports and the device
void Exit() {} void Exit() {}
@@ -673,12 +622,15 @@ class Device
const tools::Version fVersion; const tools::Version fVersion;
float fRate; ///< Rate limiting for ConditionalRun float fRate; ///< Rate limiting for ConditionalRun
uint64_t fMaxRunRuntimeInS; ///< Maximum runtime for the Running state handler, after which
///< state will change to Ready (in seconds, 0 for no limit).
int fInitializationTimeoutInS; int fInitializationTimeoutInS;
std::vector<std::string> fRawCmdLineArgs; std::vector<std::string> fRawCmdLineArgs;
StateQueue fStateQueue; StateQueue fStateQueue;
std::mutex fTransportMtx; ///< guards access to transports container std::mutex fTransitionMtx;
bool fTransitioning;
}; };
} // namespace fair::mq } // namespace fair::mq

View File

@@ -152,7 +152,7 @@ auto DeviceRunner::Run() -> int
fDevice->RegisterChannelEndpoints(); fDevice->RegisterChannelEndpoints();
if (fConfig.Count("print-channels")) { if (fConfig.Count("print-channels")) {
fDevice->PrintRegisteredChannels(); fDevice->PrintRegisteredChannels();
fDevice->ChangeStateOrThrow(fair::mq::Transition::End); fDevice->ChangeState(fair::mq::Transition::End);
return 0; return 0;
} }
@@ -160,7 +160,7 @@ auto DeviceRunner::Run() -> int
if (fConfig.Count("version")) { if (fConfig.Count("version")) {
LOGV(info, verylow) << "FairMQ version: " << FAIRMQ_GIT_VERSION; LOGV(info, verylow) << "FairMQ version: " << FAIRMQ_GIT_VERSION;
LOGV(info, verylow) << "User device version: " << fDevice->GetVersion(); LOGV(info, verylow) << "User device version: " << fDevice->GetVersion();
fDevice->ChangeStateOrThrow(fair::mq::Transition::End); fDevice->ChangeState(fair::mq::Transition::End);
return 0; return 0;
} }

39
fairmq/Error.cxx Normal file
View File

@@ -0,0 +1,39 @@
/********************************************************************************
* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Error.h"
namespace fair::mq {
const char* ErrorCategory::name() const noexcept { return "fairmq"; }
std::string ErrorCategory::message(int ev) const
{
switch (static_cast<ErrorCode>(ev)) {
case ErrorCode::OperationInProgress:
return "async operation already in progress";
case ErrorCode::OperationTimeout:
return "async operation timed out";
case ErrorCode::OperationCanceled:
return "async operation canceled";
case ErrorCode::DeviceChangeStateFailed:
return "failed to change state of a fairmq device";
case ErrorCode::DeviceGetPropertiesFailed:
return "failed to get fairmq device properties";
case ErrorCode::DeviceSetPropertiesFailed:
return "failed to set fairmq device properties";
default:
return "(unrecognized error)";
}
}
const ErrorCategory errorCategory{};
std::error_code MakeErrorCode(ErrorCode e) { return {static_cast<int>(e), errorCategory}; }
} // namespace fair::mq

View File

@@ -38,34 +38,14 @@ enum class ErrorCode
DeviceSetPropertiesFailed DeviceSetPropertiesFailed
}; };
std::error_code MakeErrorCode(ErrorCode);
struct ErrorCategory : std::error_category struct ErrorCategory : std::error_category
{ {
const char* name() const noexcept override { return "fairmq"; } const char* name() const noexcept override;
std::string message(int ev) const override std::string message(int ev) const override;
{
switch (static_cast<ErrorCode>(ev)) {
case ErrorCode::OperationInProgress:
return "async operation already in progress";
case ErrorCode::OperationTimeout:
return "async operation timed out";
case ErrorCode::OperationCanceled:
return "async operation canceled";
case ErrorCode::DeviceChangeStateFailed:
return "failed to change state of a fairmq device";
case ErrorCode::DeviceGetPropertiesFailed:
return "failed to get fairmq device properties";
case ErrorCode::DeviceSetPropertiesFailed:
return "failed to set fairmq device properties";
default:
return "(unrecognized error)";
}
}
}; };
static ErrorCategory ec;
inline std::error_code MakeErrorCode(ErrorCode e) { return {static_cast<int>(e), ec}; }
} // namespace fair::mq } // namespace fair::mq
namespace std { namespace std {

View File

@@ -1,20 +0,0 @@
/********************************************************************************
* Copyright (C) 2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "EventManager.h"
#include <string>
#include <typeindex>
template std::shared_ptr<
fair::mq::EventManager::Signal<fair::mq::PropertyChangeAsString, std::string>>
fair::mq::EventManager::GetSignal<fair::mq::PropertyChangeAsString, std::string>(
const std::pair<std::type_index, std::type_index>& key) const;
template void fair::mq::EventManager::Subscribe<fair::mq::PropertyChangeAsString, std::string>(
const std::string& subscriber,
std::function<void(typename fair::mq::PropertyChangeAsString::KeyType, std::string)>);

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -57,8 +57,27 @@ class EventManager
template<typename E, typename ...Args> template<typename E, typename ...Args>
using Signal = boost::signals2::signal<void(typename E::KeyType, Args...)>; using Signal = boost::signals2::signal<void(typename E::KeyType, Args...)>;
template<typename E, typename... Args> template<typename E, typename ...Args>
auto Subscribe(const std::string& subscriber, std::function<void(typename E::KeyType, Args...)> callback) -> void; auto Subscribe(const std::string& subscriber, std::function<void(typename E::KeyType, Args...)> callback) -> void
{
const std::type_index event_type_index{typeid(E)};
const std::type_index callback_type_index{typeid(std::function<void(typename E::KeyType, Args...)>)};
const auto signalsKey = std::make_pair(event_type_index, callback_type_index);
const auto connectionsKey = std::make_pair(subscriber, signalsKey);
const auto connection = GetSignal<E, Args...>(signalsKey)->connect(callback);
{
std::lock_guard<std::mutex> lock{fMutex};
if (fConnections.find(connectionsKey) != fConnections.end())
{
fConnections.at(connectionsKey).disconnect();
fConnections.erase(connectionsKey);
}
fConnections.insert({connectionsKey, connection});
}
}
template<typename E, typename ...Args> template<typename E, typename ...Args>
auto Unsubscribe(const std::string& subscriber) -> void auto Unsubscribe(const std::string& subscriber) -> void
@@ -100,58 +119,21 @@ class EventManager
mutable std::mutex fMutex; mutable std::mutex fMutex;
template<typename E, typename ...Args> template<typename E, typename ...Args>
auto GetSignal(const SignalsKey& key) const -> std::shared_ptr<Signal<E, Args...>>; auto GetSignal(const SignalsKey& key) const -> std::shared_ptr<Signal<E, Args...>>
}; /* class EventManager */
struct PropertyChangeAsString : Event<std::string> {};
template<typename E, typename... Args>
auto EventManager::GetSignal(const SignalsKey& key) const -> std::shared_ptr<Signal<E, Args...>>
{
std::lock_guard<std::mutex> lock{fMutex};
if (fSignals.find(key) == fSignals.end()) {
// wrapper is needed because boost::signals2::signal is neither copyable nor movable
// and I don't know how else to insert it into the map
auto signal = std::make_shared<Signal<E, Args...>>();
fSignals.insert(std::make_pair(key, signal));
}
return boost::any_cast<std::shared_ptr<Signal<E, Args...>>>(fSignals.at(key));
}
template<typename E, typename... Args>
auto EventManager::Subscribe(const std::string& subscriber,
std::function<void(typename E::KeyType, Args...)> callback) -> void
{
const std::type_index event_type_index{typeid(E)};
const std::type_index callback_type_index{
typeid(std::function<void(typename E::KeyType, Args...)>)};
const auto signalsKey = std::make_pair(event_type_index, callback_type_index);
const auto connectionsKey = std::make_pair(subscriber, signalsKey);
const auto connection = GetSignal<E, Args...>(signalsKey)->connect(callback);
{ {
std::lock_guard<std::mutex> lock{fMutex}; std::lock_guard<std::mutex> lock{fMutex};
if (fConnections.find(connectionsKey) != fConnections.end()) { if (fSignals.find(key) == fSignals.end())
fConnections.at(connectionsKey).disconnect(); {
fConnections.erase(connectionsKey); // wrapper is needed because boost::signals2::signal is neither copyable nor movable
// and I don't know how else to insert it into the map
auto signal = std::make_shared<Signal<E, Args...>>();
fSignals.insert(std::make_pair(key, signal));
} }
fConnections.insert({connectionsKey, connection});
return boost::any_cast<std::shared_ptr<Signal<E, Args...>>>(fSignals.at(key));
} }
} }; /* class EventManager */
extern template std::shared_ptr<
fair::mq::EventManager::Signal<fair::mq::PropertyChangeAsString, std::string>>
fair::mq::EventManager::GetSignal<fair::mq::PropertyChangeAsString, std::string>(
const std::pair<std::type_index, std::type_index>& key) const;
extern template void
fair::mq::EventManager::Subscribe<fair::mq::PropertyChangeAsString, std::string>(
const std::string& subscriber,
std::function<void(typename fair::mq::PropertyChangeAsString::KeyType, std::string)>);
} // namespace fair::mq } // namespace fair::mq

View File

@@ -10,7 +10,7 @@
#define FAIR_MQ_MESSAGE_H #define FAIR_MQ_MESSAGE_H
#include <cstddef> // for size_t #include <cstddef> // for size_t
#include <fairmq/TransportEnum.h> #include <fairmq/Transports.h>
#include <memory> // unique_ptr #include <memory> // unique_ptr
#include <stdexcept> #include <stdexcept>
@@ -46,7 +46,7 @@ struct Message
virtual void* GetData() const = 0; virtual void* GetData() const = 0;
virtual size_t GetSize() const = 0; virtual size_t GetSize() const = 0;
virtual bool SetUsedSize(size_t size, Alignment alignment = Alignment{0}) = 0; virtual bool SetUsedSize(size_t size) = 0;
virtual Transport GetType() const = 0; virtual Transport GetType() const = 0;
TransportFactory* GetTransport() { return fTransport; } TransportFactory* GetTransport() { return fTransport; }
@@ -76,11 +76,6 @@ struct MessageBadAlloc : std::runtime_error
using std::runtime_error::runtime_error; using std::runtime_error::runtime_error;
}; };
struct RefCountBadAlloc : std::runtime_error
{
using std::runtime_error::runtime_error;
};
} // namespace fair::mq } // namespace fair::mq
using fairmq_free_fn [[deprecated("Use fair::mq::FreeFn")]] = fair::mq::FreeFn; using fairmq_free_fn [[deprecated("Use fair::mq::FreeFn")]] = fair::mq::FreeFn;

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -38,12 +38,9 @@ struct Parts
template<typename... Ps> template<typename... Ps>
Parts(Ps&&... parts) Parts(Ps&&... parts)
{ {
fParts.reserve(sizeof...(Ps));
AddPart(std::forward<Ps>(parts)...); AddPart(std::forward<Ps>(parts)...);
} }
[[deprecated("Avoid owning raw pointer args, use AddPart(MessagePtr) instead.")]]
void AddPart(Message* msg) { fParts.push_back(MessagePtr(msg)); }
void AddPart(MessagePtr msg) { fParts.push_back(std::move(msg)); } void AddPart(MessagePtr msg) { fParts.push_back(std::move(msg)); }
template<typename... Ts> template<typename... Ts>
@@ -63,15 +60,9 @@ struct Parts
} }
} }
Message& operator[](size_type index) { return *(fParts[index]); } reference operator[](size_type index) { return fParts[index]; }
Message const& operator[](size_type index) const { return *(fParts[index]); } const_reference operator[](size_type index) const { return fParts[index]; }
// TODO: For consistency with the STL interfaces, operator[] should not dereference,
// but I have no good idea how to fix this.
// reference operator[](size_type index) { return fParts[index]; }
// const_reference operator[](size_type index) const { return fParts[index]; }
[[deprecated("Redundant, dereference at call site e.g. '*(parts.At(index))' instead.")]]
Message& AtRef(size_type index) { return *(fParts.at(index)); }
reference At(size_type index) { return fParts.at(index); } reference At(size_type index) { return fParts.at(index); }
const_reference At(size_type index) const { return fParts.at(index); } const_reference At(size_type index) const { return fParts.at(index); }

View File

@@ -22,6 +22,7 @@ using namespace std;
using fair::mq::Plugin; using fair::mq::Plugin;
using fair::mq::tools::ToString; using fair::mq::tools::ToString;
using fair::mq::tools::ToStrVector; using fair::mq::tools::ToStrVector;
namespace fs = boost::filesystem;
namespace po = boost::program_options; namespace po = boost::program_options;
namespace dll = boost::dll; namespace dll = boost::dll;
using boost::optional; using boost::optional;
@@ -170,7 +171,7 @@ auto fair::mq::PluginManager::LoadPluginPrelinkedDynamic(const string& pluginNam
} }
auto fair::mq::PluginManager::SearchPluginFile(const string& pluginName) const auto fair::mq::PluginManager::SearchPluginFile(const string& pluginName) const
-> fs::path -> boost::filesystem::path
{ {
for (const auto& searchPath : SearchPaths()) { for (const auto& searchPath : SearchPaths()) {
for (const auto& libPrefix : {fgkLibPrefix, fgkLibPrefixAlt}) { for (const auto& libPrefix : {fgkLibPrefix, fgkLibPrefixAlt}) {
@@ -180,7 +181,7 @@ auto fair::mq::PluginManager::SearchPluginFile(const string& pluginName) const
libPrefix, libPrefix,
pluginName, pluginName,
boost::dll::detail::shared_library_impl::suffix().native()); boost::dll::detail::shared_library_impl::suffix().native());
auto const found = fs::exists(file); auto const found = boost::filesystem::exists(file);
if (found) { if (found) {
return file; return file;
} }

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2017-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -13,16 +13,9 @@
#include <fairmq/PluginServices.h> #include <fairmq/PluginServices.h>
#include <fairmq/tools/Strings.h> #include <fairmq/tools/Strings.h>
#if FAIRMQ_HAS_STD_FILESYSTEM
#include <filesystem>
namespace fs = std::filesystem;
#else
#define BOOST_FILESYSTEM_VERSION 3 #define BOOST_FILESYSTEM_VERSION 3
#define BOOST_FILESYSTEM_NO_DEPRECATED #define BOOST_FILESYSTEM_NO_DEPRECATED
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
namespace fs = ::boost::filesystem;
#endif
#include <boost/optional.hpp> #include <boost/optional.hpp>
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
#include <boost/dll/import.hpp> #include <boost/dll/import.hpp>
@@ -67,13 +60,13 @@ class PluginManager
LOG(debug) << "Shutting down Plugin Manager"; LOG(debug) << "Shutting down Plugin Manager";
} }
auto SetSearchPaths(const std::vector<fs::path>&) -> void; auto SetSearchPaths(const std::vector<boost::filesystem::path>&) -> void;
auto AppendSearchPath(const fs::path&) -> void; auto AppendSearchPath(const boost::filesystem::path&) -> void;
auto PrependSearchPath(const fs::path&) -> void; auto PrependSearchPath(const boost::filesystem::path&) -> void;
auto SearchPaths() const -> const std::vector<fs::path>& { return fSearchPaths; } auto SearchPaths() const -> const std::vector<boost::filesystem::path>& { return fSearchPaths; }
struct BadSearchPath : std::invalid_argument { using std::invalid_argument::invalid_argument; }; struct BadSearchPath : std::invalid_argument { using std::invalid_argument::invalid_argument; };
auto SearchPluginFile(const std::string&) const -> fs::path; auto SearchPluginFile(const std::string&) const -> boost::filesystem::path;
struct PluginNotFound : std::runtime_error { using std::runtime_error::runtime_error; }; struct PluginNotFound : std::runtime_error { using std::runtime_error::runtime_error; };
auto LoadPlugin(const std::string& pluginName) -> void; auto LoadPlugin(const std::string& pluginName) -> void;
auto LoadPlugins(const std::vector<std::string>& pluginNames) -> void { for(const auto& pluginName : pluginNames) { LoadPlugin(pluginName); } } auto LoadPlugins(const std::vector<std::string>& pluginNames) -> void { for(const auto& pluginName : pluginNames) { LoadPlugin(pluginName); } }
@@ -95,33 +88,18 @@ class PluginManager
auto WaitForPluginsToReleaseDeviceControl() -> void { fPluginServices->WaitForReleaseDeviceControl(); } auto WaitForPluginsToReleaseDeviceControl() -> void { fPluginServices->WaitForReleaseDeviceControl(); }
private: private:
static auto ValidateSearchPath(const fs::path&) -> void; static auto ValidateSearchPath(const boost::filesystem::path&) -> void;
auto LoadPluginPrelinkedDynamic(const std::string& pluginName) -> void; auto LoadPluginPrelinkedDynamic(const std::string& pluginName) -> void;
auto LoadPluginDynamic(const std::string& pluginName) -> void; auto LoadPluginDynamic(const std::string& pluginName) -> void;
auto LoadPluginStatic(const std::string& pluginName) -> void; auto LoadPluginStatic(const std::string& pluginName) -> void;
#if FAIRMQ_HAS_STD_FILESYSTEM template<typename... Args>
template<typename T> auto LoadSymbols(const std::string& pluginName, Args&&... args) -> void
static auto AdaptPathType(T&& path)
{
if constexpr(std::is_same_v<T, std::filesystem::path>) {
return boost::filesystem::path(std::forward<T>(path));
} else {
return std::forward<T>(path);
}
}
#endif
template<typename FirstArg, typename... Args>
auto LoadSymbols(const std::string& pluginName, FirstArg&& farg, Args&&... args) -> void
{ {
using namespace boost::dll; using namespace boost::dll;
using fair::mq::tools::ToString; using fair::mq::tools::ToString;
#if FAIRMQ_HAS_STD_FILESYSTEM auto lib = shared_library{std::forward<Args>(args)...};
auto lib = shared_library{AdaptPathType(std::forward<FirstArg>(farg)), std::forward<Args>(args)...};
#else
auto lib = shared_library{std::forward<FirstArg>(farg), std::forward<Args>(args)...};
#endif
fgDLLKeepAlive.push_back(lib); fgDLLKeepAlive.push_back(lib);
fPluginFactories[pluginName] = import_alias<PluginFactory>( fPluginFactories[pluginName] = import_alias<PluginFactory>(
@@ -143,7 +121,7 @@ class PluginManager
static const std::string fgkLibPrefix; static const std::string fgkLibPrefix;
static const std::string fgkLibPrefixAlt; static const std::string fgkLibPrefixAlt;
std::vector<fs::path> fSearchPaths; std::vector<boost::filesystem::path> fSearchPaths;
static std::vector<boost::dll::shared_library> fgDLLKeepAlive; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) static std::vector<boost::dll::shared_library> fgDLLKeepAlive; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
std::map<std::string, std::function<PluginFactory>> fPluginFactories; std::map<std::string, std::function<PluginFactory>> fPluginFactories;
std::unique_ptr<PluginServices> fPluginServices; std::unique_ptr<PluginServices> fPluginServices;

View File

@@ -448,6 +448,3 @@ void ProgOptions::PrintOptionsRaw() const
} }
} // namespace fair::mq } // namespace fair::mq
template void fair::mq::ProgOptions::SetProperty<std::string>(const std::string& key, std::string val);
template void fair::mq::ProgOptions::SetProperty<int>(const std::string& key, int val);

View File

@@ -129,7 +129,17 @@ class ProgOptions
/// @param key /// @param key
/// @param val /// @param val
template<typename T> template<typename T>
void SetProperty(const std::string& key, T val); void SetProperty(const std::string& key, T val)
{
std::unique_lock<std::mutex> lock(fMtx);
SetVarMapValue<typename std::decay<T>::type>(key, val);
lock.unlock();
fEvents.Emit<fair::mq::PropertyChange, typename std::decay<T>::type>(key, val);
fEvents.Emit<fair::mq::PropertyChangeAsString, std::string>(key, GetPropertyAsString(key));
}
/// @brief Updates an existing config property (or fails if it doesn't exist) /// @brief Updates an existing config property (or fails if it doesn't exist)
/// @param key /// @param key
@@ -265,20 +275,5 @@ class ProgOptions
}; };
} // namespace fair::mq } // namespace fair::mq
template <typename T>
void fair::mq::ProgOptions::SetProperty(const std::string& key, T val)
{
std::unique_lock<std::mutex> lock(fMtx);
SetVarMapValue<typename std::decay<T>::type>(key, val);
lock.unlock();
fEvents.Emit<fair::mq::PropertyChange, typename std::decay<T>::type>(key, val);
fEvents.Emit<fair::mq::PropertyChangeAsString, std::string>(key, GetPropertyAsString(key));
}
extern template void fair::mq::ProgOptions::SetProperty<int>(const std::string& key, int val);
extern template void fair::mq::ProgOptions::SetProperty<std::string>(const std::string& key, std::string val);
#endif /* FAIR_MQ_PROGOPTIONS_H */ #endif /* FAIR_MQ_PROGOPTIONS_H */

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -8,15 +8,7 @@
#include <fairmq/Properties.h> #include <fairmq/Properties.h>
#if FAIRMQ_HAS_STD_FILESYSTEM
#include <filesystem>
namespace fs = std::filesystem;
#else
#define BOOST_FILESYSTEM_VERSION 3
#define BOOST_FILESYSTEM_NO_DEPRECATED
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
namespace fs = ::boost::filesystem;
#endif
using namespace std; using namespace std;
using boost::any_cast; using boost::any_cast;
@@ -92,12 +84,7 @@ unordered_map<type_index, function<pair<string, string>(const Property&)>> Prope
{ type_index(typeid(long double)), [](const Property& p) { return getStringPair<long double>(p, "long double"); } }, { type_index(typeid(long double)), [](const Property& p) { return getStringPair<long double>(p, "long double"); } },
{ type_index(typeid(bool)), [](const Property& p) { stringstream ss; ss << boolalpha << any_cast<bool>(p); return pair<string, string>{ ss.str(), "bool" }; } }, { type_index(typeid(bool)), [](const Property& p) { stringstream ss; ss << boolalpha << any_cast<bool>(p); return pair<string, string>{ ss.str(), "bool" }; } },
{ type_index(typeid(vector<bool>)), [](const Property& p) { stringstream ss; ss << boolalpha << any_cast<vector<bool>>(p); return pair<string, string>{ ss.str(), "vector<bool>>" }; } }, { type_index(typeid(vector<bool>)), [](const Property& p) { stringstream ss; ss << boolalpha << any_cast<vector<bool>>(p); return pair<string, string>{ ss.str(), "vector<bool>>" }; } },
{ type_index(typeid(fs::path)), [](const Property& p) { return getStringPair<fs::path>(p, { type_index(typeid(boost::filesystem::path)), [](const Property& p) { return getStringPair<boost::filesystem::path>(p, "boost::filesystem::path"); } },
#if FAIRMQ_HAS_STD_FILESYSTEM
"std::filesystem::path"); } },
#else
"boost::filesystem::path"); } },
#endif
{ type_index(typeid(vector<char>)), [](const Property& p) { return getStringPair<vector<char>>(p, "vector<char>"); } }, { type_index(typeid(vector<char>)), [](const Property& p) { return getStringPair<vector<char>>(p, "vector<char>"); } },
{ type_index(typeid(vector<signed char>)), [](const Property& p) { return getStringPair<vector<signed char>>(p, "vector<signed char>"); } }, { type_index(typeid(vector<signed char>)), [](const Property& p) { return getStringPair<vector<signed char>>(p, "vector<signed char>"); } },
{ type_index(typeid(vector<unsigned char>)), [](const Property& p) { return getStringPair<vector<unsigned char>>(p, "vector<unsigned char>"); } }, { type_index(typeid(vector<unsigned char>)), [](const Property& p) { return getStringPair<vector<unsigned char>>(p, "vector<unsigned char>"); } },
@@ -113,12 +100,7 @@ unordered_map<type_index, function<pair<string, string>(const Property&)>> Prope
{ type_index(typeid(vector<float>)), [](const Property& p) { return getStringPair<vector<float>>(p, "vector<float>"); } }, { type_index(typeid(vector<float>)), [](const Property& p) { return getStringPair<vector<float>>(p, "vector<float>"); } },
{ type_index(typeid(vector<double>)), [](const Property& p) { return getStringPair<vector<double>>(p, "vector<double>"); } }, { type_index(typeid(vector<double>)), [](const Property& p) { return getStringPair<vector<double>>(p, "vector<double>"); } },
{ type_index(typeid(vector<long double>)), [](const Property& p) { return getStringPair<vector<long double>>(p, "vector<long double>"); } }, { type_index(typeid(vector<long double>)), [](const Property& p) { return getStringPair<vector<long double>>(p, "vector<long double>"); } },
{ type_index(typeid(vector<fs::path>)), [](const Property& p) { return getStringPair<vector<fs::path>>(p, { type_index(typeid(vector<boost::filesystem::path>)), [](const Property& p) { return getStringPair<vector<boost::filesystem::path>>(p, "vector<boost::filesystem::path>"); } },
#if FAIRMQ_HAS_STD_FILESYSTEM
"vector<std::filesystem::path>"); } },
#else
"vector<boost::filesystem::path>"); } },
#endif
}; };
unordered_map<type_index, void(*)(const EventManager&, const string&, const Property&)> PropertyHelper::fEventEmitters = { unordered_map<type_index, void(*)(const EventManager&, const string&, const Property&)> PropertyHelper::fEventEmitters = {
@@ -140,7 +122,7 @@ unordered_map<type_index, void(*)(const EventManager&, const string&, const Prop
{ type_index(typeid(long double)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, long double>(k, any_cast<long double>(p)); } }, { type_index(typeid(long double)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, long double>(k, any_cast<long double>(p)); } },
{ type_index(typeid(bool)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, bool>(k, any_cast<bool>(p)); } }, { type_index(typeid(bool)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, bool>(k, any_cast<bool>(p)); } },
{ type_index(typeid(vector<bool>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<bool>>(k, any_cast<vector<bool>>(p)); } }, { type_index(typeid(vector<bool>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<bool>>(k, any_cast<vector<bool>>(p)); } },
{ type_index(typeid(fs::path)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, fs::path>(k, any_cast<fs::path>(p)); } }, { type_index(typeid(boost::filesystem::path)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, boost::filesystem::path>(k, any_cast<boost::filesystem::path>(p)); } },
{ type_index(typeid(vector<char>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<char>>(k, any_cast<vector<char>>(p)); } }, { type_index(typeid(vector<char>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<char>>(k, any_cast<vector<char>>(p)); } },
{ type_index(typeid(vector<signed char>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<signed char>>(k, any_cast<vector<signed char>>(p)); } }, { type_index(typeid(vector<signed char>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<signed char>>(k, any_cast<vector<signed char>>(p)); } },
{ type_index(typeid(vector<unsigned char>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<unsigned char>>(k, any_cast<vector<unsigned char>>(p)); } }, { type_index(typeid(vector<unsigned char>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<unsigned char>>(k, any_cast<vector<unsigned char>>(p)); } },
@@ -156,7 +138,7 @@ unordered_map<type_index, void(*)(const EventManager&, const string&, const Prop
{ type_index(typeid(vector<float>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<float>>(k, any_cast<vector<float>>(p)); } }, { type_index(typeid(vector<float>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<float>>(k, any_cast<vector<float>>(p)); } },
{ type_index(typeid(vector<double>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<double>>(k, any_cast<vector<double>>(p)); } }, { type_index(typeid(vector<double>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<double>>(k, any_cast<vector<double>>(p)); } },
{ type_index(typeid(vector<long double>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<long double>>(k, any_cast<vector<long double>>(p)); } }, { type_index(typeid(vector<long double>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<long double>>(k, any_cast<vector<long double>>(p)); } },
{ type_index(typeid(vector<fs::path>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<fs::path>>(k, any_cast<vector<fs::path>>(p)); } }, { type_index(typeid(vector<boost::filesystem::path>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<boost::filesystem::path>>(k, any_cast<vector<boost::filesystem::path>>(p)); } },
}; };
} // namespace fair::mq } // namespace fair::mq

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -29,6 +29,7 @@ using Property = boost::any;
using Properties = std::map<std::string, Property>; using Properties = std::map<std::string, Property>;
struct PropertyChange : Event<std::string> {}; struct PropertyChange : Event<std::string> {};
struct PropertyChangeAsString : Event<std::string> {};
class PropertyHelper class PropertyHelper
{ {

View File

@@ -52,8 +52,8 @@ struct Socket
virtual int64_t Send(MessagePtr& msg, int timeout = -1) = 0; virtual int64_t Send(MessagePtr& msg, int timeout = -1) = 0;
virtual int64_t Receive(MessagePtr& msg, int timeout = -1) = 0; virtual int64_t Receive(MessagePtr& msg, int timeout = -1) = 0;
virtual int64_t Send(Parts::container& msgVec, int timeout = -1) = 0; virtual int64_t Send(std::vector<std::unique_ptr<Message>>& msgVec, int timeout = -1) = 0;
virtual int64_t Receive(Parts::container & msgVec, int timeout = -1) = 0; virtual int64_t Receive(std::vector<std::unique_ptr<Message>>& msgVec, int timeout = -1) = 0;
virtual int64_t Send(Parts& parts, int timeout = -1) { return Send(parts.fParts, timeout); } virtual int64_t Send(Parts& parts, int timeout = -1) { return Send(parts.fParts, timeout); }
virtual int64_t Receive(Parts& parts, int timeout = -1) { return Receive(parts.fParts, timeout); } virtual int64_t Receive(Parts& parts, int timeout = -1) { return Receive(parts.fParts, timeout); }

View File

@@ -157,13 +157,6 @@ struct Machine_ : public state_machine_def<Machine_>
} }
} }
void CallStatePrep(const State state) const
{
if (!fStatePrepSignal.empty()) {
fStatePrepSignal(state);
}
}
void CallNewTransitionCallbacks(const Transition transition) const void CallNewTransitionCallbacks(const Transition transition) const
{ {
if (!fNewTransitionSignal.empty()) { if (!fNewTransitionSignal.empty()) {
@@ -177,13 +170,11 @@ struct Machine_ : public state_machine_def<Machine_>
atomic<bool> fLastTransitionResult; atomic<bool> fLastTransitionResult;
mutex fStateMtx; mutex fStateMtx;
mutex fSubscriptionsMtx;
atomic<bool> fNewStatePending; atomic<bool> fNewStatePending;
condition_variable fNewStatePendingCV; condition_variable fNewStatePendingCV;
boost::signals2::signal<void(const State)> fStateChangeSignal; boost::signals2::signal<void(const State)> fStateChangeSignal;
boost::signals2::signal<void(const State)> fStateHandleSignal; boost::signals2::signal<void(const State)> fStateHandleSignal;
boost::signals2::signal<void(const State)> fStatePrepSignal;
boost::signals2::signal<void(const Transition)> fNewTransitionSignal; boost::signals2::signal<void(const Transition)> fNewTransitionSignal;
unordered_map<string, boost::signals2::connection> fStateChangeSignalsMap; unordered_map<string, boost::signals2::connection> fStateChangeSignalsMap;
unordered_map<string, boost::signals2::connection> fNewTransitionSignalsMap; unordered_map<string, boost::signals2::connection> fNewTransitionSignalsMap;
@@ -207,7 +198,6 @@ struct Machine_ : public state_machine_def<Machine_>
} }
} }
CallStatePrep(fState);
CallStateChangeCallbacks(fState); CallStateChangeCallbacks(fState);
CallStateHandler(fState); CallStateHandler(fState);
} }
@@ -311,33 +301,18 @@ try {
void StateMachine::SubscribeToStateChange(const string& key, function<void(const State)> callback) void StateMachine::SubscribeToStateChange(const string& key, function<void(const State)> callback)
{ {
// Check if the key has a integer value as prefix, if yes, decode it. static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignalsMap.insert({key, static_pointer_cast<FairMQFSM>(fFsm)->fStateChangeSignal.connect(callback)});
int i = strtol(key.c_str(), nullptr, 10);
auto fsm = static_pointer_cast<FairMQFSM>(fFsm);
lock_guard<mutex> lock(fsm->fSubscriptionsMtx);
fsm->fStateChangeSignalsMap.insert({key, fsm->fStateChangeSignal.connect(i, callback)});
} }
void StateMachine::UnsubscribeFromStateChange(const string& key) void StateMachine::UnsubscribeFromStateChange(const string& key)
{ {
auto fsm = static_pointer_cast<FairMQFSM>(fFsm); auto fsm = static_pointer_cast<FairMQFSM>(fFsm);
lock_guard<mutex> lock(fsm->fSubscriptionsMtx);
if (fsm->fStateChangeSignalsMap.count(key)) { if (fsm->fStateChangeSignalsMap.count(key)) {
fsm->fStateChangeSignalsMap.at(key).disconnect(); fsm->fStateChangeSignalsMap.at(key).disconnect();
fsm->fStateChangeSignalsMap.erase(key); fsm->fStateChangeSignalsMap.erase(key);
} }
} }
void StateMachine::PrepareState(std::function<void(const State)> callback)
{
auto fsm = static_pointer_cast<FairMQFSM>(fFsm);
if (fsm->fStatePrepSignal.empty()) {
fsm->fStatePrepSignal.connect(callback);
} else {
LOG(error) << "state preparation handler is already set";
}
}
void StateMachine::HandleStates(function<void(const State)> callback) void StateMachine::HandleStates(function<void(const State)> callback)
{ {
auto fsm = static_pointer_cast<FairMQFSM>(fFsm); auto fsm = static_pointer_cast<FairMQFSM>(fFsm);
@@ -351,9 +326,6 @@ void StateMachine::HandleStates(function<void(const State)> callback)
void StateMachine::StopHandlingStates() void StateMachine::StopHandlingStates()
{ {
auto fsm = static_pointer_cast<FairMQFSM>(fFsm); auto fsm = static_pointer_cast<FairMQFSM>(fFsm);
if (!fsm->fStatePrepSignal.empty()) {
fsm->fStatePrepSignal.disconnect_all_slots();
}
if (!fsm->fStateHandleSignal.empty()) { if (!fsm->fStateHandleSignal.empty()) {
fsm->fStateHandleSignal.disconnect_all_slots(); fsm->fStateHandleSignal.disconnect_all_slots();
} }
@@ -361,15 +333,12 @@ void StateMachine::StopHandlingStates()
void StateMachine::SubscribeToNewTransition(const string& key, function<void(const Transition)> callback) void StateMachine::SubscribeToNewTransition(const string& key, function<void(const Transition)> callback)
{ {
auto fsm = static_pointer_cast<FairMQFSM>(fFsm); static_pointer_cast<FairMQFSM>(fFsm)->fNewTransitionSignalsMap.insert({key, static_pointer_cast<FairMQFSM>(fFsm)->fNewTransitionSignal.connect(callback)});
lock_guard<mutex> lock(fsm->fSubscriptionsMtx);
fsm->fNewTransitionSignalsMap.insert({key, fsm->fNewTransitionSignal.connect(callback)});
} }
void StateMachine::UnsubscribeFromNewTransition(const string& key) void StateMachine::UnsubscribeFromNewTransition(const string& key)
{ {
auto fsm = static_pointer_cast<FairMQFSM>(fFsm); auto fsm = static_pointer_cast<FairMQFSM>(fFsm);
lock_guard<mutex> lock(fsm->fSubscriptionsMtx);
if (fsm->fNewTransitionSignalsMap.count(key)) { if (fsm->fNewTransitionSignalsMap.count(key)) {
fsm->fNewTransitionSignalsMap.at(key).disconnect(); fsm->fNewTransitionSignalsMap.at(key).disconnect();
fsm->fNewTransitionSignalsMap.erase(key); fsm->fNewTransitionSignalsMap.erase(key);

View File

@@ -35,7 +35,6 @@ class StateMachine
void SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback); void SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback);
void UnsubscribeFromStateChange(const std::string& key); void UnsubscribeFromStateChange(const std::string& key);
void PrepareState(std::function<void(const State)> callback);
void HandleStates(std::function<void(const State)> callback); void HandleStates(std::function<void(const State)> callback);
void StopHandlingStates(); void StopHandlingStates();

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -10,7 +10,6 @@
#define FAIR_MQ_TOOLS_H #define FAIR_MQ_TOOLS_H
// IWYU pragma: begin_exports // IWYU pragma: begin_exports
#include <fairmq/tools/Compiler.h>
#include <fairmq/tools/CppSTL.h> #include <fairmq/tools/CppSTL.h>
#include <fairmq/tools/Exceptions.h> #include <fairmq/tools/Exceptions.h>
#include <fairmq/tools/InstanceLimit.h> #include <fairmq/tools/InstanceLimit.h>

View File

@@ -1,22 +0,0 @@
/********************************************************************************
* Copyright (C) 2014-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_TRANSPORTENUMS_H
#define FAIR_MQ_TRANSPORTENUMS_H
namespace fair::mq {
enum class Transport {
DEFAULT,
ZMQ,
SHM
};
}
#endif // FAIR_MQ_TRANSPORTENUMS_H

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2017-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -9,6 +9,9 @@
#include <fairmq/TransportFactory.h> #include <fairmq/TransportFactory.h>
#include <fairmq/shmem/TransportFactory.h> #include <fairmq/shmem/TransportFactory.h>
#include <fairmq/zeromq/TransportFactory.h> #include <fairmq/zeromq/TransportFactory.h>
#ifdef BUILD_OFI_TRANSPORT
#include <fairmq/ofi/TransportFactory.h>
#endif
#include <fairlogger/Logger.h> #include <fairlogger/Logger.h>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <memory> #include <memory>
@@ -36,12 +39,20 @@ auto TransportFactory::CreateTransportFactory(const string& type,
} else if (type == "shmem") { } else if (type == "shmem") {
return make_shared<shmem::TransportFactory>(finalId, config); return make_shared<shmem::TransportFactory>(finalId, config);
} }
#ifdef BUILD_OFI_TRANSPORT
else if (type == "ofi") {
return make_shared<ofi::TransportFactory>(finalId, config);
}
#endif /* BUILD_OFI_TRANSPORT */
else { else {
LOG(error) << "Unavailable transport requested: " LOG(error) << "Unavailable transport requested: "
<< "\"" << type << "\"" << "\"" << type << "\""
<< ". Available are: " << ". Available are: "
<< "\"zeromq\"," << "\"zeromq\","
<< "\"shmem\"" << "\"shmem\""
#ifdef BUILD_OFI_TRANSPORT
<< ", and \"ofi\""
#endif /* BUILD_OFI_TRANSPORT */
<< ". Exiting."; << ". Exiting.";
throw TransportFactoryError(tools::ToString("Unavailable transport requested: ", type)); throw TransportFactoryError(tools::ToString("Unavailable transport requested: ", type));
} }

View File

@@ -14,7 +14,7 @@
#include <fairmq/Message.h> #include <fairmq/Message.h>
#include <fairmq/Poller.h> #include <fairmq/Poller.h>
#include <fairmq/Socket.h> #include <fairmq/Socket.h>
#include <fairmq/TransportEnum.h> #include <fairmq/Transports.h>
#include <fairmq/UnmanagedRegion.h> #include <fairmq/UnmanagedRegion.h>
#include <memory> // shared_ptr #include <memory> // shared_ptr
#include <stdexcept> #include <stdexcept>

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -10,7 +10,6 @@
#define FAIR_MQ_TRANSPORTS_H #define FAIR_MQ_TRANSPORTS_H
#include <fairmq/tools/Strings.h> #include <fairmq/tools/Strings.h>
#include <fairmq/TransportEnum.h>
#include <memory> #include <memory>
#include <ostream> #include <ostream>
#include <stdexcept> #include <stdexcept>
@@ -19,6 +18,14 @@
namespace fair::mq { namespace fair::mq {
enum class Transport
{
DEFAULT,
ZMQ,
SHM,
OFI
};
struct TransportError : std::runtime_error struct TransportError : std::runtime_error
{ {
using std::runtime_error::runtime_error; using std::runtime_error::runtime_error;
@@ -27,13 +34,15 @@ struct TransportError : std::runtime_error
static const std::unordered_map<std::string, Transport> TransportTypes{ static const std::unordered_map<std::string, Transport> TransportTypes{
{"default", Transport::DEFAULT}, {"default", Transport::DEFAULT},
{"zeromq", Transport::ZMQ}, {"zeromq", Transport::ZMQ},
{"shmem", Transport::SHM} {"shmem", Transport::SHM},
{"ofi", Transport::OFI}
}; };
static const std::unordered_map<Transport, std::string> TransportNames{ static const std::unordered_map<Transport, std::string> TransportNames{
{Transport::DEFAULT, "default"}, {Transport::DEFAULT, "default"},
{Transport::ZMQ, "zeromq"}, {Transport::ZMQ, "zeromq"},
{Transport::SHM, "shmem"} {Transport::SHM, "shmem"},
{Transport::OFI, "ofi"}
}; };
inline std::string TransportName(Transport transport) { return TransportNames.at(transport); } inline std::string TransportName(Transport transport) { return TransportNames.at(transport); }
@@ -52,7 +61,11 @@ inline std::ostream& operator<<(std::ostream& os, const Transport& transport)
inline auto GetEnabledTransports() -> std::vector<Transport> inline auto GetEnabledTransports() -> std::vector<Transport>
{ {
#ifdef BUILD_OFI_TRANSPORT
return {Transport::ZMQ, Transport::SHM, Transport::OFI};
#else
return {Transport::ZMQ, Transport::SHM}; return {Transport::ZMQ, Transport::SHM};
#endif
} }
} // namespace fair::mq } // namespace fair::mq

View File

@@ -9,7 +9,7 @@
#ifndef FAIR_MQ_UNMANAGEDREGION_H #ifndef FAIR_MQ_UNMANAGEDREGION_H
#define FAIR_MQ_UNMANAGEDREGION_H #define FAIR_MQ_UNMANAGEDREGION_H
#include <fairmq/TransportEnum.h> #include <fairmq/Transports.h>
#include <cstddef> // size_t #include <cstddef> // size_t
#include <cstdint> // uint32_t #include <cstdint> // uint32_t
@@ -134,7 +134,6 @@ struct RegionConfig
int creationFlags = 0; /// flags passed to the underlying transport on region creation int creationFlags = 0; /// flags passed to the underlying transport on region creation
int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user
uint64_t size = 0; /// region size uint64_t size = 0; /// region size
uint64_t rcSegmentSize = 100000000; /// size of the segment that stores reference counts when "soft"-copying the messages
std::string path = ""; /// file path, if the region is backed by a file std::string path = ""; /// file path, if the region is backed by a file
std::optional<uint16_t> id = std::nullopt; /// region id std::optional<uint16_t> id = std::nullopt; /// region id
uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2018-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -19,7 +19,7 @@
#define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@" #define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@"
#define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ" #define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ"
#define FAIRMQ_LICENSE "LGPL-3.0" #define FAIRMQ_LICENSE "LGPL-3.0"
#define FAIRMQ_COPYRIGHT "2012-2025 GSI" #define FAIRMQ_COPYRIGHT "2012-2022 GSI"
#define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@" #define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@"
#endif // FAIR_MQ_VERSION_H #endif // FAIR_MQ_VERSION_H

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@@ -47,7 +47,7 @@ class Merger : public Device
std::vector<Channel*> chans; std::vector<Channel*> chans;
for (auto& chan : GetChannels().at(fInChannelName)) { for (auto& chan : fChannels.at(fInChannelName)) {
chans.push_back(&chan); chans.push_back(&chan);
} }

View File

@@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *

View File

@@ -63,7 +63,6 @@ echo "Usage: startBenchmark [message size=1000000] [number of iterations=0] [tra
SAMPLER="fairmq-bsampler" SAMPLER="fairmq-bsampler"
SAMPLER+=" --id bsampler1" SAMPLER+=" --id bsampler1"
SAMPLER+=" --shm-monitor true"
#SAMPLER+=" --io-threads 2" #SAMPLER+=" --io-threads 2"
#SAMPLER+=" --control static" #SAMPLER+=" --control static"
SAMPLER+=" --transport $transport" SAMPLER+=" --transport $transport"
@@ -72,7 +71,6 @@ SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --multipart $multipart" SAMPLER+=" --multipart $multipart"
SAMPLER+=" --num-parts $numParts" SAMPLER+=" --num-parts $numParts"
SAMPLER+=" --shm-throw-bad-alloc false" SAMPLER+=" --shm-throw-bad-alloc false"
# SAMPLER+=" --shm-metadata-msg-size 1024"
# SAMPLER+=" --msg-rate 1000" # SAMPLER+=" --msg-rate 1000"
SAMPLER+=" --max-iterations $maxIterations" SAMPLER+=" --max-iterations $maxIterations"
SAMPLER+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:5555" SAMPLER+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:5555"
@@ -83,7 +81,6 @@ echo "pid: $!"
SINK="fairmq-sink" SINK="fairmq-sink"
SINK+=" --id sink1" SINK+=" --id sink1"
SINK+=" --shm-monitor true"
#SINK+=" --io-threads 2" #SINK+=" --io-threads 2"
#SINK+=" --control static" #SINK+=" --control static"
SINK+=" --transport $transport" SINK+=" --transport $transport"

130
fairmq/ofi/Context.cxx Normal file
View File

@@ -0,0 +1,130 @@
/********************************************************************************
* Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <arpa/inet.h>
#include <asiofi/version.hpp>
#include <cassert>
#include <cstring>
#include <fairlogger/Logger.h>
#include <fairmq/ofi/Context.h>
#include <fairmq/tools/Strings.h>
#include <memory>
#include <netinet/in.h>
#include <regex>
#include <string>
#include <sys/socket.h>
namespace fair::mq::ofi
{
using namespace std;
Context::Context(mq::TransportFactory& sendFactory,
mq::TransportFactory& receiveFactory,
int numberIoThreads)
: fIoWork(fIoContext)
, fReceiveFactory(receiveFactory)
, fSendFactory(sendFactory)
, fSizeHint(0)
{
InitThreadPool(numberIoThreads);
}
auto Context::InitThreadPool(int numberIoThreads) -> void
{
assert(numberIoThreads > 0);
for (int i = 1; i <= numberIoThreads; ++i) {
fThreadPool.emplace_back([&, i, numberIoThreads]{
try {
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " started";
fIoContext.run();
LOG(debug) << "OFI transport: I/O thread #" << i << " of " << numberIoThreads << " stopped";
} catch (const std::exception& e) {
LOG(error) << "OFI transport: Uncaught exception in I/O thread #" << i << ": " << e.what();
} catch (...) {
LOG(error) << "OFI transport: Uncaught exception in I/O thread #" << i;
}
});
}
}
auto Context::Reset() -> void
{
// TODO "Linger", rethink this
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
fIoContext.stop();
}
Context::~Context()
{
for (auto& thread : fThreadPool)
thread.join();
}
auto Context::GetAsiofiVersion() const -> string
{
return ASIOFI_VERSION;
}
auto Context::ConvertAddress(std::string address) -> Address
{
string protocol, ip;
unsigned int port = 0;
regex address_regex("^([a-z]+)://([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+):([0-9]+).*");
smatch address_result;
if (regex_match(address, address_result, address_regex)) {
protocol = address_result[1];
ip = address_result[2];
port = stoul(address_result[3]);
// LOG(debug) << "Parsed '" << protocol << "', '" << ip << "', '" << port << "' fields from '" << address << "'";
} else {
throw ContextError(tools::ToString("Wrong format: Address must be in format prot://ip:port"));
}
return { protocol, ip, port };
}
auto Context::ConvertAddress(Address address) -> sockaddr_in
{
sockaddr_in sa;
if (inet_pton(AF_INET, address.Ip.c_str(), &(sa.sin_addr)) != 1)
throw ContextError(tools::ToString("Failed to convert given IP '", address.Ip, "' to struct in_addr, reason: ", strerror(errno)));
sa.sin_port = htons(address.Port);
sa.sin_family = AF_INET;
return sa;
}
auto Context::ConvertAddress(sockaddr_in address) -> Address
{
return {"tcp", inet_ntoa(address.sin_addr), ntohs(address.sin_port)};
}
auto Context::VerifyAddress(const std::string& address) -> Address
{
auto addr = ConvertAddress(address);
if (!(addr.Protocol == "tcp" || addr.Protocol == "verbs"))
throw ContextError("Wrong protocol: Supported protocols are: tcp:// and verbs://");
return addr;
}
auto Context::MakeReceiveMessage(size_t size) -> MessagePtr
{
return fReceiveFactory.CreateMessage(size);
}
auto Context::MakeSendMessage(size_t size) -> MessagePtr
{
return fSendFactory.CreateMessage(size);
}
} // namespace fair::mq::ofi

92
fairmq/ofi/Context.h Normal file
View File

@@ -0,0 +1,92 @@
/********************************************************************************
* Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_OFI_CONTEXT_H
#define FAIR_MQ_OFI_CONTEXT_H
#include <asio/io_context.hpp>
#include <asiofi/domain.hpp>
#include <asiofi/fabric.hpp>
#include <asiofi/info.hpp>
#include <fairlogger/Logger.h>
#include <fairmq/TransportFactory.h>
#include <memory>
#include <netinet/in.h>
#include <ostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
namespace fair::mq::ofi
{
enum class ConnectionType : bool { Bind, Connect };
struct Address {
std::string Protocol;
std::string Ip;
unsigned int Port;
friend auto operator<<(std::ostream& os, const Address& a) -> std::ostream&
{
return os << a.Protocol << "://" << a.Ip << ":" << a.Port;
}
friend auto operator==(const Address& lhs, const Address& rhs) -> bool
{
return (lhs.Protocol == rhs.Protocol) && (lhs.Ip == rhs.Ip) && (lhs.Port == rhs.Port);
}
};
/**
* @class Context Context.h <fairmq/ofi/Context.h>
* @brief Transport-wide context
*
* @todo TODO insert long description
*/
class Context
{
public:
Context(mq::TransportFactory& sendFactory,
mq::TransportFactory& receiveFactory,
int numberIoThreads = 1);
Context(const Context&) = delete;
Context(Context&&) = delete;
Context& operator=(const Context&) = delete;
Context& operator=(Context&&) = delete;
~Context();
auto GetAsiofiVersion() const -> std::string;
auto GetIoContext() -> asio::io_context& { return fIoContext; }
static auto ConvertAddress(std::string address) -> Address;
static auto ConvertAddress(Address address) -> sockaddr_in;
static auto ConvertAddress(sockaddr_in address) -> Address;
static auto VerifyAddress(const std::string& address) -> Address;
auto Interrupt() -> void { LOG(debug) << "OFI transport: Interrupted (NOOP - not implemented)."; }
auto Resume() -> void { LOG(debug) << "OFI transport: Resumed (NOOP - not implemented)."; }
auto Reset() -> void;
auto MakeReceiveMessage(size_t size) -> MessagePtr;
auto MakeSendMessage(size_t size) -> MessagePtr;
auto GetSizeHint() -> size_t { return fSizeHint; }
auto SetSizeHint(size_t size) -> void { fSizeHint = size; }
private:
asio::io_context fIoContext;
asio::io_context::work fIoWork;
std::vector<std::thread> fThreadPool;
mq::TransportFactory& fReceiveFactory;
mq::TransportFactory& fSendFactory;
size_t fSizeHint;
auto InitThreadPool(int numberIoThreads) -> void;
}; /* class Context */
struct ContextError : std::runtime_error { using std::runtime_error::runtime_error; };
} // namespace fair::mq::ofi
#endif /* FAIR_MQ_OFI_CONTEXT_H */

View File

@@ -0,0 +1,112 @@
/********************************************************************************
* Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_OFI_CONTROLMESSAGES_H
#define FAIR_MQ_OFI_CONTROLMESSAGES_H
#include <asio/buffer.hpp>
#include <cstdint>
#include <fairlogger/Logger.h>
#include <functional>
#include <memory>
#include <memory_resource>
#include <type_traits>
namespace asio
{
template<typename PodType>
auto buffer(const PodType& obj) -> asio::const_buffer
{
return asio::const_buffer(static_cast<const void*>(&obj), sizeof(PodType));
}
} // namespace asio
namespace fair::mq::ofi
{
enum class ControlMessageType
{
Empty = 1,
PostBuffer,
PostMultiPartStartBuffer
};
struct Empty
{};
struct PostBuffer
{
uint64_t size; // buffer size (size_t)
};
struct PostMultiPartStartBuffer
{
uint32_t numParts; // buffer size (size_t)
uint64_t size; // buffer size (size_t)
};
union ControlMessageContent
{
PostBuffer postBuffer;
PostMultiPartStartBuffer postMultiPartStartBuffer;
};
struct ControlMessage
{
ControlMessageType type;
ControlMessageContent msg;
};
template<typename T>
using unique_ptr = std::unique_ptr<T, std::function<void(T*)>>;
template<typename T, typename... Args>
auto MakeControlMessageWithPmr(std::pmr::memory_resource& pmr, Args&&... args)
-> ofi::unique_ptr<ControlMessage>
{
void* mem = pmr.allocate(sizeof(ControlMessage));
ControlMessage* ctrl = new (mem) ControlMessage();
if (std::is_same<T, PostBuffer>::value) {
ctrl->type = ControlMessageType::PostBuffer;
ctrl->msg.postBuffer = PostBuffer(std::forward<Args>(args)...);
} else if (std::is_same<T, PostMultiPartStartBuffer>::value) {
ctrl->type = ControlMessageType::PostMultiPartStartBuffer;
ctrl->msg.postMultiPartStartBuffer = PostMultiPartStartBuffer(std::forward<Args>(args)...);
} else if (std::is_same<T, Empty>::value) {
ctrl->type = ControlMessageType::Empty;
}
return ofi::unique_ptr<ControlMessage>(ctrl, [&pmr](ControlMessage* p) {
p->~ControlMessage();
pmr.deallocate(p, sizeof(T));
});
}
template<typename T, typename... Args>
auto MakeControlMessage(Args&&... args) -> ControlMessage
{
ControlMessage ctrl;
if (std::is_same<T, PostBuffer>::value) {
ctrl.type = ControlMessageType::PostBuffer;
} else if (std::is_same<T, PostMultiPartStartBuffer>::value) {
ctrl.type = ControlMessageType::PostMultiPartStartBuffer;
} else if (std::is_same<T, Empty>::value) {
ctrl.type = ControlMessageType::Empty;
}
ctrl.msg = T(std::forward<Args>(args)...);
return ctrl;
}
} // namespace fair::mq::ofi
#endif /* FAIR_MQ_OFI_CONTROLMESSAGES_H */

199
fairmq/ofi/Message.cxx Normal file
View File

@@ -0,0 +1,199 @@
/********************************************************************************
* Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <asiofi.hpp>
#include <cassert>
#include <cstdlib>
#include <fairlogger/Logger.h>
#include <fairmq/ofi/Message.h>
#include <zmq.h>
namespace fair::mq::ofi
{
using namespace std;
Message::Message(pmr::memory_resource* pmr)
: fInitialSize(0)
, fSize(0)
, fData(nullptr)
, fFreeFunction(nullptr)
, fHint(nullptr)
, fPmr(pmr)
{
}
Message::Message(pmr::memory_resource* pmr, Alignment /* alignment */)
: fInitialSize(0)
, fSize(0)
, fData(nullptr)
, fFreeFunction(nullptr)
, fHint(nullptr)
, fPmr(pmr)
{
}
Message::Message(pmr::memory_resource* pmr, const size_t size)
: fInitialSize(size)
, fSize(size)
, fData(nullptr)
, fFreeFunction(nullptr)
, fHint(nullptr)
, fPmr(pmr)
{
if (size) {
fData = fPmr->allocate(size);
assert(fData);
}
}
Message::Message(pmr::memory_resource* pmr, const size_t size, Alignment /* alignment */)
: fInitialSize(size)
, fSize(size)
, fData(nullptr)
, fFreeFunction(nullptr)
, fHint(nullptr)
, fPmr(pmr)
{
if (size) {
fData = fPmr->allocate(size);
assert(fData);
}
}
Message::Message(pmr::memory_resource* pmr,
void* data,
const size_t size,
FreeFn* ffn,
void* hint)
: fInitialSize(size)
, fSize(size)
, fData(data)
, fFreeFunction(ffn)
, fHint(hint)
, fPmr(pmr)
{}
Message::Message(pmr::memory_resource* /*pmr*/,
fair::mq::UnmanagedRegionPtr& /*region*/,
void* /*data*/,
const size_t /*size*/,
void* /*hint*/)
{
throw MessageError{"Not yet implemented."};
}
auto Message::Rebuild() -> void
{
if (fFreeFunction) {
fFreeFunction(fData, fHint);
} else {
if (fData) {
fPmr->deallocate(fData, fSize);
}
}
fData = nullptr;
fInitialSize = 0;
fSize = 0;
fFreeFunction = nullptr;
fHint = nullptr;
}
auto Message::Rebuild(Alignment /* alignment */) -> void
{
// TODO: implement alignment
Rebuild();
}
auto Message::Rebuild(size_t size) -> void
{
if (fFreeFunction) {
fFreeFunction(fData, fHint);
} else {
if (fData) {
fPmr->deallocate(fData, fSize);
}
}
if (size) {
fData = fPmr->allocate(size);
assert(fData);
} else {
fData = nullptr;
}
fInitialSize = size;
fSize = size;
fFreeFunction = nullptr;
fHint = nullptr;
}
auto Message::Rebuild(size_t size, Alignment /* alignment */) -> void
{
// TODO: implement alignment
Rebuild(size);
}
auto Message::Rebuild(void* /*data*/, size_t size, FreeFn* ffn, void* hint) -> void
{
if (fFreeFunction) {
fFreeFunction(fData, fHint);
} else {
if (fData) {
fPmr->deallocate(fData, fSize);
}
}
if (size) {
fData = fPmr->allocate(size);
assert(fData);
} else {
fData = nullptr;
}
assert(fData);
fInitialSize = size;
fSize = size;
fFreeFunction = ffn;
fHint = hint;
}
auto Message::GetData() const -> void*
{
return fData;
}
auto Message::GetSize() const -> size_t
{
return fSize;
}
auto Message::SetUsedSize(size_t size) -> bool
{
if (size == fSize) {
return true;
} else if (size <= fSize) {
throw MessageError{"Message size shrinking not yet implemented."};
} else {
throw MessageError{"Cannot grow message size."};
}
}
auto Message::Copy(const fair::mq::Message& /*msg*/) -> void
{
throw MessageError{"Not yet implemented."};
}
Message::~Message()
{
if (fFreeFunction) {
fFreeFunction(fData, fHint);
} else {
if (fData) {
fPmr->deallocate(fData, fSize);
}
}
}
} // namespace fair::mq::ofi

81
fairmq/ofi/Message.h Normal file
View File

@@ -0,0 +1,81 @@
/********************************************************************************
* Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_OFI_MESSAGE_H
#define FAIR_MQ_OFI_MESSAGE_H
#include <asiofi.hpp>
#include <atomic>
#include <cstddef> // size_t
#include <fairmq/Message.h>
#include <fairmq/Transports.h>
#include <fairmq/UnmanagedRegion.h>
#include <memory_resource>
#include <zmq.h>
namespace fair::mq::ofi
{
/**
* @class Message Message.h <fairmq/ofi/Message.h>
* @brief
*
* @todo TODO insert long description
*/
class Message final : public fair::mq::Message
{
public:
Message(std::pmr::memory_resource* pmr);
Message(std::pmr::memory_resource* pmr, Alignment alignment);
Message(std::pmr::memory_resource* pmr, size_t size);
Message(std::pmr::memory_resource* pmr, size_t size, Alignment alignment);
Message(std::pmr::memory_resource* pmr,
void* data,
size_t size,
FreeFn* ffn,
void* hint = nullptr);
Message(std::pmr::memory_resource* pmr,
fair::mq::UnmanagedRegionPtr& region,
void* data,
size_t size,
void* hint = 0);
Message(const Message&) = delete;
Message(Message&&) = delete;
Message& operator=(const Message&) = delete;
Message& operator=(Message&&) = delete;
auto Rebuild() -> void override;
auto Rebuild(Alignment alignment) -> void override;
auto Rebuild(size_t size) -> void override;
auto Rebuild(size_t size, Alignment alignment) -> void override;
auto Rebuild(void* data, size_t size, FreeFn* ffn, void* hint = nullptr) -> void override;
auto GetData() const -> void* override;
auto GetSize() const -> size_t override;
auto SetUsedSize(size_t size) -> bool override;
auto GetType() const -> fair::mq::Transport override { return fair::mq::Transport::OFI; }
auto Copy(const fair::mq::Message& msg) -> void override;
~Message() override;
private:
size_t fInitialSize;
size_t fSize;
void* fData;
FreeFn* fFreeFunction;
void* fHint;
std::pmr::memory_resource* fPmr;
}; /* class Message */
} // namespace fair::mq::ofi
#endif /* FAIR_MQ_OFI_MESSAGE_H */

680
fairmq/ofi/Socket.cxx Normal file
View File

@@ -0,0 +1,680 @@
/********************************************************************************
* Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/ofi/ControlMessages.h>
#include <fairmq/ofi/Socket.h>
#include <fairmq/ofi/TransportFactory.h>
#include <fairmq/tools/Strings.h>
#include <fairlogger/Logger.h>
#include <asiofi.hpp>
#include <asio/buffer.hpp>
#include <asio/dispatch.hpp>
#include <asio/post.hpp>
#include <chrono>
#include <cstring>
#include <functional>
#include <memory>
#include <sstream>
#include <thread>
#include <mutex>
#include <queue>
namespace fair::mq::ofi
{
using namespace std;
Socket::Socket(Context& context, const string& type, const string& name, const string& id /*= ""*/)
: fContext(context)
, fOfiInfo(nullptr)
, fOfiFabric(nullptr)
, fOfiDomain(nullptr)
, fPassiveEndpoint(nullptr)
, fDataEndpoint(nullptr)
, fControlEndpoint(nullptr)
, fId(id + "." + name + "." + type)
, fBytesTx(0)
, fBytesRx(0)
, fMessagesTx(0)
, fMessagesRx(0)
, fMultiPartRecvCounter(-1)
, fSendPushSem(fContext.GetIoContext(), 384)
, fSendPopSem(fContext.GetIoContext(), 0)
, fRecvPushSem(fContext.GetIoContext(), 384)
, fRecvPopSem(fContext.GetIoContext(), 0)
, fNeedOfiMemoryRegistration(false)
{
if (type != "pair") {
throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")};
}
}
auto Socket::InitOfi(Address addr) -> void
{
if (!fOfiInfo) {
assert(!fOfiFabric);
assert(!fOfiDomain);
asiofi::hints hints;
if (addr.Protocol == "tcp") {
hints.set_provider("sockets");
} else if (addr.Protocol == "verbs") {
hints.set_provider("verbs");
}
if (fRemoteAddr == addr) {
fOfiInfo = make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), 0, hints);
} else {
fOfiInfo = make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints);
}
LOG(debug) << "OFI transport (" << fId << "): " << *fOfiInfo;
fOfiFabric = make_unique<asiofi::fabric>(*fOfiInfo);
fOfiDomain = make_unique<asiofi::domain>(*fOfiFabric);
}
}
auto Socket::Bind(const string& addr) -> bool
try {
fLocalAddr = Context::VerifyAddress(addr);
if (fLocalAddr.Protocol == "verbs") {
fNeedOfiMemoryRegistration = true;
}
InitOfi(fLocalAddr);
fPassiveEndpoint = make_unique<asiofi::passive_endpoint>(fContext.GetIoContext(), *fOfiFabric);
//fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr));
BindControlEndpoint();
return true;
}
// TODO catch the correct ofi error
catch (const SilentSocketError& e)
{
// do not print error in this case, this is handled by fair::mq::Device
// in case no connection could be established after trying a number of random ports from a range.
return false;
}
catch (const std::exception& e)
{
LOG(error) << "OFI transport: " << e.what();
return false;
}
catch (...)
{
LOG(error) << "OFI transport: Unknown exception in ofi::Socket::Bind";
return false;
}
auto Socket::BindControlEndpoint() -> void
{
assert(!fControlEndpoint);
fPassiveEndpoint->listen([&](asiofi::info&& info) {
LOG(debug) << "OFI transport (" << fId
<< "): control band connection request received. Accepting ...";
fControlEndpoint = make_unique<asiofi::connected_endpoint>(
fContext.GetIoContext(), *fOfiDomain, info);
fControlEndpoint->enable();
fControlEndpoint->accept([&]() {
LOG(debug) << "OFI transport (" << fId << "): control band connection accepted.";
BindDataEndpoint();
});
});
LOG(debug) << "OFI transport (" << fId << "): control band bound to " << fLocalAddr;
}
auto Socket::BindDataEndpoint() -> void
{
assert(!fDataEndpoint);
fPassiveEndpoint->listen([&](asiofi::info&& info) {
LOG(debug) << "OFI transport (" << fId
<< "): data band connection request received. Accepting ...";
fDataEndpoint = make_unique<asiofi::connected_endpoint>(
fContext.GetIoContext(), *fOfiDomain, info);
fDataEndpoint->enable();
fDataEndpoint->accept([&]() {
LOG(debug) << "OFI transport (" << fId << "): data band connection accepted.";
if (fContext.GetSizeHint()) {
asio::post(fContext.GetIoContext(),
std::bind(&Socket::SendQueueReaderStatic, this));
asio::post(fContext.GetIoContext(),
std::bind(&Socket::RecvQueueReaderStatic, this));
} else {
asio::post(fContext.GetIoContext(),
std::bind(&Socket::SendQueueReader, this));
asio::post(fContext.GetIoContext(),
std::bind(&Socket::RecvControlQueueReader, this));
}
});
});
LOG(debug) << "OFI transport (" << fId << "): data band bound to " << fLocalAddr;
}
auto Socket::Connect(const string& address) -> bool
try {
fRemoteAddr = Context::VerifyAddress(address);
if (fRemoteAddr.Protocol == "verbs") {
fNeedOfiMemoryRegistration = true;
}
InitOfi(fRemoteAddr);
ConnectEndpoint(fControlEndpoint, Band::Control);
ConnectEndpoint(fDataEndpoint, Band::Data);
if (fContext.GetSizeHint()) {
asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this));
asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvQueueReaderStatic, this));
} else {
asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
}
return true;
}
catch (const SilentSocketError& e)
{
// do not print error in this case, this is handled by fair::mq::Device
return false;
}
catch (const std::exception& e)
{
LOG(error) << "OFI transport: " << e.what();
return false;
}
catch (...)
{
LOG(error) << "OFI transport: Unknown exception in ofi::Socket::Connect";
return false;
}
auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void
{
assert(!endpoint);
std::string band(type == Band::Control ? "control" : "data");
endpoint = make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain);
endpoint->enable();
LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr;
std::mutex mtx;
std::condition_variable cv;
bool notified(false), connected(false);
while (true) {
endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band](asiofi::eq::event event) {
// LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened";
std::unique_lock<std::mutex> lk2(mtx);
notified = true;
if (event == asiofi::eq::event::connected) {
LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected.";
connected = true;
} else {
// LOG(debug) << "OFI transport (" << fId << "): " << band << " band connection refused. Trying again.";
}
lk2.unlock();
cv.notify_one();
});
{
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [&] { return notified; });
if (connected) {
break;
} else {
notified = false;
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}
}
auto Socket::Send(MessagePtr& msg, int /*timeout*/) -> int64_t
{
// timeout argument not yet implemented
std::vector<MessagePtr> msgVec;
msgVec.reserve(1);
msgVec.emplace_back(std::move(msg));
return Send(msgVec);
}
auto Socket::Send(std::vector<MessagePtr>& msgVec, int /*timeout*/) -> int64_t
try {
// timeout argument not yet implemented
int size(0);
for (auto& msg : msgVec) {
size += msg->GetSize();
}
fSendPushSem.wait();
{
std::lock_guard<std::mutex> lk(fSendQueueMutex);
fSendQueue.emplace(std::move(msgVec));
}
fSendPopSem.signal();
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return static_cast<int64_t>(TransferCode::error);
}
auto Socket::SendQueueReader() -> void
{
fSendPopSem.async_wait([&] {
// Read msg from send queue
std::unique_lock<std::mutex> lk(fSendQueueMutex);
std::vector<MessagePtr> msgVec(std::move(fSendQueue.front()));
fSendQueue.pop();
lk.unlock();
bool postMultiPartStartBuffer = msgVec.size() > 1;
for (auto& msg : msgVec) {
// Create control message
ofi::unique_ptr<ControlMessage> ctrl(nullptr);
if (postMultiPartStartBuffer) {
postMultiPartStartBuffer = false;
ctrl = MakeControlMessageWithPmr<PostMultiPartStartBuffer>(fControlMemPool);
ctrl->msg.postMultiPartStartBuffer.numParts = msgVec.size();
ctrl->msg.postMultiPartStartBuffer.size = msg->GetSize();
} else {
ctrl = MakeControlMessageWithPmr<PostBuffer>(fControlMemPool);
ctrl->msg.postBuffer.size = msg->GetSize();
}
// Send control message
asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage));
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::send);
auto desc = mr.desc();
fControlEndpoint->send(ctrlMsg,
desc,
[&, ctrl2 = std::move(ctrlMsg), mr2 = std::move(mr)](
asio::mutable_buffer) mutable {});
} else {
fControlEndpoint->send(
ctrlMsg, [&, ctrl2 = std::move(ctrl)](asio::mutable_buffer) mutable {});
}
// Send data message
const auto size = msg->GetSize();
if (size) {
asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
auto desc = mr.desc();
fDataEndpoint->send(buffer,
desc,
[&, size, msg2 = std::move(msg), mr2 = std::move(mr)](
asio::mutable_buffer) mutable {
fBytesTx += size;
fMessagesTx++;
fSendPushSem.signal();
});
} else {
fDataEndpoint->send(
buffer, [&, size, msg2 = std::move(msg)](asio::mutable_buffer) mutable {
fBytesTx += size;
fMessagesTx++;
fSendPushSem.signal();
});
}
} else {
++fMessagesTx;
fSendPushSem.signal();
}
}
asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
});
}
auto Socket::SendQueueReaderStatic() -> void
{
fSendPopSem.async_wait([&] {
// Read msg from send queue
std::unique_lock<std::mutex> lk(fSendQueueMutex);
std::vector<MessagePtr> msgVec(std::move(fSendQueue.front()));
fSendQueue.pop();
lk.unlock();
bool postMultiPartStartBuffer = msgVec.size() > 1;
if (postMultiPartStartBuffer) {
throw SocketError{tools::ToString("Multipart API not supported in static size mode.")};
}
MessagePtr& msg = msgVec[0];
// Send data message
const auto size = msg->GetSize();
if (size) {
asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
auto desc = mr.desc();
fDataEndpoint->send(buffer,
desc,
[&, size, msg2 = std::move(msg), mr2 = std::move(mr)](
asio::mutable_buffer) mutable {
fBytesTx += size;
fMessagesTx++;
fSendPushSem.signal();
});
} else {
fDataEndpoint->send(
buffer, [&, size, msg2 = std::move(msg)](asio::mutable_buffer) mutable {
fBytesTx += size;
fMessagesTx++;
fSendPushSem.signal();
});
}
} else {
++fMessagesTx;
fSendPushSem.signal();
}
asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this));
});
}
auto Socket::Receive(MessagePtr& msg, int /*timeout*/) -> int64_t
try {
// timeout argument not yet implemented
fRecvPopSem.wait();
{
std::lock_guard<std::mutex> lk(fRecvQueueMutex);
msg = std::move(fRecvQueue.front().front());
fRecvQueue.pop();
}
fRecvPushSem.signal();
int size(msg->GetSize());
fBytesRx += size;
++fMessagesRx;
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return static_cast<int>(TransferCode::error);
}
auto Socket::Receive(std::vector<MessagePtr>& msgVec, int /*timeout*/) -> int64_t
try {
// timeout argument not yet implemented
fRecvPopSem.wait();
{
std::lock_guard<std::mutex> lk(fRecvQueueMutex);
msgVec = std::move(fRecvQueue.front());
fRecvQueue.pop();
}
fRecvPushSem.signal();
int64_t size(0);
for (auto& msg : msgVec) {
size += msg->GetSize();
}
fBytesRx += size;
++fMessagesRx;
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return static_cast<int64_t>(TransferCode::error);
}
auto Socket::RecvControlQueueReader() -> void
{
fRecvPushSem.async_wait([&] {
// Receive control message
ofi::unique_ptr<ControlMessage> ctrl(MakeControlMessageWithPmr<Empty>(fControlMemPool));
asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage));
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::recv);
auto desc = mr.desc();
fControlEndpoint->recv(
ctrlMsg,
desc,
[&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](
asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); });
} else {
fControlEndpoint->recv(
ctrlMsg, [&, ctrl2 = std::move(ctrl)](asio::mutable_buffer) mutable {
OnRecvControl(std::move(ctrl2));
});
}
});
}
auto Socket::OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void
{
// Check control message type
auto size(0);
if (ctrl->type == ControlMessageType::PostMultiPartStartBuffer) {
size = ctrl->msg.postMultiPartStartBuffer.size;
if (fMultiPartRecvCounter == -1) {
fMultiPartRecvCounter = ctrl->msg.postMultiPartStartBuffer.numParts;
assert(fInflightMultiPartMessage.empty());
fInflightMultiPartMessage.reserve(ctrl->msg.postMultiPartStartBuffer.numParts);
} else {
throw SocketError{tools::ToString(
"OFI transport: Received control start of new multi part message without completed "
"reception of previous multi part message. Number of parts missing: ",
fMultiPartRecvCounter)};
}
} else if (ctrl->type == ControlMessageType::PostBuffer) {
size = ctrl->msg.postBuffer.size;
} else {
throw SocketError{tools::ToString("OFI transport: Unknown control message type: '",
static_cast<int>(ctrl->type))};
}
// Receive data
auto msg = fContext.MakeReceiveMessage(size);
if (size) {
asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv);
auto desc = mr.desc();
fDataEndpoint->recv(
buffer,
desc,
[&, msg2 = std::move(msg), mr2 = std::move(mr)](
asio::mutable_buffer) mutable { DataMessageReceived(std::move(msg2)); });
} else {
fDataEndpoint->recv(buffer,
[&, msg2 = std::move(msg)](asio::mutable_buffer) mutable {
DataMessageReceived(std::move(msg2));
});
}
} else {
DataMessageReceived(std::move(msg));
}
asio::dispatch(fContext.GetIoContext(),
std::bind(&Socket::RecvControlQueueReader, this));
}
auto Socket::RecvQueueReaderStatic() -> void
{
fRecvPushSem.async_wait([&] {
static size_t size = fContext.GetSizeHint();
// Receive data
auto msg = fContext.MakeReceiveMessage(size);
if (size) {
asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv);
auto desc = mr.desc();
fDataEndpoint->recv(buffer,
desc,
[&, msg2 = std::move(msg), mr2 = std::move(mr)](
asio::mutable_buffer) mutable {
DataMessageReceived(std::move(msg2));
});
} else {
fDataEndpoint->recv(
buffer, [&, msg2 = std::move(msg)](asio::mutable_buffer) mutable {
DataMessageReceived(std::move(msg2));
});
}
} else {
DataMessageReceived(std::move(msg));
}
asio::dispatch(fContext.GetIoContext(),
std::bind(&Socket::RecvQueueReaderStatic, this));
});
}
auto Socket::DataMessageReceived(MessagePtr msg) -> void
{
if (fMultiPartRecvCounter > 0) {
--fMultiPartRecvCounter;
fInflightMultiPartMessage.push_back(std::move(msg));
}
if (fMultiPartRecvCounter == 0) {
std::unique_lock<std::mutex> lk(fRecvQueueMutex);
fRecvQueue.push(std::move(fInflightMultiPartMessage));
lk.unlock();
fMultiPartRecvCounter = -1;
fRecvPopSem.signal();
} else if (fMultiPartRecvCounter == -1) {
std::vector<MessagePtr> msgVec;
msgVec.push_back(std::move(msg));
std::unique_lock<std::mutex> lk(fRecvQueueMutex);
fRecvQueue.push(std::move(msgVec));
lk.unlock();
fRecvPopSem.signal();
}
}
auto Socket::Close() -> void {}
auto Socket::SetOption(const string& /*option*/, const void* /*value*/, size_t /*valueSize*/) -> void
{
// if (zmq_setsockopt(fControlSocket, GetConstant(option), value, valueSize) < 0) {
// throw SocketError{tools::ToString("Failed setting socket option, reason: ", zmq_strerror(errno))};
// }
}
auto Socket::GetOption(const string& /*option*/, void* /*value*/, size_t* /*valueSize*/) -> void
{
// if (zmq_getsockopt(fControlSocket, GetConstant(option), value, valueSize) < 0) {
// throw SocketError{tools::ToString("Failed getting socket option, reason: ", zmq_strerror(errno))};
// }
}
void Socket::SetLinger(int /*value*/)
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
}
int Socket::GetLinger() const
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
return 0;
}
void Socket::SetSndBufSize(int /*value*/)
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
}
int Socket::GetSndBufSize() const
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
return 0;
}
void Socket::SetRcvBufSize(int /*value*/)
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
}
int Socket::GetRcvBufSize() const
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
return 0;
}
void Socket::SetSndKernelSize(int /*value*/)
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
}
int Socket::GetSndKernelSize() const
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
return 0;
}
void Socket::SetRcvKernelSize(int /*value*/)
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
}
int Socket::GetRcvKernelSize() const
{
LOG(debug) << "OFI transport (" << fId << "): Not yet implemented.";
return 0;
}
auto Socket::GetConstant(const string& /*constant*/) -> int
{
LOG(debug) << "OFI transport: Not yet implemented.";
return -1;
}
Socket::~Socket()
{
try {
Close(); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
} catch (SocketError& e) {
LOG(error) << e.what();
}
}
} // namespace fair::mq::ofi

125
fairmq/ofi/Socket.h Normal file
View File

@@ -0,0 +1,125 @@
/********************************************************************************
* Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_OFI_SOCKET_H
#define FAIR_MQ_OFI_SOCKET_H
#include <fairmq/Message.h>
#include <fairmq/Socket.h>
#include <fairmq/ofi/Context.h>
#include <fairmq/ofi/ControlMessages.h>
#include <asiofi/connected_endpoint.hpp>
#include <asiofi/memory_resources.hpp>
#include <asiofi/passive_endpoint.hpp>
#include <asiofi/semaphore.hpp>
#include <memory> // unique_ptr
#include <mutex>
namespace fair::mq::ofi
{
/**
* @class Socket Socket.h <fairmq/ofi/Socket.h>
* @brief
*
* @todo TODO insert long description
*/
class Socket final : public fair::mq::Socket
{
public:
Socket(Context& context, const std::string& type, const std::string& name, const std::string& id = "");
Socket(const Socket&) = delete;
Socket(Socket&&) = delete;
Socket& operator=(const Socket&) = delete;
Socket& operator=(Socket&&) = delete;
auto GetId() const -> std::string override { return fId; }
auto Events(uint32_t *events) -> int override { *events = 0; return -1; }
auto Bind(const std::string& address) -> bool override;
auto Connect(const std::string& address) -> bool override;
auto Send(MessagePtr& msg, int timeout = 0) -> int64_t override;
auto Receive(MessagePtr& msg, int timeout = 0) -> int64_t override;
auto Send(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
auto Receive(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
auto GetSocket() const -> void* { return nullptr; }
void SetLinger(int value) override;
int GetLinger() const override;
void SetSndBufSize(int value) override;
int GetSndBufSize() const override;
void SetRcvBufSize(int value) override;
int GetRcvBufSize() const override;
void SetSndKernelSize(int value) override;
int GetSndKernelSize() const override;
void SetRcvKernelSize(int value) override;
int GetRcvKernelSize() const override;
auto Close() -> void override;
auto SetOption(const std::string& option, const void* value, size_t valueSize) -> void override;
auto GetOption(const std::string& option, void* value, size_t* valueSize) -> void override;
auto GetBytesTx() const -> unsigned long override { return fBytesTx; }
auto GetBytesRx() const -> unsigned long override { return fBytesRx; }
auto GetMessagesTx() const -> unsigned long override { return fMessagesTx; }
auto GetMessagesRx() const -> unsigned long override { return fMessagesRx; }
auto GetNumberOfConnectedPeers() const -> unsigned long override
{
throw SocketError("not yet implemented");
}
static auto GetConstant(const std::string& constant) -> int;
~Socket() override;
private:
Context& fContext;
asiofi::allocated_pool_resource fControlMemPool;
std::unique_ptr<asiofi::info> fOfiInfo;
std::unique_ptr<asiofi::fabric> fOfiFabric;
std::unique_ptr<asiofi::domain> fOfiDomain;
std::unique_ptr<asiofi::passive_endpoint> fPassiveEndpoint;
std::unique_ptr<asiofi::connected_endpoint> fDataEndpoint, fControlEndpoint;
std::string fId;
std::atomic<unsigned long> fBytesTx;
std::atomic<unsigned long> fBytesRx;
std::atomic<unsigned long> fMessagesTx;
std::atomic<unsigned long> fMessagesRx;
Address fRemoteAddr;
Address fLocalAddr;
std::mutex fSendQueueMutex, fRecvQueueMutex;
std::queue<std::vector<MessagePtr>> fSendQueue, fRecvQueue;
std::vector<MessagePtr> fInflightMultiPartMessage;
int64_t fMultiPartRecvCounter;
asiofi::synchronized_semaphore fSendPushSem, fSendPopSem, fRecvPushSem, fRecvPopSem;
std::atomic<bool> fNeedOfiMemoryRegistration;
auto InitOfi(Address addr) -> void;
auto BindControlEndpoint() -> void;
auto BindDataEndpoint() -> void;
enum class Band { Control, Data };
auto ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void;
auto SendQueueReader() -> void;
auto SendQueueReaderStatic() -> void;
auto RecvControlQueueReader() -> void;
auto RecvQueueReaderStatic() -> void;
auto OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void;
auto DataMessageReceived(MessagePtr msg) -> void;
}; /* class Socket */
struct SilentSocketError : SocketError { using SocketError::SocketError; };
} // namespace fair::mq::ofi
#endif /* FAIR_MQ_OFI_SOCKET_H */

View File

@@ -0,0 +1,218 @@
/********************************************************************************
* Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_OFI_TRANSPORTFACTORY_H
#define FAIR_MQ_OFI_TRANSPORTFACTORY_H
#include <asiofi.hpp>
#include <cstddef>
#include <cstdint>
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <fairmq/Message.h>
#include <fairmq/Poller.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/Socket.h>
#include <fairmq/TransportFactory.h>
#include <fairmq/Transports.h>
#include <fairmq/ofi/Context.h>
#include <fairmq/ofi/Message.h>
#include <fairmq/ofi/Socket.h>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>
namespace fair::mq::ofi {
/**
* @class TransportFactory TransportFactory.h <fairmq/ofi/TransportFactory.h>
* @brief FairMQ transport factory for the ofi transport
*
* @todo TODO insert long description
*/
struct TransportFactory final : mq::TransportFactory
{
TransportFactory(std::string const& id = "", ProgOptions const* config = nullptr)
: mq::TransportFactory(id)
, fContext(*this, *this, 1)
{
try {
LOG(debug) << "OFI transport: asiofi (" << fContext.GetAsiofiVersion() << ")";
if (config) {
fContext.SetSizeHint(config->GetProperty<size_t>("ofi-size-hint", 0));
}
} catch (ContextError& e) {
throw TransportFactoryError(e.what());
}
}
TransportFactory(const TransportFactory&) = delete;
TransportFactory(TransportFactory&&) = delete;
TransportFactory& operator=(const TransportFactory&) = delete;
TransportFactory& operator=(TransportFactory&&) = delete;
~TransportFactory() override = default;
auto CreateMessage() -> std::unique_ptr<mq::Message> override
{
return std::make_unique<Message>(&fMemoryResource);
}
auto CreateMessage(Alignment /*alignment*/) -> std::unique_ptr<mq::Message> override
{
// TODO Do not ignore alignment
return std::make_unique<Message>(&fMemoryResource);
}
auto CreateMessage(std::size_t size) -> std::unique_ptr<mq::Message> override
{
return std::make_unique<Message>(&fMemoryResource, size);
}
auto CreateMessage(std::size_t size, Alignment /*alignment*/)
-> std::unique_ptr<mq::Message> override
{
// TODO Do not ignore alignment
return std::make_unique<Message>(&fMemoryResource, size);
}
auto CreateMessage(void* data, std::size_t size, FreeFn* ffn, void* hint = nullptr)
-> std::unique_ptr<mq::Message> override
{
return std::make_unique<Message>(&fMemoryResource, data, size, ffn, hint);
}
auto CreateMessage(std::unique_ptr<mq::UnmanagedRegion>& region,
void* data,
std::size_t size,
void* hint = nullptr) -> std::unique_ptr<mq::Message> override
{
return std::make_unique<Message>(&fMemoryResource, region, data, size, hint);
}
auto CreateSocket(std::string const& type, std::string const& name)
-> std::unique_ptr<mq::Socket> override
{
return std::make_unique<Socket>(fContext, type, name, GetId());
}
auto CreatePoller(std::vector<mq::Channel> const& /*channels*/) const
-> std::unique_ptr<mq::Poller> override
{
throw std::runtime_error("Not yet implemented (Poller).");
}
auto CreatePoller(std::vector<mq::Channel*> const& /*channels*/) const
-> std::unique_ptr<mq::Poller> override
{
throw std::runtime_error("Not yet implemented (Poller).");
}
auto CreatePoller(
std::unordered_map<std::string, std::vector<Channel>> const& /*channelsMap*/,
std::vector<std::string> const& /*channelList*/) const
-> std::unique_ptr<mq::Poller> override
{
throw std::runtime_error("Not yet implemented (Poller).");
}
auto CreateUnmanagedRegion(std::size_t /*size*/,
RegionCallback /*callback = nullptr*/,
std::string const& /*path = ""*/,
int /*flags = 0*/,
RegionConfig /*cfg = RegionConfig()*/)
-> std::unique_ptr<mq::UnmanagedRegion> override
{
throw std::runtime_error("Not yet implemented UMR.");
}
auto CreateUnmanagedRegion(std::size_t /*size*/,
RegionBulkCallback /*callback = nullptr*/,
std::string const& /*path = ""*/,
int /*flags = 0*/,
RegionConfig /*cfg = RegionConfig()*/)
-> std::unique_ptr<mq::UnmanagedRegion> override
{
throw std::runtime_error("Not yet implemented UMR.");
}
auto CreateUnmanagedRegion(std::size_t /*size*/,
int64_t /*userFlags*/,
RegionCallback /*callback = nullptr*/,
std::string const& /*path = ""*/,
int /*flags = 0*/,
RegionConfig /*cfg = RegionConfig()*/)
-> std::unique_ptr<mq::UnmanagedRegion> override
{
throw std::runtime_error("Not yet implemented UMR.");
}
auto CreateUnmanagedRegion(std::size_t /*size*/,
int64_t /*userFlags*/,
RegionBulkCallback /*callback = nullptr*/,
std::string const& /*path = ""*/,
int /*flags = 0*/,
RegionConfig /*cfg = RegionConfig()*/)
-> std::unique_ptr<mq::UnmanagedRegion> override
{
throw std::runtime_error("Not yet implemented UMR.");
}
auto CreateUnmanagedRegion(std::size_t /*size*/,
RegionCallback /*callback*/,
RegionConfig /*cfg*/)
-> std::unique_ptr<mq::UnmanagedRegion> override
{
throw std::runtime_error("Not yet implemented UMR.");
}
auto CreateUnmanagedRegion(std::size_t /*size*/,
RegionBulkCallback /*callback*/,
RegionConfig /*cfg*/)
-> std::unique_ptr<mq::UnmanagedRegion> override
{
throw std::runtime_error("Not yet implemented UMR.");
}
auto SubscribeToRegionEvents(RegionEventCallback /*callback*/) -> void override
{
throw std::runtime_error("Not yet implemented.");
}
auto SubscribedToRegionEvents() -> bool override
{
throw std::runtime_error("Not yet implemented.");
}
auto UnsubscribeFromRegionEvents() -> void override
{
throw std::runtime_error("Not yet implemented.");
}
auto GetRegionInfo() -> std::vector<RegionInfo> override
{
LOG(error) << "GetRegionInfo not yet implemented for OFI, returning empty vector";
return std::vector<RegionInfo>();
}
auto GetType() const -> Transport override { return Transport::OFI; }
void Interrupt() override { fContext.Interrupt(); }
void Resume() override { fContext.Resume(); }
void Reset() override { fContext.Reset(); }
private:
mutable Context fContext;
asiofi::allocated_pool_resource fMemoryResource;
}; /* class TransportFactory */
} // namespace fair::mq::ofi
#endif /* FAIR_MQ_OFI_TRANSPORTFACTORY_H */

Some files were not shown because too many files have changed in this diff Show More