Compare commits

...

56 Commits

Author SHA1 Message Date
Alexey Rybalchenko
f3bc9e05a8 shmmonitor: update docs 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
5facc441b8 shmmonitor: add --list-all 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
2602f53585 Add tools: StrStartsWith, StrEndsWith 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
0976465338 shm: reduce delay between monitor daemon launch & HBs 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
9144258b89 shmmonitor: daemon output to file if FAIRMQ_SHMMONITOR_VERBOSE is true 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
be55565617 shmmonitor: use fairlogger 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
d7e2fbecea shmmonitor: refactor to separate monitoring from output 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
72175e5757 shmmonitor: optimize startup to avoid repeated start 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
effba534f0 shmmonitor: add session name and creator id to the output 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
efd42075a9 shmmonitor: enable read only access 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
5228407932 shmmonitor: distinguish daemon from monitor mode (orthogonal) 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
30e81d58f8 shmmonitor: allow getting shmids based on session/userid 2021-04-08 10:11:23 +02:00
Dennis Klein
2c7c46f2fd Remove codacy badge 2021-03-26 10:06:10 +01:00
Dennis Klein
0a5122bb24 Remove codecov badge 2021-03-26 10:06:10 +01:00
Dennis Klein
fc49687879 builtin devices: Reorganize 2021-03-26 10:06:10 +01:00
Dennis Klein
66a4df0667 fairmq-uuid-gen: Move to tools directory 2021-03-26 10:06:10 +01:00
Dennis Klein
978191fa6c Introduce <fairmq/runDevice.h> 2021-03-26 10:06:10 +01:00
Dennis Klein
cef6d0afcd Introduce <fairmq/Device.h> 2021-03-26 10:06:10 +01:00
Dennis Klein
47ec550792 control plugin: Move to subdirectory for consistency 2021-03-26 10:06:10 +01:00
Dennis Klein
b4aeb320e5 CI: Collect DDS logs on error 2021-03-26 10:06:10 +01:00
Dennis Klein
107248be0a Reorganize includes for consistency 2021-03-26 10:06:10 +01:00
Dennis Klein
68ceaba501 CI: Filter and process warnings and errors 2021-03-26 10:06:10 +01:00
Dennis Klein
4b6cf8b181 Fix aggregate initialization issue before C++20
Use value initialization to prevent

error: temporary of type ... has protected destructor

see https://stackoverflow.com/a/56745475
2021-03-26 10:06:10 +01:00
Dennis Klein
21d6cf9830 CI: Run clang-tidy 2021-03-26 10:06:10 +01:00
Alexey Rybalchenko
bffe74c5cf shm: handle shrink failure gracefully 2021-03-23 13:00:35 +01:00
Dennis Klein
72f319e276 CI: Adapt to new alfaci build hosts 2021-03-23 11:06:33 +01:00
Alexey Rybalchenko
62438bd99e shm: Improve error message when segment cannot be opened 2021-03-18 09:02:08 +01:00
Alexey Rybalchenko
c8ad684b18 Add --shm-no-cleanup option
When true, device will skip the segment cleanup even when it is the last
segment user
2021-03-18 09:02:08 +01:00
Alexey Rybalchenko
a5ec83208d Update docs 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
fc2241ece7 Fix incorrect channel index passed to OnData callback 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
1f26883b75 Formatting 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
edbdc57332 shm: make shmId also available as uint64_t 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
0fd2fcadc2 shm: Make sure no event notifications are missed 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
9b48b31a75 shm: Make sure all region events are fired 2021-03-11 12:14:00 +01:00
Dennis Klein
cb4335e59f Add test coverage for --channel-config name selector 2021-03-05 02:02:14 +01:00
Giulio Eulisse
ce4584b3d8 Provide a better syntax for --channel-config
The current syntax is ambiguous because it treats assignments
(like address=127.0.0.1) and selectors (name=my-channel) using
the symbol equal `"`.

This allows:

my-channel:address=127.0.0.1

as alternative syntax, which clearly separates the role of my-channel
from the associated properties.
2021-03-05 02:02:14 +01:00
Alexey Rybalchenko
bbc1dd4600 Add optional file output to FairMQSink 2021-03-01 15:33:45 +01:00
Dennis Klein
8327810942 Warn on unknown --channel-config args 2021-03-01 08:37:57 +01:00
Dennis Klein
c37742e3b4 Update Copyright string 2021-03-01 08:37:57 +01:00
Alexey Rybalchenko
93dff3c5a7 Fix regression in shmmonitor 2021-02-19 09:54:29 +01:00
Alexey Rybalchenko
2b3e38d9a4 shmmonitor: non-interactive mode checks and quits 2021-02-10 10:36:08 +01:00
Alexey Rybalchenko
c6b13cd3a1 Fix shmem::Message::SetUsedSize(0) 2021-01-25 13:46:40 +01:00
Alexey Rybalchenko
c5487a11ed Remove custom implementation for enum hashing 2021-01-25 13:46:40 +01:00
Alexey Rybalchenko
4a09154a91 17-ify namespaces 2021-01-25 13:46:40 +01:00
Alexey Rybalchenko
d9a5e82160 Cleanup tools includes 2021-01-25 13:46:40 +01:00
Alexey Rybalchenko
751c53171c Replace tools::make_unique with std::make_unique 2021-01-25 13:46:40 +01:00
Alexey Rybalchenko
6815c9c172 zmq: implement alignment 2021-01-13 12:36:32 +01:00
Alexey Rybalchenko
02a3980343 Remove useless parameter from implementation 2021-01-13 12:36:32 +01:00
Alexey Rybalchenko
38b34785e0 format & remove unused variable 2021-01-13 12:36:32 +01:00
Alexey Rybalchenko
be94ceb7a7 zmq: simplify SetUsedSize implementation 2021-01-13 12:36:32 +01:00
Alexey Rybalchenko
afadbb53e4 zmq: correct accounting for msg size > 2GB 2021-01-13 12:36:32 +01:00
Alexey Rybalchenko
749d28a3b5 DeviceRunner: check FAIRMQ_SEVERITY env var for severity 2021-01-13 12:36:32 +01:00
Alexey Rybalchenko
29f45fa77d Rename TransferResult to TransferCode 2021-01-13 12:36:32 +01:00
Alexey Rybalchenko
ea746b17d0 Remove deprecated methods 2021-01-13 12:36:32 +01:00
Alexey Rybalchenko
636846fcdb Bump C++ standard requirement to C++17 2021-01-13 12:36:32 +01:00
Alexey Rybalchenko
f46d446d52 shm: do mlock/zeroing under shmem lock 2020-12-04 13:27:45 +01:00
188 changed files with 2065 additions and 2303 deletions

View File

@@ -1,3 +1,3 @@
---
Checks: '*,-google-*,-fuchsia-*,-cert-*,-llvm-header-guard,-readability-named-parameter,-misc-non-private-member-variables-in-classes,-*-magic-numbers,-llvm-include-order,-hicpp-no-array-decay,-performance-unnecessary-value-param,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-modernize-use-trailing-return-type,-readability-redundant-member-init'
Checks: 'cppcoreguidelines-*,misc-unused-alias-decls,misc-unused-parameters,modernize-deprecated-headers,modernize-raw-string-literal,modernize-redundant-void-arg,modernize-use-bool-literals,modernize-use-default-member-init,modernize-use-emplace,modernize-use-equals-default,modernize-use-equals-delete,modernize-use-noexcept,modernize-use-nullptr,modernize-use-override,modernize-use-using,performance-faster-string-find,performance-for-range-copy,performance-unnecessary-copy-initialization,readability-avoid-const-params-in-decls,readability-braces-around-statements,readability-container-size-empty,readability-delete-null-pointer,readability-redundant-member-init,readability-redundant-string-init,readability-static-accessed-through-instance,readability-string-compare'
HeaderFilterRegex: '/(fairmq/)'

View File

@@ -18,11 +18,7 @@ get_git_version()
project(FairMQ VERSION ${PROJECT_VERSION} LANGUAGES CXX)
message(STATUS "${BWhite}${PROJECT_NAME}${CR} ${PROJECT_GIT_VERSION} from ${PROJECT_DATE}")
if(BUILD_OFI_TRANSPORT OR BUILD_SDK OR BUILD_PMIX_PLUGIN)
set(PROJECT_MIN_CXX_STANDARD 14)
else()
set(PROJECT_MIN_CXX_STANDARD 11)
endif()
set(PROJECT_MIN_CXX_STANDARD 17)
set_fairmq_defaults()
@@ -262,7 +258,7 @@ install_cmake_package()
# Summary ######################################################################
message(STATUS " ")
message(STATUS " ${Cyan}CXX STANDARD${CR} ${BGreen}C++${CMAKE_CXX_STANDARD}${CR} (>= C++${PROJECT_MIN_CXX_STANDARD}, change with ${BMagenta}-DCMAKE_CXX_STANDARD=17${CR})")
message(STATUS " ${Cyan}CXX STANDARD${CR} ${BGreen}C++${CMAKE_CXX_STANDARD}${CR} (>= C++${PROJECT_MIN_CXX_STANDARD}, change with ${BMagenta}-DCMAKE_CXX_STANDARD=20${CR})")
if(CMAKE_CXX_FLAGS)
message(STATUS " ")
message(STATUS " ${Cyan}GLOBAL CXX FLAGS${CR} ${BGreen}${CMAKE_CXX_FLAGS}${CR}")
@@ -365,9 +361,9 @@ else()
endif()
message(STATUS " ${BWhite}tests${CR} ${tests_summary}")
if(BUILD_OFI_TRANSPORT)
set(ofi_summary "${BGreen}YES${CR} EXPERIMENTAL (requires C++14) (disable with ${BMagenta}-DBUILD_OFI_TRANSPORT=OFF${CR})")
set(ofi_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_OFI_TRANSPORT=OFF${CR})")
else()
set(ofi_summary "${BRed} NO${CR} EXPERIMENTAL (requires C++14) (default, enable with ${BMagenta}-DBUILD_OFI_TRANSPORT=ON${CR})")
set(ofi_summary "${BRed} NO${CR} EXPERIMENTAL (default, enable with ${BMagenta}-DBUILD_OFI_TRANSPORT=ON${CR})")
endif()
message(STATUS " ${BWhite}ofi_transport${CR} ${ofi_summary}")
if(BUILD_DDS_PLUGIN)
@@ -377,9 +373,9 @@ else()
endif()
message(STATUS " ${BWhite}dds_plugin${CR} ${dds_summary}")
if(BUILD_PMIX_PLUGIN)
set(pmix_summary "${BGreen}YES${CR} EXPERIMENTAL (requires C++14) (disable with ${BMagenta}-DBUILD_PMIX_PLUGIN=OFF${CR})")
set(pmix_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_PMIX_PLUGIN=OFF${CR})")
else()
set(pmix_summary "${BRed} NO${CR} EXPERIMENTAL (requires C++14) (default, enable with ${BMagenta}-DBUILD_PMIX_PLUGIN=ON${CR})")
set(pmix_summary "${BRed} NO${CR} EXPERIMENTAL (default, enable with ${BMagenta}-DBUILD_PMIX_PLUGIN=ON${CR})")
endif()
message(STATUS " ${BWhite}pmix_plugin${CR} ${pmix_summary}")
if(BUILD_EXAMPLES)
@@ -395,9 +391,9 @@ else()
endif()
message(STATUS " ${BWhite}docs${CR} ${docs_summary}")
if(BUILD_SDK)
set(sdk_summary "${BGreen}YES${CR} EXPERIMENTAL (required C++14) (disable with ${BMagenta}-DBUILD_SDK=OFF${CR})")
set(sdk_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_SDK=OFF${CR})")
else()
set(sdk_summary "${BRed} NO${CR} EXPERIMENTAL (required C++14) (default, enable with ${BMagenta}-DBUILD_SDK=ON${CR})")
set(sdk_summary "${BRed} NO${CR} EXPERIMENTAL (default, enable with ${BMagenta}-DBUILD_SDK=ON${CR})")
endif()
message(STATUS " ${BWhite}sdk${CR} ${sdk_summary}")
if(BUILD_SDK_COMMANDS)
@@ -432,9 +428,9 @@ message(STATUS " ")
message(STATUS " ${Cyan}RUN STATIC ANALYSIS ${static_ana_summary}")
message(STATUS " ")
if(FAIRMQ_DEBUG_MODE)
message(STATUS " ${Cyan}DEBUG_MODE${CR} ${BGreen}${FAIRMQ_DEBUG_MODE}${CR} (disable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=OFF${CR})")
message(STATUS " ${Cyan}DEBUG MODE${CR} ${BGreen}${FAIRMQ_DEBUG_MODE}${CR} (disable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=OFF${CR})")
else()
message(STATUS " ${Cyan}DEBUG_MODE${CR} ${BRed}${FAIRMQ_DEBUG_MODE}${CR} (enable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=ON${CR})")
message(STATUS " ${Cyan}DEBUG MODE${CR} ${BRed}${FAIRMQ_DEBUG_MODE}${CR} (enable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=ON${CR})")
endif()
message(STATUS " ")
################################################################################

View File

@@ -1,88 +1,65 @@
################################################################################
# Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
Set(CTEST_SOURCE_DIRECTORY $ENV{SOURCEDIR})
Set(CTEST_BINARY_DIRECTORY $ENV{BUILDDIR})
Set(CTEST_SITE $ENV{SITE})
Set(CTEST_BUILD_NAME $ENV{LABEL})
Set(CTEST_CMAKE_GENERATOR "Unix Makefiles")
Set(CTEST_PROJECT_NAME "FairMQ")
Find_Program(CTEST_GIT_COMMAND NAMES git)
Set(CTEST_UPDATE_COMMAND "${CTEST_GIT_COMMAND}")
cmake_host_system_information(RESULT fqdn QUERY FQDN)
Set(BUILD_COMMAND "make")
Set(CTEST_BUILD_COMMAND "${BUILD_COMMAND} -j$ENV{number_of_processors}")
set(CTEST_SOURCE_DIRECTORY .)
set(CTEST_BINARY_DIRECTORY build)
set(CTEST_CMAKE_GENERATOR "Ninja")
set(CTEST_USE_LAUNCHERS ON)
set(CTEST_CONFIGURATION_TYPE "RelWithDebInfo")
String(TOUPPER $ENV{ctest_model} _Model)
Set(configure_options "-DCMAKE_BUILD_TYPE=$ENV{ctest_model}")
Set(CTEST_USE_LAUNCHERS 1)
Set(configure_options "${configure_options};-DCTEST_USE_LAUNCHERS=${CTEST_USE_LAUNCHERS}")
Set(configure_options "${configure_options};-DDISABLE_COLOR=ON")
Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}")
# Set(configure_options "${configure_options};-DBUILD_OFI_TRANSPORT=ON")
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
Set(configure_options "${configure_options};-DBUILD_SDK=ON")
Set(configure_options "${configure_options};-DBUILD_SDK_COMMANDS=ON")
Set(EXTRA_FLAGS $ENV{EXTRA_FLAGS})
If(EXTRA_FLAGS)
Set(configure_options "${configure_options};${EXTRA_FLAGS}")
EndIf()
If($ENV{ctest_model} MATCHES Profile)
Find_Program(GCOV_COMMAND gcov)
If(GCOV_COMMAND)
Message("Found GCOV: ${GCOV_COMMAND}")
Set(CTEST_COVERAGE_COMMAND ${GCOV_COMMAND})
set(CTEST_COVERAGE_EXTRA_FLAGS "-p")
EndIf(GCOV_COMMAND)
EndIf()
If($ENV{ctest_model} MATCHES Nightly OR $ENV{ctest_model} MATCHES Profile)
Ctest_Empty_Binary_Directory(${CTEST_BINARY_DIRECTORY})
EndIf()
Ctest_Start($ENV{ctest_model})
Ctest_Configure(BUILD "${CTEST_BINARY_DIRECTORY}"
OPTIONS "${configure_options}"
)
Ctest_Build(BUILD "${CTEST_BINARY_DIRECTORY}")
unset(exclude_tests)
if($ENV{EXCLUDE_UNSTABLE_DDS_TESTS})
set(exclude_tests EXCLUDE ".*\\.localhost$")
if(NOT NCPUS)
if(ENV{SLURM_CPUS_PER_TASK})
set(NCPUS $ENV{SLURM_CPUS_PER_TASK})
else()
include(ProcessorCount)
ProcessorCount(NCPUS)
if(NCPUS EQUAL 0)
set(NCPUS 1)
endif()
endif()
endif()
Ctest_Test(BUILD "${CTEST_BINARY_DIRECTORY}"
# PARALLEL_LEVEL $ENV{number_of_processors}
${exclude_tests}
PARALLEL_LEVEL $ENV{number_of_processors}
RETURN_VALUE _ctest_test_ret_val
)
If(GCOV_COMMAND)
Ctest_Coverage(BUILD "${CTEST_BINARY_DIRECTORY}" LABELS coverage)
EndIf()
if ("$ENV{CTEST_SITE}" STREQUAL "")
set(CTEST_SITE "${fqdn}")
else()
set(CTEST_SITE $ENV{CTEST_SITE})
endif()
If("$ENV{do_codecov_upload}")
Execute_Process(COMMAND curl https://codecov.io/bash -o codecov_uploader.sh
WORKING_DIRECTORY ${CTEST_BINARY_DIRECTORY}
TIMEOUT 60)
Execute_Process(COMMAND bash ./codecov_uploader.sh -X gcov
WORKING_DIRECTORY ${CTEST_BINARY_DIRECTORY}
TIMEOUT 60)
EndIf()
if ("$ENV{LABEL}" STREQUAL "")
set(CTEST_BUILD_NAME "build")
else()
set(CTEST_BUILD_NAME $ENV{LABEL})
endif()
Ctest_Submit()
ctest_start(Continuous)
if (_ctest_test_ret_val)
list(APPEND options
"-DDISABLE_COLOR=ON"
"-DBUILD_SDK_COMMANDS=ON"
"-DBUILD_SDK=ON"
"-DBUILD_DDS_PLUGIN=ON")
if(RUN_STATIC_ANALYSIS)
list(APPEND options "-DRUN_STATIC_ANALYSIS=ON")
endif()
list(JOIN options ";" optionsstr)
ctest_configure(OPTIONS "${optionsstr}")
ctest_build(FLAGS "-j${NCPUS}")
ctest_test(BUILD "${CTEST_BINARY_DIRECTORY}"
PARALLEL_LEVEL 1
SCHEDULE_RANDOM ON
RETURN_VALUE _ctest_test_ret_val)
ctest_submit()
if(_ctest_test_ret_val)
Message(FATAL_ERROR "Some tests failed.")
endif()

113
Jenkinsfile vendored
View File

@@ -1,69 +1,63 @@
#!groovy
def specToLabel(Map spec) {
return "${spec.os}-${spec.arch}-${spec.compiler}-FairSoft_${spec.fairsoft}"
}
def jobMatrix(String prefix, List specs, Closure callback) {
def jobMatrix(String type, List specs) {
def nodes = [:]
for (spec in specs) {
def label = specToLabel(spec)
def node_tag = label
if (spec.os =~ /macOS/) {
node_tag = spec.os
}
def fairsoft = spec.fairsoft
job = "${spec.os}-${spec.ver}-${spec.arch}-${spec.compiler}"
def label = "${type}/${job}"
def selector = "${spec.os}-${spec.ver}-${spec.arch}"
def os = spec.os
def compiler = spec.compiler
nodes["${prefix}/${label}"] = {
node(node_tag) {
githubNotify(context: "${prefix}/${label}", description: 'Building ...', status: 'PENDING')
def ver = spec.ver
def check = spec.check
nodes[label] = {
node(selector) {
githubNotify(context: "${label}", description: 'Building ...', status: 'PENDING')
try {
deleteDir()
checkout scm
sh """\
echo "export SIMPATH=\${SIMPATH_PREFIX}${fairsoft}" >> Dart.cfg
echo "export FAIRSOFT_VERSION=${fairsoft}" >> Dart.cfg
"""
if (os =~ /Debian/ && compiler =~ /gcc9/) {
sh '''\
echo "source /etc/profile.d/modules.sh" >> Dart.cfg
echo "module use /cvmfs/it.gsi.de/modulefiles" >> Dart.cfg
echo "module load compiler/gcc/9.1.0" >> Dart.cfg
'''
}
if (os =~ /[Mm]acOS/) {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=clang++'\" >> Dart.cfg"
def jobscript = 'job.sh'
def ctestcmd = "ctest -S FairMQTest.cmake -V --output-on-failure"
sh "echo \"set -e\" >> ${jobscript}"
sh "echo \"export LABEL=\\\"\${JOB_BASE_NAME} ${label}\\\"\" >> ${jobscript}"
if (selector =~ /^macos/) {
sh """\
echo \"export DDS_ROOT=\\\"\\\$(brew --prefix dds)\\\"\" >> ${jobscript}
echo \"${ctestcmd}\" >> ${jobscript}
"""
sh "cat ${jobscript}"
sh "bash ${jobscript}"
} else {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'\" >> Dart.cfg"
def containercmd = "singularity exec -B/shared ${env.SINGULARITY_CONTAINER_ROOT}/fairmq/${os}.${ver}.sif bash -l -c \\\"${ctestcmd} -DRUN_STATIC_ANALYSIS=ON\\\""
sh """\
echo \"echo \\\"*** Job started at .......: \\\$(date -R)\\\"\" >> ${jobscript}
echo \"echo \\\"*** Job ID ...............: \\\${SLURM_JOB_ID}\\\"\" >> ${jobscript}
echo \"echo \\\"*** Compute node .........: \\\$(hostname -f)\\\"\" >> ${jobscript}
echo \"unset http_proxy\" >> ${jobscript}
echo \"unset HTTP_PROXY\" >> ${jobscript}
echo \"${containercmd}\" >> ${jobscript}
"""
sh "cat ${jobscript}"
sh "test/ci/slurm-submit.sh \"FairMQ \${JOB_BASE_NAME} ${label}\" ${jobscript}"
withChecks('Static Analysis') {
recordIssues(enabledForFailure: true,
tools: [gcc(pattern: 'build/Testing/Temporary/*.log')],
filters: [excludeFile('extern/*'), excludeFile('usr/*')],
skipBlames: true)
}
}
sh '''\
echo "export BUILDDIR=$PWD/build" >> Dart.cfg
echo "export SOURCEDIR=$PWD" >> Dart.cfg
echo "export PATH=\\\$SIMPATH/bin:\\\$PATH" >> Dart.cfg
echo "export GIT_BRANCH=$JOB_BASE_NAME" >> Dart.cfg
echo "echo \\\$PATH" >> Dart.cfg
'''
if (os =~ /macOS10.14/) {
sh "echo \"export EXCLUDE_UNSTABLE_DDS_TESTS=1\" >> Dart.cfg"
}
sh 'cat Dart.cfg'
callback.call(spec, label)
deleteDir()
githubNotify(context: "${prefix}/${label}", description: 'Success', status: 'SUCCESS')
githubNotify(context: "${label}", description: 'Success', status: 'SUCCESS')
} catch (e) {
def tarball = "${prefix}_${label}_dds_logs.tar.gz"
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test/ .DDS/"
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test .DDS/"
archiveArtifacts tarball
deleteDir()
githubNotify(context: "${prefix}/${label}", description: 'Error', status: 'ERROR')
githubNotify(context: "${label}", description: 'Error', status: 'ERROR')
throw e
}
}
@@ -75,26 +69,15 @@ def jobMatrix(String prefix, List specs, Closure callback) {
pipeline{
agent none
stages {
stage("Run CI Matrix") {
stage("CI") {
steps{
script {
def build_jobs = jobMatrix('build', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
[os: 'macOS10.14', arch: 'x86_64', compiler: 'AppleClang11.0', fairsoft: 'fairmq_dev'],
[os: 'macOS10.15', arch: 'x86_64', compiler: 'AppleClang12.0', fairsoft: 'fairmq_dev'],
]) { spec, label ->
sh './Dart.sh alfa_ci Dart.cfg'
}
def builds = jobMatrix('build', [
[os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10'],
[os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12'],
])
def profile_jobs = jobMatrix('codecov', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
]) { spec, label ->
withCredentials([string(credentialsId: 'fairmq_codecov_token', variable: 'CODECOV_TOKEN')]) {
sh './Dart.sh codecov Dart.cfg'
}
}
parallel(build_jobs + profile_jobs)
parallel(builds)
}
}
}

View File

@@ -1,81 +0,0 @@
#!groovy
def specToLabel(Map spec) {
return "${spec.os}-${spec.arch}-${spec.compiler}-FairSoft_${spec.fairsoft}"
}
def buildMatrix(List specs, Closure callback) {
def nodes = [:]
for (spec in specs) {
def label = specToLabel(spec)
def fairsoft = spec.fairsoft
def os = spec.os
def compiler = spec.compiler
nodes[label] = {
node(label) {
try {
deleteDir()
checkout scm
sh """\
echo "export SIMPATH=\${SIMPATH_PREFIX}${fairsoft}" >> Dart.cfg
echo "export FAIRSOFT_VERSION=${fairsoft}" >> Dart.cfg
"""
if (os =~ /Debian/ && compiler =~ /gcc9/) {
sh '''\
echo "source /etc/profile.d/modules.sh" >> Dart.cfg
echo "module use /cvmfs/it.gsi.de/modulefiles" >> Dart.cfg
echo "module load compiler/gcc/9.1.0" >> Dart.cfg
'''
}
if (os =~ /MacOS/) {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=clang++'\" >> Dart.cfg"
} else {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'\" >> Dart.cfg"
}
sh '''\
echo "export BUILDDIR=$PWD/build" >> Dart.cfg
echo "export SOURCEDIR=$PWD" >> Dart.cfg
echo "export PATH=\\\$SIMPATH/bin:\\\$PATH" >> Dart.cfg
echo "export GIT_BRANCH=dev" >> Dart.cfg
echo "echo \\\$PATH" >> Dart.cfg
'''
sh 'cat Dart.cfg'
callback.call(spec, label)
deleteDir()
} catch (e) {
def tarball = "${label}_dds_logs.tar.gz"
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test/ .DDS/"
archiveArtifacts tarball
deleteDir()
throw e
}
}
}
}
return nodes
}
pipeline{
agent none
triggers { cron('H 2 * * *') }
stages {
stage("Run Nightly Build/Test Matrix") {
steps{
script {
parallel(buildMatrix([
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'],
[os: 'MacOS10.14', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'],
]) { spec, label ->
sh './Dart.sh Nightly Dart.cfg'
sh './Dart.sh Profile Dart.cfg'
})
}
}
}
}
}

View File

@@ -1,5 +1,5 @@
<!-- {#mainpage} -->
# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![build status](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/master)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![test coverage master branch](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master/graph/badge.svg)](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master) [![Coverity Badge](https://alfa-ci.gsi.de/shields/coverity/scan/fairrootgroup-fairmq.svg)](https://scan.coverity.com/projects/fairrootgroup-fairmq) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/6b648d95d68d4c4eae833b84f84d299c)](https://www.codacy.com/app/dennisklein/FairMQ?utm_source=github.com&amp;utm_medium=referral&amp;utm_content=FairRootGroup/FairMQ&amp;utm_campaign=Badge_Grade)
# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![build status](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/dev)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![Coverity Badge](https://alfa-ci.gsi.de/shields/coverity/scan/fairrootgroup-fairmq.svg)](https://scan.coverity.com/projects/fairrootgroup-fairmq)
C++ Message Queuing Library and Framework

View File

@@ -7,10 +7,10 @@
################################################################################
add_library(Example11Lib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(Example11Lib PUBLIC FairMQ)
@@ -40,12 +40,12 @@ set_tests_properties(Example.1-1.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true P
# install
install(
TARGETS
fairmq-ex-1-1-sampler
fairmq-ex-1-1-sink
TARGETS
fairmq-ex-1-1-sampler
fairmq-ex-1-1-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -54,7 +54,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-1.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-1.sh
)

View File

@@ -7,12 +7,12 @@
################################################################################
add_library(Example1N1Lib STATIC
"Sampler.cxx"
"Sampler.h"
"Processor.cxx"
"Processor.h"
"Sink.cxx"
"Sink.h"
"Sampler.cxx"
"Sampler.h"
"Processor.cxx"
"Processor.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(Example1N1Lib PUBLIC FairMQ)
@@ -48,13 +48,13 @@ set_tests_properties(Example.1-n-1.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true
# install
install(
TARGETS
fairmq-ex-1-n-1-sampler
fairmq-ex-1-n-1-processor
fairmq-ex-1-n-1-sink
TARGETS
fairmq-ex-1-n-1-sampler
fairmq-ex-1-n-1-processor
fairmq-ex-1-n-1-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -64,12 +64,12 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-n-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-n-1.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-n-1.sh
)
install(
FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-1-n-1.json
DESTINATION ${PROJECT_INSTALL_DATADIR}
FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-1-n-1.json
DESTINATION ${PROJECT_INSTALL_DATADIR}
)

View File

@@ -34,7 +34,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-builtin-devices.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-builtin-devices.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-builtin-devices.sh
)

View File

@@ -7,10 +7,10 @@
################################################################################
add_library(ExampleCopyPushLib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(ExampleCopyPushLib PUBLIC FairMQ)
@@ -41,12 +41,12 @@ set_tests_properties(Example.CopyPush.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL t
# install
install(
TARGETS
fairmq-ex-copypush-sampler
fairmq-ex-copypush-sink
TARGETS
fairmq-ex-copypush-sampler
fairmq-ex-copypush-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -55,7 +55,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-copypush.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-copypush.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-copypush.sh
)

View File

@@ -7,10 +7,10 @@
################################################################################
add_library(ExampleMultipartLib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(ExampleMultipartLib PUBLIC FairMQ)
@@ -45,12 +45,12 @@ endif()
# install
install(
TARGETS
fairmq-ex-multipart-sampler
fairmq-ex-multipart-sink
TARGETS
fairmq-ex-multipart-sampler
fairmq-ex-multipart-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -59,7 +59,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multipart.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multipart.sh
)

View File

@@ -7,12 +7,12 @@
################################################################################
add_library(ExampleMultipleChannelsLib STATIC
"Sampler.cxx"
"Sampler.h"
"Broadcaster.cxx"
"Broadcaster.h"
"Sink.cxx"
"Sink.h"
"Sampler.cxx"
"Sampler.h"
"Broadcaster.cxx"
"Broadcaster.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(ExampleMultipleChannelsLib PUBLIC FairMQ)
@@ -42,13 +42,13 @@ set_tests_properties(Example.MultipleChannels.zeromq PROPERTIES TIMEOUT "30" RUN
# install
install(
TARGETS
fairmq-ex-multiple-channels-sampler
fairmq-ex-multiple-channels-broadcaster
fairmq-ex-multiple-channels-sink
TARGETS
fairmq-ex-multiple-channels-sampler
fairmq-ex-multiple-channels-broadcaster
fairmq-ex-multiple-channels-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -57,7 +57,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-channels.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-channels.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-channels.sh
)

View File

@@ -7,12 +7,12 @@
################################################################################
add_library(ExampleMultipleTransportsLib STATIC
"Sampler1.cxx"
"Sampler1.h"
"Sampler2.cxx"
"Sampler2.h"
"Sink.cxx"
"Sink.h"
"Sampler1.cxx"
"Sampler1.h"
"Sampler2.cxx"
"Sampler2.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(ExampleMultipleTransportsLib PUBLIC FairMQ)
@@ -41,13 +41,13 @@ set_tests_properties(Example.MultipleTransports PROPERTIES TIMEOUT "30" RUN_SERI
# install
install(
TARGETS
fairmq-ex-multiple-transports-sampler1
fairmq-ex-multiple-transports-sampler2
fairmq-ex-multiple-transports-sink
TARGETS
fairmq-ex-multiple-transports-sampler1
fairmq-ex-multiple-transports-sampler2
fairmq-ex-multiple-transports-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for install directories
@@ -56,7 +56,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-transports.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-transports.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-transports.sh
)

View File

@@ -29,15 +29,15 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh
# install
install(
TARGETS
fairmq-ex-readout-readout
fairmq-ex-readout-builder
fairmq-ex-readout-processor
fairmq-ex-readout-sender
fairmq-ex-readout-receiver
TARGETS
fairmq-ex-readout-readout
fairmq-ex-readout-builder
fairmq-ex-readout-processor
fairmq-ex-readout-sender
fairmq-ex-readout-receiver
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -47,13 +47,13 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout.sh
)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout-processing.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout-processing.sh
)

View File

@@ -7,10 +7,10 @@
################################################################################
add_library(ExampleRegionLib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(ExampleRegionLib PUBLIC FairMQ)
@@ -40,12 +40,12 @@ set_tests_properties(Example.Region.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL tru
# install
install(
TARGETS
fairmq-ex-region-sampler
fairmq-ex-region-sink
TARGETS
fairmq-ex-region-sampler
fairmq-ex-region-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -54,7 +54,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-region.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-region.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-region.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-region.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-region.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-region.sh
)

View File

@@ -7,10 +7,10 @@
################################################################################
add_library(ExampleReqRepLib STATIC
"Client.cxx"
"Client.h"
"Server.cxx"
"Server.h"
"Client.cxx"
"Client.h"
"Server.cxx"
"Server.h"
)
target_link_libraries(ExampleReqRepLib PUBLIC FairMQ)
@@ -41,12 +41,12 @@ set_tests_properties(Example.ReqRep.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL tru
# install
install(
TARGETS
fairmq-ex-req-rep-client
fairmq-ex-req-rep-server
TARGETS
fairmq-ex-req-rep-client
fairmq-ex-req-rep-server
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -55,7 +55,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-req-rep.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-req-rep.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-req-rep.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-req-rep.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-req-rep.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-req-rep.sh
)

View File

@@ -1,5 +1,5 @@
################################################################################
# Copyright (C) 2012-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# Copyright (C) 2012-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
@@ -62,6 +62,7 @@ if(BUILD_FAIRMQ OR BUILD_SDK)
target_link_libraries(${target}
PRIVATE
FairLogger::FairLogger
Threads::Threads
PUBLIC
Boost::boost
)
@@ -145,6 +146,7 @@ if(BUILD_FAIRMQ)
# libFairMQ header files #
##########################
set(FAIRMQ_PUBLIC_HEADER_FILES
Device.h
DeviceRunner.h
EventManager.h
FairMQChannel.h
@@ -169,14 +171,21 @@ if(BUILD_FAIRMQ)
Plugin.h
PluginManager.h
PluginServices.h
runDevice.h
runFairMQDevice.h
shmem/Monitor.h
)
set(FAIRMQ_PRIVATE_HEADER_FILES
devices/BenchmarkSampler.h
devices/Merger.h
devices/Multiplier.h
devices/Proxy.h
devices/Sink.h
devices/Splitter.h
plugins/Builtin.h
plugins/config/Config.h
plugins/Control.h
plugins/control/Control.h
shmem/Message.h
shmem/Poller.h
shmem/UnmanagedRegion.h
@@ -223,7 +232,7 @@ if(BUILD_FAIRMQ)
Properties.cxx
SuboptParser.cxx
plugins/config/Config.cxx
plugins/Control.cxx
plugins/control/Control.cxx
MemoryResources.cxx
shmem/Monitor.cxx
)
@@ -243,7 +252,7 @@ if(BUILD_FAIRMQ)
# configure files #
###################
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/run/startMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startMQBenchmark.sh)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/devices/startMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startMQBenchmark.sh)
#################################
# define libFairMQ build target #
@@ -347,22 +356,22 @@ if(BUILD_FAIRMQ)
###############
# executables #
###############
add_executable(fairmq-bsampler run/runBenchmarkSampler.cxx)
add_executable(fairmq-bsampler devices/runBenchmarkSampler.cxx)
target_link_libraries(fairmq-bsampler FairMQ)
add_executable(fairmq-merger run/runMerger.cxx)
add_executable(fairmq-merger devices/runMerger.cxx)
target_link_libraries(fairmq-merger FairMQ)
add_executable(fairmq-multiplier run/runMultiplier.cxx)
add_executable(fairmq-multiplier devices/runMultiplier.cxx)
target_link_libraries(fairmq-multiplier FairMQ)
add_executable(fairmq-proxy run/runProxy.cxx)
add_executable(fairmq-proxy devices/runProxy.cxx)
target_link_libraries(fairmq-proxy FairMQ)
add_executable(fairmq-sink run/runSink.cxx)
add_executable(fairmq-sink devices/runSink.cxx)
target_link_libraries(fairmq-sink FairMQ)
add_executable(fairmq-splitter run/runSplitter.cxx)
add_executable(fairmq-splitter devices/runSplitter.cxx)
target_link_libraries(fairmq-splitter FairMQ)
add_executable(fairmq-shmmonitor shmem/Monitor.cxx shmem/Monitor.h shmem/runMonitor.cxx)
@@ -383,8 +392,11 @@ if(BUILD_FAIRMQ)
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}>
)
add_executable(fairmq-uuid-gen run/runUuidGenerator.cxx)
target_link_libraries(fairmq-uuid-gen FairMQ)
add_executable(fairmq-uuid-gen tools/runUuidGenerator.cxx)
target_link_libraries(fairmq-uuid-gen PUBLIC
Boost::program_options
Tools
)
###########

21
fairmq/Device.h Normal file
View File

@@ -0,0 +1,21 @@
/********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_DEVICE_H
#define FAIR_MQ_DEVICE_H
#include <FairMQDevice.h>
namespace fair::mq
{
using Device = ::FairMQDevice;
} // namespace fair::mq
#endif /* FAIR_MQ_DEVICE_H */

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -56,6 +56,10 @@ bool DeviceRunner::HandleGeneralOptions(const fair::mq::ProgOptions& config, boo
fair::Logger::SetConsoleSeverity("nolog");
} else {
fair::Logger::SetConsoleColor(color);
auto envFairMQSeverity = getenv("FAIRMQ_SEVERITY");
if (envFairMQSeverity) {
severity = envFairMQSeverity;
}
if (severity != "") {
fair::Logger::SetConsoleSeverity(severity);
}

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -9,18 +9,18 @@
#ifndef FAIR_MQ_DEVICERUNNER_H
#define FAIR_MQ_DEVICERUNNER_H
#include <fairmq/Device.h>
#include <fairmq/EventManager.h>
#include <fairmq/PluginManager.h>
#include <fairmq/ProgOptions.h>
#include <FairMQDevice.h>
#include <functional>
#include <memory>
#include <string>
#include <vector>
namespace fair {
namespace mq {
namespace fair::mq
{
/**
* @class DeviceRunner DeviceRunner.h <fairmq/DeviceRunner.h>
@@ -73,7 +73,7 @@ class DeviceRunner
std::vector<std::string> fRawCmdLineArgs;
fair::mq::ProgOptions fConfig;
std::unique_ptr<FairMQDevice> fDevice;
std::unique_ptr<Device> fDevice;
PluginManager fPluginManager;
const bool fPrintLogo;
@@ -88,7 +88,6 @@ struct ModifyRawCmdLineArgs : Event<DeviceRunner&> {};
struct InstantiateDevice : Event<DeviceRunner&> {};
} /* namespace hooks */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
#endif /* FAIR_MQ_DEVICERUNNER_H */

View File

@@ -21,9 +21,7 @@
#include <boost/functional/hash.hpp>
#include <boost/signals2.hpp>
namespace fair
{
namespace mq
namespace fair::mq
{
// Inherit from this base event type to create custom event types
@@ -137,7 +135,6 @@ class EventManager
}
}; /* class EventManager */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
#endif /* FAIR_MQ_EVENTMANAGER_H */

View File

@@ -256,7 +256,7 @@ class FairMQChannel
/// Sends a message to the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msg);
@@ -266,7 +266,7 @@ class FairMQChannel
/// Receives a message from the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msg);
@@ -276,7 +276,7 @@ class FairMQChannel
/// Send a vector of messages
/// @param msgVec message vector reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msgVec);
@@ -286,7 +286,7 @@ class FairMQChannel
/// Receive a vector of messages
/// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msgVec);
@@ -296,7 +296,7 @@ class FairMQChannel
/// Send FairMQParts
/// @param parts FairMQParts reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1)
{
return Send(parts.fParts, sndTimeoutInMs);
@@ -305,7 +305,7 @@ class FairMQChannel
/// Receive FairMQParts
/// @param parts FairMQParts reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1)
{
return Receive(parts.fParts, rcvTimeoutInMs);

View File

@@ -24,34 +24,6 @@
using namespace std;
using namespace fair::mq;
static map<Transition, State> backwardsCompatibilityWaitForEndOfStateHelper =
{
{ Transition::InitDevice, State::InitializingDevice },
{ Transition::CompleteInit, State::Initialized },
{ Transition::Bind, State::Bound },
{ Transition::Connect, State::DeviceReady },
{ Transition::InitTask, State::Ready },
{ Transition::Run, State::Ready },
{ Transition::Stop, State::Ready },
{ Transition::ResetTask, State::DeviceReady },
{ Transition::ResetDevice, State::Idle }
};
static map<int, Transition> backwardsCompatibilityChangeStateHelper =
{
{ FairMQDevice::Event::INIT_DEVICE, Transition::InitDevice },
{ FairMQDevice::Event::internal_DEVICE_READY, Transition::Auto },
{ FairMQDevice::Event::INIT_TASK, Transition::InitTask },
{ FairMQDevice::Event::internal_READY, Transition::Auto },
{ FairMQDevice::Event::RUN, Transition::Run },
{ FairMQDevice::Event::STOP, Transition::Stop },
{ FairMQDevice::Event::RESET_TASK, Transition::ResetTask },
{ FairMQDevice::Event::RESET_DEVICE, Transition::ResetDevice },
{ FairMQDevice::Event::internal_IDLE, Transition::Auto },
{ FairMQDevice::Event::END, Transition::End },
{ FairMQDevice::Event::ERROR_FOUND, Transition::ErrorFound }
};
constexpr const char* FairMQDevice::DefaultId;
constexpr int FairMQDevice::DefaultIOThreads;
constexpr const char* FairMQDevice::DefaultTransportName;
@@ -103,7 +75,7 @@ FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version)
: fTransportFactory(nullptr)
, fTransports()
, fChannels()
, fInternalConfig(config ? nullptr : tools::make_unique<ProgOptions>())
, fInternalConfig(config ? nullptr : make_unique<ProgOptions>())
, fConfig(config ? config : fInternalConfig.get())
, fId(DefaultId)
, fDefaultTransportType(DefaultTransportType)
@@ -244,16 +216,6 @@ void FairMQDevice::TransitionTo(const fair::mq::State s)
}
}
bool FairMQDevice::ChangeState(const int transition)
{
return ChangeState(backwardsCompatibilityChangeStateHelper.at(transition));
}
void FairMQDevice::WaitForEndOfState(Transition transition)
{
WaitForState(backwardsCompatibilityWaitForEndOfStateHelper.at(transition));
}
void FairMQDevice::InitWrapper()
{
// run initialization once CompleteInit transition is requested
@@ -524,7 +486,6 @@ void FairMQDevice::RunWrapper()
}
PostRun();
} catch (const out_of_range& oor) {
LOG(error) << "out of range: " << oor.what();
LOG(error) << "incorrect/incomplete channel configuration?";
@@ -542,17 +503,12 @@ void FairMQDevice::HandleSingleChannelInput()
{
bool proceed = true;
if (fMsgInputs.size() > 0)
{
while (!NewStatePending() && proceed)
{
if (fMsgInputs.size() > 0) {
while (!NewStatePending() && proceed) {
proceed = HandleMsgInput(fInputChannelKeys.at(0), fMsgInputs.begin()->second, 0);
}
}
else if (fMultipartInputs.size() > 0)
{
while (!NewStatePending() && proceed)
{
} else if (fMultipartInputs.size() > 0) {
while (!NewStatePending() && proceed) {
proceed = HandleMultipartInput(fInputChannelKeys.at(0), fMultipartInputs.begin()->second, 0);
}
}
@@ -562,75 +518,55 @@ void FairMQDevice::HandleMultipleChannelInput()
{
// check if more than one transport is used
fMultitransportInputs.clear();
for (const auto& k : fInputChannelKeys)
{
for (const auto& k : fInputChannelKeys) {
fair::mq::Transport t = fChannels.at(k).at(0).fTransportType;
if (fMultitransportInputs.find(t) == fMultitransportInputs.end())
{
if (fMultitransportInputs.find(t) == fMultitransportInputs.end()) {
fMultitransportInputs.insert(pair<fair::mq::Transport, vector<string>>(t, vector<string>()));
fMultitransportInputs.at(t).push_back(k);
}
else
{
} else {
fMultitransportInputs.at(t).push_back(k);
}
}
for (const auto& mi : fMsgInputs)
{
for (auto& i : fChannels.at(mi.first))
{
for (const auto& mi : fMsgInputs) {
for (auto& i : fChannels.at(mi.first)) {
i.fMultipart = false;
}
}
for (const auto& mi : fMultipartInputs)
{
for (auto& i : fChannels.at(mi.first))
{
for (const auto& mi : fMultipartInputs) {
for (auto& i : fChannels.at(mi.first)) {
i.fMultipart = true;
}
}
// if more than one transport is used, handle poll of each in a separate thread
if (fMultitransportInputs.size() > 1)
{
if (fMultitransportInputs.size() > 1) {
HandleMultipleTransportInput();
}
else // otherwise poll directly
{
} else { // otherwise poll directly
bool proceed = true;
FairMQPollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys));
while (!NewStatePending() && proceed)
{
while (!NewStatePending() && proceed) {
poller->Poll(200);
// check which inputs are ready and call their data handlers if they are.
for (const auto& ch : fInputChannelKeys)
{
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i)
{
if (poller->CheckInput(ch, i))
{
if (fChannels.at(ch).at(i).fMultipart)
{
for (const auto& ch : fInputChannelKeys) {
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
if (poller->CheckInput(ch, i)) {
if (fChannels.at(ch).at(i).fMultipart) {
proceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
}
else
{
} else {
proceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
}
if (!proceed)
{
if (!proceed) {
break;
}
}
}
if (!proceed)
{
if (!proceed) {
break;
}
}
@@ -644,64 +580,49 @@ void FairMQDevice::HandleMultipleTransportInput()
fMultitransportProceed = true;
for (const auto& i : fMultitransportInputs)
{
for (const auto& i : fMultitransportInputs) {
threads.emplace_back(thread(&FairMQDevice::PollForTransport, this, fTransports.at(i.first).get(), i.second));
}
for (thread& t : threads)
{
for (thread& t : threads) {
t.join();
}
}
void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const vector<string>& channelKeys)
{
try
{
try {
FairMQPollerPtr poller(factory->CreatePoller(fChannels, channelKeys));
while (!NewStatePending() && fMultitransportProceed)
{
while (!NewStatePending() && fMultitransportProceed) {
poller->Poll(500);
for (const auto& ch : channelKeys)
{
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i)
{
if (poller->CheckInput(ch, i))
{
for (const auto& ch : channelKeys) {
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
if (poller->CheckInput(ch, i)) {
lock_guard<mutex> lock(fMultitransportMutex);
if (!fMultitransportProceed)
{
if (!fMultitransportProceed) {
break;
}
if (fChannels.at(ch).at(i).fMultipart)
{
if (fChannels.at(ch).at(i).fMultipart) {
fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
}
else
{
} else {
fMultitransportProceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
}
if (!fMultitransportProceed)
{
if (!fMultitransportProceed) {
break;
}
}
}
if (!fMultitransportProceed)
{
if (!fMultitransportProceed) {
break;
}
}
}
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state.";
throw runtime_error(tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state."));
}
@@ -711,12 +632,9 @@ bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback&
{
unique_ptr<FairMQMessage> input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage());
if (Receive(input, chName, i) >= 0)
{
if (Receive(input, chName, i) >= 0) {
return callback(input, i);
}
else
{
} else {
return false;
}
}
@@ -725,12 +643,9 @@ bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipa
{
FairMQParts input;
if (Receive(input, chName, i) >= 0)
{
return callback(input, 0);
}
else
{
if (Receive(input, chName, i) >= 0) {
return callback(input, i);
} else {
return false;
}
}

View File

@@ -9,17 +9,17 @@
#ifndef FAIRMQDEVICE_H_
#define FAIRMQDEVICE_H_
#include <StateMachine.h>
#include <FairMQTransportFactory.h>
#include <fairmq/Transports.h>
#include <fairmq/StateQueue.h>
#include <FairMQChannel.h>
#include <FairMQLogger.h>
#include <FairMQMessage.h>
#include <FairMQParts.h>
#include <FairMQTransportFactory.h>
#include <FairMQUnmanagedRegion.h>
#include <FairMQLogger.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/StateMachine.h>
#include <fairmq/StateQueue.h>
#include <fairmq/Transports.h>
#include <fairmq/tools/Version.h>
#include <vector>
#include <memory> // unique_ptr
@@ -34,58 +34,21 @@
#include <cstddef>
#include <utility> // pair
#include <fairmq/tools/Version.h>
using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
using InputMsgCallback = std::function<bool(FairMQMessagePtr&, int)>;
using InputMultipartCallback = std::function<bool(FairMQParts&, int)>;
namespace fair
{
namespace mq
namespace fair::mq
{
struct OngoingTransition : std::runtime_error { using std::runtime_error::runtime_error; };
}
}
class FairMQDevice
{
friend class FairMQChannel;
public:
// backwards-compatibility enum for old state machine interface, todo: delete this
enum Event
{
INIT_DEVICE,
internal_DEVICE_READY,
INIT_TASK,
internal_READY,
RUN,
STOP,
RESET_TASK,
RESET_DEVICE,
internal_IDLE,
END,
ERROR_FOUND
};
// backwards-compatibility enum for old state machine interface, todo: delete this
enum State
{
OK,
Error,
IDLE,
INITIALIZING_DEVICE,
DEVICE_READY,
INITIALIZING_TASK,
READY,
RUNNING,
RESETTING_TASK,
RESETTING_DEVICE,
EXITING
};
/// Default constructor
FairMQDevice();
/// Constructor with external fair::mq::ProgOptions
@@ -128,7 +91,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
@@ -139,7 +102,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
@@ -150,7 +113,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs);
@@ -161,7 +124,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs);
@@ -449,8 +412,6 @@ class FairMQDevice
/// Called in the RUNNING state once after executing the Run()/ConditionalRun() method
virtual void PostRun() {}
virtual void Pause() __attribute__((deprecated("PAUSE state is removed. This method is never called. To pause Run, go to READY with STOP transition and back to RUNNING with RUN to resume."))) {}
/// Resets the user task (to be overloaded in child classes)
virtual void ResetTask() {}
@@ -458,36 +419,64 @@ class FairMQDevice
virtual void Reset() {}
public:
/// @brief Request a device state transition
/// @param transition state transition
///
/// The state transition may not happen immediately, but when the current state evaluates the
/// pending transition event and terminates. In other words, the device states are scheduled cooperatively.
bool ChangeState(const fair::mq::Transition transition) { return fStateMachine.ChangeState(transition); }
/// @brief Request a device state transition
/// @param transition state transition
///
/// The state transition may not happen immediately, but when the current state evaluates the
/// pending transition event and terminates. In other words, the device states are scheduled cooperatively.
bool ChangeState(const std::string& transition) { return fStateMachine.ChangeState(fair::mq::GetTransition(transition)); }
bool ChangeState(const int transition) __attribute__((deprecated("Use ChangeState(const fair::mq::Transition transition).")));
void WaitForEndOfState(const fair::mq::Transition transition) __attribute__((deprecated("Use WaitForState(fair::mq::State expectedState).")));
void WaitForEndOfState(const std::string& transition) __attribute__((deprecated("Use WaitForState(fair::mq::State expectedState)."))) { WaitForState(transition); }
/// @brief waits for the next state (any) to occur
fair::mq::State WaitForNextState() { return fStateQueue.WaitForNext(); }
/// @brief waits for the specified state to occur
/// @param state state to wait for
void WaitForState(fair::mq::State state) { fStateQueue.WaitForState(state); }
/// @brief waits for the specified state to occur
/// @param state state to wait for
void WaitForState(const std::string& state) { WaitForState(fair::mq::GetState(state)); }
void TransitionTo(const fair::mq::State state);
/// @brief Subscribe with a callback to state changes
/// @param key id to identify your subscription
/// @param callback callback (called with the new state as the parameter)
///
/// The callback is called at the beginning of a new state.
/// The callback is called from the thread the state is running in.
void SubscribeToStateChange(const std::string& key, std::function<void(const fair::mq::State)> callback) { fStateMachine.SubscribeToStateChange(key, callback); }
/// @brief Unsubscribe from state changes
/// @param key id (that was used when subscribing)
void UnsubscribeFromStateChange(const std::string& key) { fStateMachine.UnsubscribeFromStateChange(key); }
/// @brief Subscribe with a callback to incoming state transitions
/// @param key id to identify your subscription
/// @param callback callback (called with the incoming transition as the parameter)
/// The callback is called when new transition is initiated.
/// The callback is called from the thread that initiates the transition (via ChangeState).
void SubscribeToNewTransition(const std::string& key, std::function<void(const fair::mq::Transition)> callback) { fStateMachine.SubscribeToNewTransition(key, callback); }
/// @brief Unsubscribe from state transitions
/// @param key id (that was used when subscribing)
void UnsubscribeFromNewTransition(const std::string& key) { fStateMachine.UnsubscribeFromNewTransition(key); }
bool CheckCurrentState(const int /* state */) const __attribute__((deprecated("Use NewStatePending()."))) { return !fStateMachine.NewStatePending(); }
bool CheckCurrentState(const std::string& /* state */) const __attribute__((deprecated("Use NewStatePending()."))) { return !fStateMachine.NewStatePending(); }
/// Returns true is a new state has been requested, signaling the current handler to stop.
/// @brief Returns true if a new state has been requested, signaling the current handler to stop.
bool NewStatePending() const { return fStateMachine.NewStatePending(); }
/// @brief Returns the current state
fair::mq::State GetCurrentState() const { return fStateMachine.GetCurrentState(); }
/// @brief Returns the name of the current state as a string
std::string GetCurrentStateName() const { return fStateMachine.GetCurrentStateName(); }
/// @brief Returns name of the given state as a string
/// @param state state
static std::string GetStateName(const fair::mq::State state) { return fair::mq::GetStateName(state); }
/// @brief Returns name of the given transition as a string
/// @param transition transition
static std::string GetTransitionName(const fair::mq::Transition transition) { return fair::mq::GetTransitionName(transition); }
static constexpr const char* DefaultId = "";

View File

@@ -18,9 +18,7 @@
using fairmq_free_fn = void(void* data, void* hint);
class FairMQTransportFactory;
namespace fair
{
namespace mq
namespace fair::mq
{
struct Alignment
@@ -29,8 +27,7 @@ struct Alignment
explicit operator size_t() const { return alignment; }
};
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
class FairMQMessage
{
@@ -39,7 +36,9 @@ class FairMQMessage
FairMQMessage(FairMQTransportFactory* factory) : fTransport(factory) {}
virtual void Rebuild() = 0;
virtual void Rebuild(fair::mq::Alignment alignment) = 0;
virtual void Rebuild(const size_t size) = 0;
virtual void Rebuild(const size_t size, fair::mq::Alignment alignment) = 0;
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0;
virtual void* GetData() const = 0;
@@ -61,9 +60,7 @@ class FairMQMessage
using FairMQMessagePtr = std::unique_ptr<FairMQMessage>;
namespace fair
{
namespace mq
namespace fair::mq
{
using Message = FairMQMessage;
@@ -71,7 +68,6 @@ using MessagePtr = FairMQMessagePtr;
struct MessageError : std::runtime_error { using std::runtime_error::runtime_error; };
struct MessageBadAlloc : std::runtime_error { using std::runtime_error::runtime_error; };
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
#endif /* FAIRMQMESSAGE_H_ */

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -9,8 +9,9 @@
#ifndef FAIRMQPOLLER_H_
#define FAIRMQPOLLER_H_
#include <string>
#include <memory>
#include <stdexcept>
#include <string>
class FairMQPoller
{
@@ -26,16 +27,13 @@ class FairMQPoller
using FairMQPollerPtr = std::unique_ptr<FairMQPoller>;
namespace fair
{
namespace mq
namespace fair::mq
{
using Poller = FairMQPoller;
using PollerPtr = FairMQPollerPtr;
struct PollerError : std::runtime_error { using std::runtime_error::runtime_error; };
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
#endif /* FAIRMQPOLLER_H_ */

View File

@@ -19,20 +19,18 @@
class FairMQTransportFactory;
namespace fair
{
namespace mq
namespace fair::mq
{
enum class TransferResult : int
enum class TransferCode : int
{
success = 0,
error = -1,
timeout = -2,
interrupted = -3
};
} // namespace mq
} // namespace fair
} // namespace fair::mq
class FairMQSocket
{
@@ -86,16 +84,13 @@ class FairMQSocket
using FairMQSocketPtr = std::unique_ptr<FairMQSocket>;
namespace fair
{
namespace mq
namespace fair::mq
{
using Socket = FairMQSocket;
using SocketPtr = FairMQSocketPtr;
struct SocketError : std::runtime_error { using std::runtime_error::runtime_error; };
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
#endif /* FAIRMQSOCKET_H_ */

View File

@@ -13,7 +13,8 @@
#include <fairmq/ofi/TransportFactory.h>
#endif
#include <FairMQLogger.h>
#include <fairmq/Tools.h>
#include <fairmq/tools/Unique.h>
#include <fairmq/tools/Strings.h>
#include <fairlogger/Logger.h>

View File

@@ -24,7 +24,7 @@
#include <cstddef> // size_t
class FairMQChannel;
namespace fair { namespace mq { class ProgOptions; } }
namespace fair::mq { class ProgOptions; }
class FairMQTransportFactory
{
@@ -171,15 +171,12 @@ class FairMQTransportFactory
}
};
namespace fair
{
namespace mq
namespace fair::mq
{
using TransportFactory = FairMQTransportFactory;
struct TransportFactoryError : std::runtime_error { using std::runtime_error::runtime_error; };
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
#endif /* FAIRMQTRANSPORTFACTORY_H_ */

View File

@@ -104,9 +104,7 @@ inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event
}
}
namespace fair
{
namespace mq
namespace fair::mq
{
using RegionCallback = FairMQRegionCallback;
@@ -118,7 +116,6 @@ using RegionBlock = FairMQRegionBlock;
using UnmanagedRegion = FairMQUnmanagedRegion;
using UnmanagedRegionPtr = FairMQUnmanagedRegionPtr;
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
#endif /* FAIRMQUNMANAGEDREGION_H_ */

View File

@@ -15,7 +15,6 @@
#include "JSONParser.h"
#include "FairMQChannel.h"
#include <fairmq/PropertyOutput.h>
#include <fairmq/tools/CppSTL.h>
#include <fairmq/tools/Strings.h>
#include <fairlogger/Logger.h>
@@ -31,9 +30,7 @@ using namespace fair::mq;
using namespace fair::mq::tools;
using namespace boost::property_tree;
namespace fair
{
namespace mq
namespace fair::mq
{
fair::mq::Properties PtreeParser(const ptree& pt, const string& id)
@@ -186,5 +183,4 @@ void SubChannelParser(const ptree& channelTree, fair::mq::Properties& properties
}
} // helper namespace
} // namespace mq
} // namespace fair
} // namespace fair::mq

View File

@@ -21,9 +21,7 @@
#include <stdexcept>
#include <string>
namespace fair
{
namespace mq
namespace fair::mq
{
struct ParserError : std::runtime_error { using std::runtime_error::runtime_error; };
@@ -41,7 +39,6 @@ void SubChannelParser(const boost::property_tree::ptree& tree, fair::mq::Propert
} // helper namespace
} // namespace mq
} // namespace fair
} // namespace fair::mq
#endif /* FAIR_MQ_JSONPARSER_H */

View File

@@ -15,8 +15,8 @@
#include <fairmq/FairMQTransportFactory.h>
#include <fairmq/MemoryResources.h>
namespace fair {
namespace mq {
namespace fair::mq
{
using BytePmrAllocator = pmr::polymorphic_allocator<fair::mq::byte>;
@@ -62,5 +62,4 @@ FairMQMessagePtr getMessage(ContainerT &&container_, FairMQMemoryResource *targe
return message;
}
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq

View File

@@ -15,7 +15,7 @@
#ifndef FAIR_MQ_MEMORY_RESOURCES_H
#define FAIR_MQ_MEMORY_RESOURCES_H
#include <fairmq/FairMQMessage.h>
#include <FairMQMessage.h>
class FairMQTransportFactory;
#include <boost/container/container_fwd.hpp>
@@ -26,8 +26,8 @@ class FairMQTransportFactory;
#include <stdexcept>
#include <utility>
namespace fair {
namespace mq {
namespace fair::mq
{
using byte = unsigned char;
namespace pmr = boost::container::pmr;
@@ -107,7 +107,6 @@ class ChannelResource : public FairMQMemoryResource
};
};
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
#endif /* FAIR_MQ_MEMORY_RESOURCES_H */

View File

@@ -9,7 +9,6 @@
#ifndef FAIR_MQ_PLUGIN_H
#define FAIR_MQ_PLUGIN_H
#include <fairmq/tools/CppSTL.h>
#include <fairmq/tools/Version.h>
#include <fairmq/PluginServices.h>
@@ -25,9 +24,7 @@
#include <tuple>
#include <utility>
namespace fair
{
namespace mq
namespace fair::mq
{
/**
@@ -134,13 +131,12 @@ class Plugin
PluginServices* fPluginServices;
}; /* class Plugin */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
#define REGISTER_FAIRMQ_PLUGIN(KLASS, NAME, VERSION, MAINTAINER, HOMEPAGE, PROGOPTIONS) \
static auto Make_##NAME##_Plugin(fair::mq::PluginServices* pluginServices) -> std::unique_ptr<fair::mq::Plugin> \
{ \
return fair::mq::tools::make_unique<KLASS>(std::string{#NAME}, VERSION, std::string{MAINTAINER}, std::string{HOMEPAGE}, pluginServices); \
return std::make_unique<KLASS>(std::string{#NAME}, VERSION, std::string{MAINTAINER}, std::string{HOMEPAGE}, pluginServices); \
} \
BOOST_DLL_ALIAS(Make_##NAME##_Plugin, make_##NAME##_plugin) \
BOOST_DLL_ALIAS(PROGOPTIONS, get_##NAME##_plugin_progoptions)

View File

@@ -11,7 +11,6 @@
#include <fairmq/Plugin.h>
#include <fairmq/PluginServices.h>
#include <fairmq/tools/CppSTL.h>
#include <fairmq/tools/Strings.h>
#define BOOST_FILESYSTEM_VERSION 3
@@ -31,9 +30,7 @@
#include <vector>
#include <utility> // forward
namespace fair
{
namespace mq
namespace fair::mq
{
/**
@@ -80,7 +77,7 @@ class PluginManager
auto ForEachPluginProgOptions(std::function<void (boost::program_options::options_description)> func) const -> void { for(const auto& pair : fPluginProgOptions) { func(pair.second); } }
template<typename... Args>
auto EmplacePluginServices(Args&&... args) -> void { fPluginServices = fair::mq::tools::make_unique<PluginServices>(std::forward<Args>(args)...); }
auto EmplacePluginServices(Args&&... args) -> void { fPluginServices = std::make_unique<PluginServices>(std::forward<Args>(args)...); }
auto WaitForPluginsToReleaseDeviceControl() -> void { fPluginServices->WaitForReleaseDeviceControl(); }
@@ -126,7 +123,6 @@ class PluginManager
std::map<std::string, boost::program_options::options_description> fPluginProgOptions;
}; /* class PluginManager */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
#endif /* FAIR_MQ_PLUGINMANAGER_H */

View File

@@ -26,9 +26,7 @@
#include <unordered_map>
#include <vector>
namespace fair
{
namespace mq
namespace fair::mq
{
/**
@@ -281,7 +279,6 @@ class PluginServices
std::condition_variable fReleaseDeviceControlCondition;
}; /* class PluginServices */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
#endif /* FAIR_MQ_PLUGINSERVICES_H */

View File

@@ -39,9 +39,7 @@ struct ValInfo
string origin;
};
namespace fair
{
namespace mq
namespace fair::mq
{
ValInfo ConvertVarValToValInfo(const po::variable_value& v)
@@ -449,6 +447,4 @@ void ProgOptions::PrintOptionsRaw() const
}
}
} // namespace mq
} // namespace fair
} // namespace fair::mq

View File

@@ -26,9 +26,7 @@
#include <vector>
#include <stdexcept>
namespace fair
{
namespace mq
namespace fair::mq
{
struct PropertyNotFoundError : std::runtime_error { using std::runtime_error::runtime_error; };
@@ -270,7 +268,6 @@ class ProgOptions
mutable std::mutex fMtx;
};
} // namespace mq
} // namespace fair
} // namespace fair::mq
#endif /* FAIR_MQ_PROGOPTIONS_H */

View File

@@ -9,13 +9,10 @@
#ifndef FAIR_MQ_PROGOPTIONSFWD_H
#define FAIR_MQ_PROGOPTIONSFWD_H
namespace fair
{
namespace mq
namespace fair::mq
{
class ProgOptions;
}
}
using FairMQProgOptions = fair::mq::ProgOptions;

View File

@@ -7,17 +7,13 @@
********************************************************************************/
#include <fairmq/Properties.h>
#include <fairmq/Tools.h>
#include <boost/filesystem.hpp>
using namespace std;
using namespace fair::mq::tools;
using boost::any_cast;
namespace fair
{
namespace mq
namespace fair::mq
{
template<class T>
@@ -145,5 +141,4 @@ unordered_map<type_index, void(*)(const EventManager&, const string&, const Prop
{ type_index(typeid(vector<boost::filesystem::path>)), [](const EventManager& em, const string& k, const Property& p) { em.Emit<PropertyChange, vector<boost::filesystem::path>>(k, any_cast<vector<boost::filesystem::path>>(p)); } },
};
} // namespace mq
} // namespace fair
} // namespace fair::mq

View File

@@ -22,9 +22,7 @@
#include <typeinfo>
#include <utility> // pair
namespace fair
{
namespace mq
namespace fair::mq
{
using Property = boost::any;
@@ -72,7 +70,6 @@ class PropertyHelper
static std::unordered_map<std::type_index, std::function<std::pair<std::string, std::string>(const Property&)>> fTypeInfos;
};
}
}
#endif /* FAIR_MQ_PROPERTIES_H */

View File

@@ -35,9 +35,7 @@ using namespace boost::msm::front;
using namespace boost::msm::back;
namespace bmpl = boost::mpl;
namespace fair
{
namespace mq
namespace fair::mq
{
namespace fsm
{
@@ -239,8 +237,7 @@ struct Machine_ : public state_machine_def<Machine_>
using FairMQFSM = state_machine<Machine_>;
} // namespace fsm
} // namespace mq
} // namespace fair
} // namespace fair::mq
using namespace fair::mq::fsm;
using namespace fair::mq;

View File

@@ -16,9 +16,7 @@
#include <functional>
#include <stdexcept>
namespace fair
{
namespace mq
namespace fair::mq
{
class StateMachine
@@ -56,7 +54,6 @@ class StateMachine
std::shared_ptr<void> fFsm;
};
} // namespace mq
} // namespace fair
} // namespace fair::mq
#endif /* FAIRMQSTATEMACHINE_H_ */

View File

@@ -17,9 +17,7 @@
#include <utility> // pair
#include <condition_variable>
namespace fair
{
namespace mq
namespace fair::mq
{
class StateQueue
@@ -88,7 +86,6 @@ class StateQueue
std::condition_variable fCV;
};
} // namespace mq
} // namespace fair
} // namespace fair::mq
#endif /* FAIRMQSTATEQUEUE_H_ */
#endif /* FAIRMQSTATEQUEUE_H_ */

View File

@@ -13,9 +13,7 @@
using namespace std;
namespace fair
{
namespace mq
namespace fair::mq
{
array<string, 16> stateNames =
@@ -114,5 +112,4 @@ Transition GetTransition(const string& transition)
return transitions.at(transition);
}
} // namespace mq
} // namespace fair
} // namespace fair::mq

View File

@@ -13,9 +13,7 @@
#include <ostream>
#include <stdexcept>
namespace fair
{
namespace mq
namespace fair::mq
{
enum class State : int
@@ -64,7 +62,6 @@ struct DeviceErrorState : std::runtime_error { using std::runtime_error::runtime
inline std::ostream& operator<<(std::ostream& os, const State& state) { return os << GetStateName(state); }
inline std::ostream& operator<<(std::ostream& os, const Transition& transition) { return os << GetTransitionName(transition); }
} // namespace mq
} // namespace fair
} // namespace fair::mq
#endif /* FAIRMQSTATES_H_ */

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public License (LGPL) version 3, *
@@ -17,15 +17,14 @@
#include <fairlogger/Logger.h>
#include <boost/property_tree/ptree.hpp>
#include <utility> // make_pair
#include <string_view>
#include <utility> // make_pair
#include <cstring>
using boost::property_tree::ptree;
using namespace std;
namespace fair
{
namespace mq
namespace fair::mq
{
enum channelOptionKeyIds
@@ -85,7 +84,16 @@ Properties SuboptParser(const vector<string>& channelConfig, const string& devic
string argString(token);
char* subopts = &argString[0];
char* value = nullptr;
// Find either a : or a =. If we find the former first, we consider what is before it
// the channel name
char* firstSep = strpbrk(subopts, ":=");
if (firstSep && *firstSep == ':') {
channelName = std::string_view(subopts, firstSep - subopts);
channelProperties.put("name", channelName);
subopts = firstSep + 1;
}
while (subopts && *subopts != 0 && *subopts != ' ') {
char* cur = subopts;
int subopt = getsubopt(&subopts, (char**)channelOptionKeys, &value);
if (subopt == NAME) {
channelName = value;
@@ -96,6 +104,8 @@ Properties SuboptParser(const vector<string>& channelConfig, const string& devic
socketsArray.push_back(make_pair("", socketProperties));
} else if (subopt >= 0 && value != nullptr) {
channelProperties.put(channelOptionKeys[subopt], value);
} else if (subopt == -1) {
LOG(warn) << "Ignoring unknown argument in --channel-config: " << cur;
}
}
@@ -120,4 +130,3 @@ Properties SuboptParser(const vector<string>& channelConfig, const string& devic
}
}
}

View File

@@ -19,9 +19,7 @@
#include <vector>
#include <string>
namespace fair
{
namespace mq
namespace fair::mq
{
/**
@@ -43,7 +41,6 @@ namespace mq
Properties SuboptParser(const std::vector<std::string>& channelConfig, const std::string& deviceId);
}
}
#endif /* FAIR_MQ_SUBOPTPARSER_H */

View File

@@ -9,7 +9,6 @@
#ifndef FAIR_MQ_TRANSPORTS_H
#define FAIR_MQ_TRANSPORTS_H
#include <fairmq/tools/CppSTL.h>
#include <fairmq/tools/Strings.h>
#include <memory>
@@ -17,9 +16,7 @@
#include <string>
#include <unordered_map>
namespace fair
{
namespace mq
namespace fair::mq
{
enum class Transport
@@ -32,20 +29,9 @@ enum class Transport
struct TransportError : std::runtime_error { using std::runtime_error::runtime_error; };
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
namespace std
{
template<>
struct hash<fair::mq::Transport> : fair::mq::tools::HashEnum<fair::mq::Transport> {};
} /* namespace std */
namespace fair
{
namespace mq
namespace fair::mq
{
static std::unordered_map<std::string, Transport> TransportTypes {
@@ -74,7 +60,6 @@ try {
throw TransportError(tools::ToString("Unknown transport provided: ", transport));
}
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq
#endif /* FAIR_MQ_TRANSPORTS_H */

View File

@@ -21,7 +21,7 @@
#define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@"
#define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ"
#define FAIRMQ_LICENSE "LGPL-3.0"
#define FAIRMQ_COPYRIGHT "2012-2020 GSI"
#define FAIRMQ_COPYRIGHT "2012-2021 GSI"
#define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@"
#endif // FAIR_MQ_VERSION_H

View File

@@ -1,33 +1,35 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQBENCHMARKSAMPLER_H_
#define FAIRMQBENCHMARKSAMPLER_H_
#ifndef FAIR_MQ_BENCHMARKSAMPLER_H
#define FAIR_MQ_BENCHMARKSAMPLER_H
#include "../FairMQLogger.h"
#include "FairMQDevice.h"
#include "tools/RateLimit.h"
#include <fairmq/Device.h>
#include <fairmq/tools/RateLimit.h>
#include <atomic>
#include <chrono>
#include <cstddef> // size_t
#include <cstdint> // uint64_t
#include <cstring> // memset
#include <fairlogger/Logger.h>
#include <string>
namespace fair::mq
{
/**
* Sampler to generate traffic for benchmarking.
*/
class FairMQBenchmarkSampler : public FairMQDevice
class BenchmarkSampler : public Device
{
public:
FairMQBenchmarkSampler()
BenchmarkSampler()
: fMultipart(false)
, fMemSet(false)
, fNumParts(1)
@@ -106,19 +108,18 @@ class FairMQBenchmarkSampler : public FairMQDevice
LOG(info) << "Done " << fNumIterations << " iterations in " << std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms.";
}
virtual ~FairMQBenchmarkSampler() {}
protected:
bool fMultipart;
bool fMemSet;
size_t fNumParts;
size_t fMsgSize;
size_t fMsgAlignment;
std::atomic<int> fMsgCounter;
float fMsgRate;
uint64_t fNumIterations;
uint64_t fMaxIterations;
std::string fOutChannelName;
};
#endif /* FAIRMQBENCHMARKSAMPLER_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_BENCHMARKSAMPLER_H */

View File

@@ -1,93 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQSink.h
*
* @since 2013-01-09
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQSINK_H_
#define FAIRMQSINK_H_
#include "../FairMQDevice.h"
#include "../FairMQLogger.h"
#include <chrono>
#include <string>
// template<typename OutputPolicy>
class FairMQSink : public FairMQDevice //, public OutputPolicy
{
public:
FairMQSink()
: fMultipart(false)
, fMaxIterations(0)
, fNumIterations(0)
, fInChannelName()
{}
~FairMQSink() {}
protected:
bool fMultipart;
uint64_t fMaxIterations;
uint64_t fNumIterations;
std::string fInChannelName;
void InitTask() override
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
}
void Run() override
{
// store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
LOG(info) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages.";
auto tStart = std::chrono::high_resolution_clock::now();
while (!NewStatePending()) {
if (fMultipart) {
FairMQParts parts;
if (dataInChannel.Receive(parts) >= 0) {
if (fMaxIterations > 0) {
if (fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached.";
break;
}
}
fNumIterations++;
}
} else {
FairMQMessagePtr msg(dataInChannel.NewMessage());
if (dataInChannel.Receive(msg) >= 0) {
if (fMaxIterations > 0) {
if (fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached.";
break;
}
}
fNumIterations++;
}
}
}
auto tEnd = std::chrono::high_resolution_clock::now();
LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in "
<< std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms.";
}
};
#endif /* FAIRMQSINK_H_ */

View File

@@ -1,36 +1,32 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQMerger.h
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQMERGER_H_
#define FAIRMQMERGER_H_
#ifndef FAIR_MQ_MERGER_H
#define FAIR_MQ_MERGER_H
#include "FairMQDevice.h"
#include "../FairMQPoller.h"
#include "../FairMQLogger.h"
#include <FairMQPoller.h>
#include <fairmq/Device.h>
#include <fairlogger/Logger.h>
#include <string>
#include <vector>
class FairMQMerger : public FairMQDevice
namespace fair::mq
{
class Merger : public Device
{
public:
FairMQMerger()
Merger()
: fMultipart(true)
, fInChannelName("data-in")
, fOutChannelName("data-out")
{}
~FairMQMerger() {}
protected:
bool fMultipart;
@@ -112,4 +108,6 @@ class FairMQMerger : public FairMQDevice
}
};
#endif /* FAIRMQMERGER_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_MERGER_H */

View File

@@ -1,29 +1,31 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQMULTIPLIER_H_
#define FAIRMQMULTIPLIER_H_
#ifndef FAIR_MQ_MULTIPLIER_H
#define FAIR_MQ_MULTIPLIER_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <string>
#include <vector>
class FairMQMultiplier : public FairMQDevice
namespace fair::mq
{
class Multiplier : public Device
{
public:
FairMQMultiplier()
Multiplier()
: fMultipart(true)
, fNumOutputs(0)
, fInChannelName()
, fOutChannelNames()
{}
~FairMQMultiplier() {}
protected:
bool fMultipart;
@@ -39,9 +41,9 @@ class FairMQMultiplier : public FairMQDevice
fNumOutputs = fChannels.at(fOutChannelNames.at(0)).size();
if (fMultipart) {
OnData(fInChannelName, &FairMQMultiplier::HandleMultipartData);
OnData(fInChannelName, &Multiplier::HandleMultipartData);
} else {
OnData(fInChannelName, &FairMQMultiplier::HandleSingleData);
OnData(fInChannelName, &Multiplier::HandleSingleData);
}
}
@@ -107,4 +109,6 @@ class FairMQMultiplier : public FairMQDevice
}
};
#endif /* FAIRMQMULTIPLIER_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_MULTIPLIER_H */

View File

@@ -1,33 +1,29 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQProxy.h
*
* @since 2013-10-02
* @author A. Rybalchenko
*/
#ifndef FAIRMQPROXY_H_
#define FAIRMQPROXY_H_
#ifndef FAIR_MQ_PROXY_H
#define FAIR_MQ_PROXY_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <string>
class FairMQProxy : public FairMQDevice
namespace fair::mq
{
class Proxy : public Device
{
public:
FairMQProxy()
Proxy()
: fMultipart(true)
, fInChannelName()
, fOutChannelName()
{}
~FairMQProxy() {}
protected:
bool fMultipart;
@@ -73,4 +69,6 @@ class FairMQProxy : public FairMQDevice
}
};
#endif /* FAIRMQPROXY_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_PROXY_H */

View File

@@ -2,9 +2,9 @@
With FairMQ several generic devices are provided:
- **FairMQBenchmarkSampler**: generates random data of configurable size and at configurable rate and sends it out on an output channel.
- **FairMQSink**: receives messages on the input channel and simply discards them.
- **FairMQMerger**: receives data from multiple input channels and forwards it to a single output channel.
- **FairMQSplitter**: receives messages on a single input channels and round-robins them among multiple output channels (which can have different socket types).
- **FairMQMultiplier**: receives data from a single input channel and multiplies (copies) it to two or more output channels.
- **FairMQProxy**: connects input channel to output channel, where both can have different socket types and multiple peers.
- **BenchmarkSampler**: generates random data of configurable size and at configurable rate and sends it out on an output channel.
- **Sink**: receives messages on the input channel and simply discards them.
- **Merger**: receives data from multiple input channels and forwards it to a single output channel.
- **Splitter**: receives messages on a single input channels and round-robins them among multiple output channels (which can have different socket types).
- **Multiplier**: receives data from a single input channel and multiplies (copies) it to two or more output channels.
- **Proxy**: connects input channel to output channel, where both can have different socket types and multiple peers.

146
fairmq/devices/Sink.h Normal file
View File

@@ -0,0 +1,146 @@
/********************************************************************************
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_SINK_H
#define FAIR_MQ_SINK_H
#include <FairMQPoller.h>
#include <fairmq/Device.h>
#include <fairmq/tools/Strings.h>
#include <chrono>
#include <fairlogger/Logger.h>
#include <fstream>
#include <string>
#include <stdexcept>
namespace fair::mq
{
class Sink : public Device
{
public:
Sink()
: fMultipart(false)
, fMaxIterations(0)
, fNumIterations(0)
, fMaxFileSize(0)
, fBytesWritten(0)
, fInChannelName()
, fOutFilename()
{}
protected:
bool fMultipart;
uint64_t fMaxIterations;
uint64_t fNumIterations;
uint64_t fMaxFileSize;
uint64_t fBytesWritten;
std::string fInChannelName;
std::string fOutFilename;
std::fstream fOutputFile;
void InitTask() override
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fMaxFileSize = fConfig->GetProperty<uint64_t>("max-file-size");
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
fOutFilename = fConfig->GetProperty<std::string>("out-filename");
fBytesWritten = 0;
}
void Run() override
{
// store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
LOG(info) << "Starting sink and expecting to receive " << fMaxIterations << " messages.";
auto tStart = std::chrono::high_resolution_clock::now();
if (!fOutFilename.empty()) {
LOG(debug) << "Incoming messages will be written to file: " << fOutFilename;
if (fMaxFileSize != 0) {
LOG(debug) << "File output will stop after " << fMaxFileSize << " bytes";
} else {
LOG(debug) << "ATTENTION: --max-file-size is 0 - output file will continue to grow until sink is stopped";
}
fOutputFile.open(fOutFilename, std::ios::out | std::ios::binary);
if (!fOutputFile) {
LOG(error) << "Could not open '" << fOutFilename;
throw std::runtime_error(fair::mq::tools::ToString("Could not open '", fOutFilename));
}
}
while (!NewStatePending()) {
if (fMultipart) {
FairMQParts parts;
if (dataInChannel.Receive(parts) < 0) {
continue;
}
if (fOutputFile.is_open()) {
for (const auto& part : parts) {
WriteToFile(static_cast<const char*>(part->GetData()), part->GetSize());
}
}
} else {
FairMQMessagePtr msg(dataInChannel.NewMessage());
if (dataInChannel.Receive(msg) < 0) {
continue;
}
if (fOutputFile.is_open()) {
WriteToFile(static_cast<const char*>(msg->GetData()), msg->GetSize());
}
}
if (fMaxFileSize > 0 && fBytesWritten >= fMaxFileSize) {
LOG(info) << "Written " << fBytesWritten << " bytes, stopping...";
break;
}
if (fMaxIterations > 0) {
if (fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached.";
break;
}
}
fNumIterations++;
}
if (fOutputFile.is_open()) {
fOutputFile.flush();
fOutputFile.close();
}
auto tEnd = std::chrono::high_resolution_clock::now();
auto ms = std::chrono::duration<double, std::milli>(tEnd - tStart).count();
LOG(info) << "Received " << fNumIterations << " messages in " << ms << "ms.";
if (!fOutFilename.empty()) {
auto sec = std::chrono::duration<double>(tEnd - tStart).count();
LOG(info) << "Closed '" << fOutFilename << "' after writing " << fBytesWritten << " bytes."
<< "(" << (fBytesWritten / (1000. * 1000.)) / sec << " MB/s)";
}
LOG(info) << "Leaving RUNNING state.";
}
void WriteToFile(const char* ptr, size_t size)
{
fOutputFile.write(ptr, size);
if (fOutputFile.bad()) {
LOG(error) << "failed writing to file";
throw std::runtime_error("failed writing to file");
}
fBytesWritten += size;
}
};
} // namespace fair::mq
#endif /* FAIR_MQ_SINK_H */

View File

@@ -1,35 +1,31 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQSplitter.h
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQSPLITTER_H_
#define FAIRMQSPLITTER_H_
#ifndef FAIR_MQ_SPLITTER_H
#define FAIR_MQ_SPLITTER_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <string>
class FairMQSplitter : public FairMQDevice
namespace fair::mq
{
class Splitter : public Device
{
public:
FairMQSplitter()
Splitter()
: fMultipart(true)
, fNumOutputs(0)
, fDirection(0)
, fInChannelName()
, fOutChannelName()
{}
~FairMQSplitter() {}
protected:
bool fMultipart;
@@ -47,9 +43,9 @@ class FairMQSplitter : public FairMQDevice
fDirection = 0;
if (fMultipart) {
OnData(fInChannelName, &FairMQSplitter::HandleData<FairMQParts>);
OnData(fInChannelName, &Splitter::HandleData<FairMQParts>);
} else {
OnData(fInChannelName, &FairMQSplitter::HandleData<FairMQMessagePtr>);
OnData(fInChannelName, &Splitter::HandleData<FairMQMessagePtr>);
}
}
@@ -66,4 +62,6 @@ class FairMQSplitter : public FairMQDevice
}
};
#endif /* FAIRMQSPLITTER_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_SPLITTER_H */

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQBenchmarkSampler.h>
#include <fairmq/devices/BenchmarkSampler.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -24,7 +24,7 @@ void addCustomOptions(bpo::options_description& options)
("msg-rate", bpo::value<float>()->default_value(0), "Msg rate limit in maximum number of messages per second");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /* config */)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /* config */)
{
return new FairMQBenchmarkSampler();
return std::make_unique<fair::mq::BenchmarkSampler>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQMerger.h>
#include <fairmq/devices/Merger.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQMerger();
return std::make_unique<fair::mq::Merger>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQMultiplier.h>
#include <fairmq/devices/Multiplier.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQMultiplier();
return std::make_unique<fair::mq::Multiplier>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQProxy.h>
#include <fairmq/devices/Proxy.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQProxy();
return std::make_unique<fair::mq::Proxy>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQSink.h>
#include <fairmq/devices/Sink.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -15,11 +15,13 @@ void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("in-channel", bpo::value<std::string>()->default_value("data"), "Name of the input channel")
("out-filename", bpo::value<std::string>()->default_value(""), "Write incoming message buffers to the specified file")
("max-file-size", bpo::value<uint64_t>()->default_value(2000000000), "Maximum file size for the file output (0 - unlimited)")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQSink();
return std::make_unique<fair::mq::Sink>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQSplitter.h>
#include <fairmq/devices/Splitter.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQSplitter();
return std::make_unique<fair::mq::Splitter>();
}

View File

@@ -7,7 +7,7 @@
********************************************************************************/
#include <fairmq/ofi/Context.h>
#include <fairmq/Tools.h>
#include <fairmq/tools/Strings.h>
#include <FairMQLogger.h>
#include <asiofi/version.hpp>
@@ -22,11 +22,7 @@
#include <string.h>
#include <sys/socket.h>
namespace fair
{
namespace mq
{
namespace ofi
namespace fair::mq::ofi
{
using namespace std;
@@ -134,6 +130,4 @@ auto Context::MakeSendMessage(size_t size) -> MessagePtr
return fSendFactory.CreateMessage(size);
}
} /* namespace ofi */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::ofi

View File

@@ -24,11 +24,7 @@
#include <thread>
#include <vector>
namespace fair
{
namespace mq
{
namespace ofi
namespace fair::mq::ofi
{
enum class ConnectionType : bool { Bind, Connect };
@@ -88,8 +84,6 @@ class Context
struct ContextError : std::runtime_error { using std::runtime_error::runtime_error; };
} /* namespace ofi */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::ofi
#endif /* FAIR_MQ_OFI_CONTEXT_H */

View File

@@ -17,8 +17,8 @@
#include <memory>
#include <type_traits>
namespace boost {
namespace asio {
namespace boost::asio
{
template<typename PodType>
auto buffer(const PodType& obj) -> boost::asio::const_buffer
@@ -26,12 +26,10 @@ auto buffer(const PodType& obj) -> boost::asio::const_buffer
return boost::asio::const_buffer(static_cast<const void*>(&obj), sizeof(PodType));
}
} // namespace asio
} // namespace boost
} // namespace boost::asio
namespace fair {
namespace mq {
namespace ofi {
namespace fair::mq::ofi
{
enum class ControlMessageType
{
@@ -109,8 +107,6 @@ auto MakeControlMessage(Args&&... args) -> ControlMessage
return ctrl;
}
} // namespace ofi
} // namespace mq
} // namespace fair
} // namespace fair::mq::ofi
#endif /* FAIR_MQ_OFI_CONTROLMESSAGES_H */

View File

@@ -7,7 +7,6 @@
********************************************************************************/
#include <fairmq/ofi/Message.h>
#include <fairmq/Tools.h>
#include <FairMQLogger.h>
#include <asiofi.hpp>
@@ -15,11 +14,7 @@
#include <cstdlib>
#include <zmq.h>
namespace fair
{
namespace mq
{
namespace ofi
namespace fair::mq::ofi
{
using namespace std;
@@ -110,6 +105,12 @@ auto Message::Rebuild() -> void
fHint = nullptr;
}
auto Message::Rebuild(Alignment /* alignment */) -> void
{
// TODO: implement alignment
Rebuild();
}
auto Message::Rebuild(const size_t size) -> void
{
if (fFreeFunction) {
@@ -131,6 +132,12 @@ auto Message::Rebuild(const size_t size) -> void
fHint = nullptr;
}
auto Message::Rebuild(const size_t size, Alignment /* alignment */) -> void
{
// TODO: implement alignment
Rebuild(size);
}
auto Message::Rebuild(void* /*data*/, const size_t size, fairmq_free_fn* ffn, void* hint) -> void
{
if (fFreeFunction) {
@@ -190,6 +197,4 @@ Message::~Message()
}
}
} /* namespace ofi */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::ofi

View File

@@ -17,11 +17,7 @@
#include <cstddef> // size_t
#include <zmq.h>
namespace fair
{
namespace mq
{
namespace ofi
namespace fair::mq::ofi
{
/**
@@ -52,7 +48,9 @@ class Message final : public fair::mq::Message
Message operator=(const Message&) = delete;
auto Rebuild() -> void override;
auto Rebuild(Alignment alignment) -> void override;
auto Rebuild(const size_t size) -> void override;
auto Rebuild(const size_t size, Alignment alignment) -> void override;
auto Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) -> void override;
auto GetData() const -> void* override;
@@ -75,8 +73,6 @@ class Message final : public fair::mq::Message
boost::container::pmr::memory_resource* fPmr;
}; /* class Message */
} /* namespace ofi */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::ofi
#endif /* FAIR_MQ_OFI_MESSAGE_H */

View File

@@ -8,16 +8,12 @@
#include <fairmq/ofi/Poller.h>
#include <fairmq/ofi/Socket.h>
#include <fairmq/Tools.h>
#include <fairmq/tools/Strings.h>
#include <FairMQLogger.h>
#include <zmq.h>
namespace fair
{
namespace mq
{
namespace ofi
namespace fair::mq::ofi
{
using namespace std;
@@ -153,6 +149,4 @@ Poller::~Poller()
delete[] fItems;
}
} /* namespace ofi */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::ofi

View File

@@ -18,11 +18,7 @@
#include <zmq.h>
namespace fair
{
namespace mq
{
namespace ofi
namespace fair::mq::ofi
{
class TransportFactory;
@@ -63,8 +59,6 @@ class Poller final : public FairMQPoller
std::unordered_map<std::string, int> fOffsetMap;
}; /* class Poller */
} /* namespace ofi */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::ofi
#endif /* FAIR_MQ_OFI_POLLER_H */

View File

@@ -9,7 +9,7 @@
#include <fairmq/ofi/ControlMessages.h>
#include <fairmq/ofi/Socket.h>
#include <fairmq/ofi/TransportFactory.h>
#include <fairmq/Tools.h>
#include <fairmq/tools/Strings.h>
#include <FairMQLogger.h>
#include <asiofi.hpp>
@@ -25,11 +25,7 @@
#include <mutex>
#include <queue>
namespace fair
{
namespace mq
{
namespace ofi
namespace fair::mq::ofi
{
using namespace std;
@@ -74,16 +70,16 @@ auto Socket::InitOfi(Address addr) -> void
hints.set_provider("verbs");
}
if (fRemoteAddr == addr) {
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), 0, hints);
fOfiInfo = make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), 0, hints);
} else {
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints);
fOfiInfo = make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints);
}
LOG(debug) << "OFI transport (" << fId << "): " << *fOfiInfo;
fOfiFabric = tools::make_unique<asiofi::fabric>(*fOfiInfo);
fOfiFabric = make_unique<asiofi::fabric>(*fOfiInfo);
fOfiDomain = tools::make_unique<asiofi::domain>(*fOfiFabric);
fOfiDomain = make_unique<asiofi::domain>(*fOfiFabric);
}
}
@@ -96,7 +92,7 @@ try {
InitOfi(fLocalAddr);
fPassiveEndpoint = tools::make_unique<asiofi::passive_endpoint>(fContext.GetIoContext(), *fOfiFabric);
fPassiveEndpoint = make_unique<asiofi::passive_endpoint>(fContext.GetIoContext(), *fOfiFabric);
//fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr));
BindControlEndpoint();
@@ -128,7 +124,7 @@ auto Socket::BindControlEndpoint() -> void
fPassiveEndpoint->listen([&](asiofi::info&& info) {
LOG(debug) << "OFI transport (" << fId
<< "): control band connection request received. Accepting ...";
fControlEndpoint = tools::make_unique<asiofi::connected_endpoint>(
fControlEndpoint = make_unique<asiofi::connected_endpoint>(
fContext.GetIoContext(), *fOfiDomain, info);
fControlEndpoint->enable();
fControlEndpoint->accept([&]() {
@@ -148,7 +144,7 @@ auto Socket::BindDataEndpoint() -> void
fPassiveEndpoint->listen([&](asiofi::info&& info) {
LOG(debug) << "OFI transport (" << fId
<< "): data band connection request received. Accepting ...";
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(
fDataEndpoint = make_unique<asiofi::connected_endpoint>(
fContext.GetIoContext(), *fOfiDomain, info);
fDataEndpoint->enable();
fDataEndpoint->accept([&]() {
@@ -215,7 +211,7 @@ auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoi
std::string band(type == Band::Control ? "control" : "data");
endpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain);
endpoint = make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain);
endpoint->enable();
LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr;
@@ -284,7 +280,7 @@ try {
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return static_cast<int64_t>(TransferResult::error);
return static_cast<int64_t>(TransferCode::error);
}
auto Socket::SendQueueReader() -> void
@@ -431,7 +427,7 @@ try {
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return static_cast<int>(TransferResult::error);
return static_cast<int>(TransferCode::error);
}
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int /*timeout*/) -> int64_t
@@ -456,7 +452,7 @@ try {
return size;
} catch (const std::exception& e) {
LOG(error) << e.what();
return static_cast<int64_t>(TransferResult::error);
return static_cast<int64_t>(TransferCode::error);
}
auto Socket::RecvControlQueueReader() -> void
@@ -683,6 +679,4 @@ Socket::~Socket()
}
}
} /* namespace ofi */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::ofi

View File

@@ -23,11 +23,7 @@
#include <mutex>
namespace fair
{
namespace mq
{
namespace ofi
namespace fair::mq::ofi
{
/**
@@ -120,8 +116,6 @@ class Socket final : public fair::mq::Socket
struct SilentSocketError : SocketError { using SocketError::SocketError; };
} /* namespace ofi */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::ofi
#endif /* FAIR_MQ_OFI_SOCKET_H */

View File

@@ -10,15 +10,10 @@
#include <fairmq/ofi/Poller.h>
#include <fairmq/ofi/Socket.h>
#include <fairmq/ofi/TransportFactory.h>
#include <fairmq/Tools.h>
#include <stdexcept>
namespace fair
{
namespace mq
{
namespace ofi
namespace fair::mq::ofi
{
using namespace std;
@@ -122,6 +117,4 @@ auto TransportFactory::GetType() const -> Transport
return Transport::OFI;
}
} /* namespace ofi */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::ofi

View File

@@ -15,11 +15,7 @@
#include <asiofi.hpp>
namespace fair
{
namespace mq
{
namespace ofi
namespace fair::mq::ofi
{
/**
@@ -69,8 +65,6 @@ class TransportFactory final : public FairMQTransportFactory
asiofi::allocated_pool_resource fMemoryResource;
}; /* class TransportFactory */
} /* namespace ofi */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::ofi
#endif /* FAIR_MQ_OFI_TRANSPORTFACTORY_H */

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -9,4 +9,4 @@
// List of all builtin plugin headers (the ones which call REGISTER_FAIRMQ_PLUGIN macro)
#include <fairmq/plugins/config/Config.h>
#include <fairmq/plugins/Control.h>
#include <fairmq/plugins/control/Control.h>

View File

@@ -22,11 +22,7 @@
using namespace std;
using fair::mq::tools::ToString;
namespace fair
{
namespace mq
{
namespace plugins
namespace fair::mq::plugins
{
DDS::DDS(const string& name,
@@ -458,6 +454,4 @@ DDS::~DDS()
}
}
} /* namespace plugins */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::plugins

View File

@@ -32,11 +32,7 @@
#include <utility> // pair
#include <vector>
namespace fair
{
namespace mq
{
namespace plugins
namespace fair::mq::plugins
{
struct DDSConfig
@@ -196,8 +192,6 @@ REGISTER_FAIRMQ_PLUGIN(
DDSProgramOptions // custom program options for the plugin
)
} /* namespace plugins */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::plugins
#endif /* FAIR_MQ_PLUGINS_DDS */

View File

@@ -13,7 +13,7 @@
#include <FairMQLogger.h>
#include <fairmq/tools/Semaphore.h>
#include <fairmq/tools/CppSTL.h>
#include <memory> // make_unique
#include <string>
namespace pmix
@@ -156,7 +156,7 @@ class Commands
void Send(const std::string& msg, const std::vector<proc>& destination)
{
std::unique_ptr<Holder> holder = fair::mq::tools::make_unique<Holder>();
std::unique_ptr<Holder> holder = std::make_unique<Holder>();
PMIX_DATA_ARRAY_CREATE(holder->fData, destination.size(), PMIX_PROC);
memcpy(holder->fData->array, destination.data(), destination.size() * sizeof(pmix_proc_t));

View File

@@ -9,7 +9,7 @@
#include "PMIxPlugin.h"
#include <fairmq/sdk/commands/Commands.h>
#include <fairmq/Tools.h>
#include <fairmq/tools/Strings.h>
#include <sstream>
#include <stdexcept>
@@ -18,11 +18,7 @@
using namespace std;
using namespace fair::mq::sdk::cmd;
namespace fair
{
namespace mq
{
namespace plugins
namespace fair::mq::plugins
{
PMIxPlugin::PMIxPlugin(const string& name,
@@ -304,6 +300,4 @@ auto PMIxPlugin::WaitForExitingAck() -> void
});
}
} /* namespace plugins */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::plugins

View File

@@ -24,11 +24,7 @@
#include <unistd.h>
#include <vector>
namespace fair
{
namespace mq
{
namespace plugins
namespace fair::mq::plugins
{
class PMIxPlugin : public Plugin
@@ -88,8 +84,6 @@ REGISTER_FAIRMQ_PLUGIN(
PMIxProgramOptions // custom program options for the plugin
)
} /* namespace plugins */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::plugins
#endif /* FAIR_MQ_PLUGINS_PMIX */

View File

@@ -15,11 +15,7 @@
using namespace std;
namespace fair
{
namespace mq
{
namespace plugins
namespace fair::mq::plugins
{
Config::Config(const string& name, const Plugin::Version version, const string& maintainer, const string& homepage, PluginServices* pluginServices)
@@ -75,6 +71,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization.")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("shm-no-cleanup", po::value<bool >()->default_value(false), "Shared memory: do not cleanup the memory when last device leaves.")
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string >()->default_value("default"), "Session name.")
@@ -89,6 +86,4 @@ Config::~Config()
UnsubscribeFromDeviceStateChange();
}
} /* namespace plugins */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::plugins

View File

@@ -14,11 +14,7 @@
#include <string>
namespace fair
{
namespace mq
{
namespace plugins
namespace fair::mq::plugins
{
class Config : public Plugin
@@ -40,8 +36,6 @@ REGISTER_FAIRMQ_PLUGIN(
ConfigPluginProgramOptions
)
} /* namespace plugins */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::plugins
#endif /* FAIR_MQ_PLUGINS_CONFIG */

View File

@@ -43,11 +43,7 @@ namespace
}
}
namespace fair
{
namespace mq
{
namespace plugins
namespace fair::mq::plugins
{
Control::Control(const string& name, const Plugin::Version version, const string& maintainer, const string& homepage, PluginServices* pluginServices)
@@ -475,6 +471,4 @@ Control::~Control()
UnsubscribeFromDeviceStateChange();
}
} /* namespace plugins */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::plugins

View File

@@ -21,11 +21,7 @@
#include <atomic>
#include <stdexcept>
namespace fair
{
namespace mq
{
namespace plugins
namespace fair::mq::plugins
{
class Control : public Plugin
@@ -68,8 +64,6 @@ REGISTER_FAIRMQ_PLUGIN(
// boost::optional<boost::program_options::options_description>
)
} /* namespace plugins */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::plugins
#endif /* FAIR_MQ_PLUGINS_CONTROL */

59
fairmq/runDevice.h Normal file
View File

@@ -0,0 +1,59 @@
/********************************************************************************
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/DeviceRunner.h>
#include <boost/program_options.hpp>
#include <memory>
// to be implemented by the user to return a child class of FairMQDevice
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& config);
// to be implemented by the user to add custom command line options (or just with empty body)
void addCustomOptions(boost::program_options::options_description&);
int main(int argc, char* argv[])
{
using namespace fair::mq;
using namespace fair::mq::hooks;
try {
DeviceRunner runner(argc, argv);
// runner.AddHook<LoadPlugins>([](DeviceRunner& r){
// // for example:
// r.fPluginManager->SetSearchPaths({"/lib", "/lib/plugins"});
// r.fPluginManager->LoadPlugin("asdf");
// });
runner.AddHook<SetCustomCmdLineOptions>([](DeviceRunner& r){
boost::program_options::options_description customOptions("Custom options");
addCustomOptions(customOptions);
r.fConfig.AddToCmdLineOptions(customOptions);
});
// runner.AddHook<ModifyRawCmdLineArgs>([](DeviceRunner& r){
// // for example:
// r.fRawCmdLineArgs.push_back("--blubb");
// });
runner.AddHook<InstantiateDevice>([](DeviceRunner& r){
r.fDevice = getDevice(r.fConfig);
});
return runner.Run();
// Run with builtin catch all exception handler, just:
// return runner.RunWithExceptionHandlers();
} catch (std::exception& e) {
LOG(error) << "Uncaught exception reached the top of main: " << e.what();
return 1;
} catch (...) {
LOG(error) << "Uncaught exception reached the top of main.";
return 1;
}
}

View File

@@ -29,9 +29,8 @@
#define FAIR_LOG LOG
#endif /* ifndef FAIR_LOG */
namespace fair {
namespace mq {
namespace sdk {
namespace fair::mq::sdk
{
template<typename... SignatureArgTypes>
struct AsioAsyncOpImplBase
@@ -219,9 +218,6 @@ struct AsioAsyncOp<Executor,
}
};
} /* namespace sdk */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::sdk
#endif /* FAIR_MQ_SDK_ASIOASYNCOP_H */

View File

@@ -14,9 +14,8 @@
#include <memory>
#include <utility>
namespace fair {
namespace mq {
namespace sdk {
namespace fair::mq::sdk
{
using DefaultExecutor = asio::any_io_executor;
using DefaultAllocator = std::allocator<int>;
@@ -69,8 +68,6 @@ class AsioBase
AllocatorType fAllocator;
};
} /* namespace sdk */
} /* namespace mq */
} /* namespace fair */
} // namespace fair::mq::sdk
#endif /* FAIR_MQ_SDK_ASIOBASE_H */

View File

@@ -16,9 +16,8 @@
#include <chrono>
#include <cstdint>
namespace fair {
namespace mq {
namespace sdk {
namespace fair::mq::sdk
{
/**
* @class DDSAgent <fairmq/sdk/DDSAgent.h>
@@ -74,8 +73,6 @@ class DDSAgent
std::string fUsername;
};
} // namespace sdk
} // namespace mq
} // namespace fair
} // namespace fair::mq::sdk
#endif /* FAIR_MQ_SDK_DDSSAGENT_H */

View File

@@ -14,9 +14,8 @@
#include <ostream>
#include <cstdint>
namespace fair {
namespace mq {
namespace sdk {
namespace fair::mq::sdk
{
/**
* @class DDSCollection <fairmq/sdk/DDSCollection.h>
@@ -42,8 +41,6 @@ class DDSCollection
Id fId;
};
} // namespace sdk
} // namespace mq
} // namespace fair
} // namespace fair::mq::sdk
#endif /* FAIR_MQ_SDK_DDSCOLLECTION_H */

View File

@@ -11,14 +11,13 @@
#include <cstdlib>
#include <dds/dds.h>
#include <fairlogger/Logger.h>
#include <fairmq/Tools.h>
#include <fairmq/tools/InstanceLimit.h>
#include <fairmq/sdk/DDSInfo.h>
#include <sstream>
#include <utility>
namespace fair {
namespace mq {
namespace sdk {
namespace fair::mq::sdk
{
struct DDSEnvironment::Impl
{
@@ -133,6 +132,4 @@ auto operator<<(std::ostream& os, DDSEnvironment env) -> std::ostream&
<< "$DDS_CONFIG_HOME: " << env.GetConfigHome() / DDSEnvironment::Path(".DDS");
}
} // namespace sdk
} // namespace mq
} // namespace fair
} // namespace fair::mq::sdk

View File

@@ -13,9 +13,8 @@
#include <memory>
#include <ostream>
namespace fair {
namespace mq {
namespace sdk {
namespace fair::mq::sdk
{
/**
* @class DDSEnvironment DDSSession.h <fairmq/sdk/DDSSession.h>
@@ -40,8 +39,6 @@ class DDSEnvironment
using DDSEnv = DDSEnvironment;
} // namespace sdk
} // namespace mq
} // namespace fair
} // namespace fair::mq::sdk
#endif /* FAIR_MQ_SDK_DDSENVIRONMENT_H */

View File

@@ -11,23 +11,11 @@
#include <string>
namespace dds {
namespace tools_api {
namespace dds::tools_api { class CSession; }
namespace dds::topology_api { class CTopology; }
class CSession;
} // namespace tools_api
namespace topology_api {
class CTopology;
} // namespace topology_api
} // namespace dds
namespace fair {
namespace mq {
namespace sdk {
namespace fair::mq::sdk
{
const std::string DDSVersion("@DDS_VERSION@");
const std::string DDSInstallPrefix("@DDS_INSTALL_PREFIX@");
@@ -36,8 +24,6 @@ const std::string DDSIncludeDir("@DDS_INCDIR@");
const std::string DDSLibraryDir("@DDS_LIBDIR@");
const std::string DDSPluginDir("@DDS_PLUGINDIR@");
} // namespace sdk
} // namespace mq
} // namespace fair
} // namespace fair::mq::sdk
#endif /* FAIR_MQ_SDK_DDSINFO_H */

View File

@@ -14,7 +14,8 @@
#include <cstdlib>
#include <dds/dds.h>
#include <fairlogger/Logger.h>
#include <fairmq/Tools.h>
#include <fairmq/tools/Strings.h>
#include <fairmq/tools/Semaphore.h>
#include <fairmq/sdk/DDSAgent.h>
#include <fairmq/sdk/DDSEnvironment.h>
#include <fairmq/sdk/DDSTopology.h>
@@ -24,9 +25,8 @@
#include <utility>
#include <vector>
namespace fair {
namespace mq {
namespace sdk {
namespace fair::mq::sdk
{
auto operator<<(std::ostream& os, DDSRMSPlugin plugin) -> std::ostream&
{
@@ -383,6 +383,4 @@ auto getMostRecentRunningDDSSession(DDSEnv env) -> DDSSession
return DDSSession(DDSSession::Id(sessionId), std::move(env));
}
} // namespace sdk
} // namespace mq
} // namespace fair
} // namespace fair::mq::sdk

View File

@@ -24,9 +24,8 @@
#include <functional>
#include <vector>
namespace fair {
namespace mq {
namespace sdk {
namespace fair::mq::sdk
{
/**
* @enum DDSRMSPlugin DDSSession.h <fairmq/sdk/DDSSession.h>
@@ -114,8 +113,6 @@ class DDSSession
auto getMostRecentRunningDDSSession(DDSEnv env = {}) -> DDSSession;
} // namespace sdk
} // namespace mq
} // namespace fair
} // namespace fair::mq::sdk
#endif /* FAIR_MQ_SDK_DDSSESSION_H */

View File

@@ -14,9 +14,8 @@
#include <ostream>
#include <cstdint>
namespace fair {
namespace mq {
namespace sdk {
namespace fair::mq::sdk
{
/**
* @class DDSTask <fairmq/sdk/DDSTask.h>
@@ -45,8 +44,6 @@ class DDSTask
DDSCollection::Id fCollectionId;
};
} // namespace sdk
} // namespace mq
} // namespace fair
} // namespace fair::mq::sdk
#endif /* FAIR_MQ_SDK_DDSTASK_H */

View File

@@ -11,16 +11,14 @@
#include <boost/range/iterator_range.hpp>
#include <dds/dds.h>
#include <fairlogger/Logger.h>
#include <fairmq/Tools.h>
#include <fairmq/sdk/DDSEnvironment.h>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <utility>
namespace fair {
namespace mq {
namespace sdk {
namespace fair::mq::sdk
{
struct DDSTopology::Impl
{
@@ -113,6 +111,4 @@ try {
return os << "DDS topology: " << t.GetName();
}
} // namespace sdk
} // namespace mq
} // namespace fair
} // namespace fair::mq::sdk

View File

@@ -18,9 +18,8 @@
#include <string>
#include <vector>
namespace fair {
namespace mq {
namespace sdk {
namespace fair::mq::sdk
{
/**
* @class DDSTopology DDSTopology.h <fairmq/sdk/DDSTopology.h>
@@ -71,8 +70,6 @@ class DDSTopology
using DDSTopo = DDSTopology;
} // namespace sdk
} // namespace mq
} // namespace fair
} // namespace fair::mq::sdk
#endif /* FAIR_MQ_SDK_DDSTOPOLOGY_H */

Some files were not shown because too many files have changed in this diff Show More