mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Compare commits
15 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
bffe74c5cf | ||
|
72f319e276 | ||
|
62438bd99e | ||
|
c8ad684b18 | ||
|
a5ec83208d | ||
|
fc2241ece7 | ||
|
1f26883b75 | ||
|
edbdc57332 | ||
|
0fd2fcadc2 | ||
|
9b48b31a75 | ||
|
cb4335e59f | ||
|
ce4584b3d8 | ||
|
bbc1dd4600 | ||
|
8327810942 | ||
|
c37742e3b4 |
119
FairMQTest.cmake
119
FairMQTest.cmake
@@ -1,88 +1,63 @@
|
||||
################################################################################
|
||||
# Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence (LGPL) version 3, #
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
Set(CTEST_SOURCE_DIRECTORY $ENV{SOURCEDIR})
|
||||
Set(CTEST_BINARY_DIRECTORY $ENV{BUILDDIR})
|
||||
Set(CTEST_SITE $ENV{SITE})
|
||||
Set(CTEST_BUILD_NAME $ENV{LABEL})
|
||||
Set(CTEST_CMAKE_GENERATOR "Unix Makefiles")
|
||||
Set(CTEST_PROJECT_NAME "FairMQ")
|
||||
|
||||
Find_Program(CTEST_GIT_COMMAND NAMES git)
|
||||
Set(CTEST_UPDATE_COMMAND "${CTEST_GIT_COMMAND}")
|
||||
cmake_host_system_information(RESULT fqdn QUERY FQDN)
|
||||
|
||||
Set(BUILD_COMMAND "make")
|
||||
Set(CTEST_BUILD_COMMAND "${BUILD_COMMAND} -j$ENV{number_of_processors}")
|
||||
set(CTEST_SOURCE_DIRECTORY .)
|
||||
set(CTEST_BINARY_DIRECTORY build)
|
||||
set(CTEST_CMAKE_GENERATOR "Ninja")
|
||||
set(CTEST_USE_LAUNCHERS ON)
|
||||
set(CTEST_CONFIGURATION_TYPE "RelWithDebInfo")
|
||||
|
||||
String(TOUPPER $ENV{ctest_model} _Model)
|
||||
Set(configure_options "-DCMAKE_BUILD_TYPE=$ENV{ctest_model}")
|
||||
|
||||
Set(CTEST_USE_LAUNCHERS 1)
|
||||
Set(configure_options "${configure_options};-DCTEST_USE_LAUNCHERS=${CTEST_USE_LAUNCHERS}")
|
||||
|
||||
Set(configure_options "${configure_options};-DDISABLE_COLOR=ON")
|
||||
Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}")
|
||||
# Set(configure_options "${configure_options};-DBUILD_OFI_TRANSPORT=ON")
|
||||
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
|
||||
Set(configure_options "${configure_options};-DBUILD_SDK=ON")
|
||||
Set(configure_options "${configure_options};-DBUILD_SDK_COMMANDS=ON")
|
||||
|
||||
Set(EXTRA_FLAGS $ENV{EXTRA_FLAGS})
|
||||
If(EXTRA_FLAGS)
|
||||
Set(configure_options "${configure_options};${EXTRA_FLAGS}")
|
||||
EndIf()
|
||||
|
||||
If($ENV{ctest_model} MATCHES Profile)
|
||||
Find_Program(GCOV_COMMAND gcov)
|
||||
If(GCOV_COMMAND)
|
||||
Message("Found GCOV: ${GCOV_COMMAND}")
|
||||
Set(CTEST_COVERAGE_COMMAND ${GCOV_COMMAND})
|
||||
set(CTEST_COVERAGE_EXTRA_FLAGS "-p")
|
||||
EndIf(GCOV_COMMAND)
|
||||
EndIf()
|
||||
|
||||
If($ENV{ctest_model} MATCHES Nightly OR $ENV{ctest_model} MATCHES Profile)
|
||||
Ctest_Empty_Binary_Directory(${CTEST_BINARY_DIRECTORY})
|
||||
EndIf()
|
||||
|
||||
Ctest_Start($ENV{ctest_model})
|
||||
|
||||
Ctest_Configure(BUILD "${CTEST_BINARY_DIRECTORY}"
|
||||
OPTIONS "${configure_options}"
|
||||
)
|
||||
|
||||
Ctest_Build(BUILD "${CTEST_BINARY_DIRECTORY}")
|
||||
|
||||
unset(exclude_tests)
|
||||
if($ENV{EXCLUDE_UNSTABLE_DDS_TESTS})
|
||||
set(exclude_tests EXCLUDE ".*\\.localhost$")
|
||||
if(NOT NCPUS)
|
||||
if(ENV{SLURM_CPUS_PER_TASK})
|
||||
set(NCPUS $ENV{SLURM_CPUS_PER_TASK})
|
||||
else()
|
||||
include(ProcessorCount)
|
||||
ProcessorCount(NCPUS)
|
||||
if(NCPUS EQUAL 0)
|
||||
set(NCPUS 1)
|
||||
endif()
|
||||
endif()
|
||||
endif()
|
||||
Ctest_Test(BUILD "${CTEST_BINARY_DIRECTORY}"
|
||||
# PARALLEL_LEVEL $ENV{number_of_processors}
|
||||
${exclude_tests}
|
||||
PARALLEL_LEVEL $ENV{number_of_processors}
|
||||
RETURN_VALUE _ctest_test_ret_val
|
||||
)
|
||||
|
||||
If(GCOV_COMMAND)
|
||||
Ctest_Coverage(BUILD "${CTEST_BINARY_DIRECTORY}" LABELS coverage)
|
||||
EndIf()
|
||||
if ("$ENV{CTEST_SITE}" STREQUAL "")
|
||||
set(CTEST_SITE "${fqdn}")
|
||||
else()
|
||||
set(CTEST_SITE $ENV{CTEST_SITE})
|
||||
endif()
|
||||
|
||||
If("$ENV{do_codecov_upload}")
|
||||
Execute_Process(COMMAND curl https://codecov.io/bash -o codecov_uploader.sh
|
||||
WORKING_DIRECTORY ${CTEST_BINARY_DIRECTORY}
|
||||
TIMEOUT 60)
|
||||
Execute_Process(COMMAND bash ./codecov_uploader.sh -X gcov
|
||||
WORKING_DIRECTORY ${CTEST_BINARY_DIRECTORY}
|
||||
TIMEOUT 60)
|
||||
EndIf()
|
||||
if ("$ENV{LABEL}" STREQUAL "")
|
||||
set(CTEST_BUILD_NAME "build")
|
||||
else()
|
||||
set(CTEST_BUILD_NAME $ENV{LABEL})
|
||||
endif()
|
||||
|
||||
Ctest_Submit()
|
||||
ctest_start(Continuous)
|
||||
|
||||
if (_ctest_test_ret_val)
|
||||
list(APPEND options
|
||||
"-DDISABLE_COLOR=ON"
|
||||
"-DBUILD_SDK_COMMANDS=ON"
|
||||
"-DBUILD_SDK=ON"
|
||||
"-DBUILD_DDS_PLUGIN=ON"
|
||||
)
|
||||
list(JOIN options ";" optionsstr)
|
||||
ctest_configure(OPTIONS "${optionsstr}")
|
||||
|
||||
ctest_build(FLAGS "-j${NCPUS}")
|
||||
|
||||
ctest_test(BUILD "${CTEST_BINARY_DIRECTORY}"
|
||||
PARALLEL_LEVEL 1
|
||||
SCHEDULE_RANDOM ON
|
||||
RETURN_VALUE _ctest_test_ret_val)
|
||||
|
||||
ctest_submit()
|
||||
|
||||
if(_ctest_test_ret_val)
|
||||
Message(FATAL_ERROR "Some tests failed.")
|
||||
endif()
|
||||
|
100
Jenkinsfile
vendored
100
Jenkinsfile
vendored
@@ -1,67 +1,50 @@
|
||||
#!groovy
|
||||
|
||||
def specToLabel(Map spec) {
|
||||
return "${spec.os}-${spec.arch}-${spec.compiler}-FairSoft_${spec.fairsoft}"
|
||||
}
|
||||
|
||||
def jobMatrix(String prefix, List specs, Closure callback) {
|
||||
def jobMatrix(String prefix, String type, List specs) {
|
||||
def nodes = [:]
|
||||
for (spec in specs) {
|
||||
def label = specToLabel(spec)
|
||||
def node_tag = label
|
||||
if (spec.os =~ /macOS/) {
|
||||
node_tag = spec.os
|
||||
}
|
||||
def fairsoft = spec.fairsoft
|
||||
job = "${spec.os}-${spec.ver}-${spec.arch}-${spec.compiler}"
|
||||
def label = "${type}/${job}"
|
||||
def selector = "${spec.os}-${spec.ver}-${spec.arch}"
|
||||
def os = spec.os
|
||||
def compiler = spec.compiler
|
||||
nodes["${prefix}/${label}"] = {
|
||||
node(node_tag) {
|
||||
def ver = spec.ver
|
||||
def check = spec.check
|
||||
|
||||
nodes[label] = {
|
||||
node(selector) {
|
||||
githubNotify(context: "${prefix}/${label}", description: 'Building ...', status: 'PENDING')
|
||||
try {
|
||||
deleteDir()
|
||||
checkout scm
|
||||
|
||||
sh """\
|
||||
echo "export SIMPATH=\${SIMPATH_PREFIX}${fairsoft}" >> Dart.cfg
|
||||
echo "export FAIRSOFT_VERSION=${fairsoft}" >> Dart.cfg
|
||||
"""
|
||||
if (os =~ /Debian/ && compiler =~ /gcc9/) {
|
||||
sh '''\
|
||||
echo "source /etc/profile.d/modules.sh" >> Dart.cfg
|
||||
echo "module use /cvmfs/it.gsi.de/modulefiles" >> Dart.cfg
|
||||
echo "module load compiler/gcc/9.1.0" >> Dart.cfg
|
||||
'''
|
||||
}
|
||||
if (os =~ /[Mm]acOS/) {
|
||||
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=clang++'\" >> Dart.cfg"
|
||||
def jobscript = 'job.sh'
|
||||
def ctestcmd = "ctest -S FairMQTest.cmake -V --output-on-failure"
|
||||
sh "echo \"set -e\" >> ${jobscript}"
|
||||
sh "echo \"export LABEL=\\\"\${JOB_BASE_NAME} ${label}\\\"\" >> ${jobscript}"
|
||||
if (selector =~ /^macos/) {
|
||||
sh """\
|
||||
echo \"export DDS_ROOT=\\\"\\\$(brew --prefix dds)\\\"\" >> ${jobscript}
|
||||
echo \"${ctestcmd}\" >> ${jobscript}
|
||||
"""
|
||||
sh "cat ${jobscript}"
|
||||
sh "bash ${jobscript}"
|
||||
} else {
|
||||
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'\" >> Dart.cfg"
|
||||
def containercmd = "singularity exec -B/shared ${env.SINGULARITY_CONTAINER_ROOT}/fairmq/${os}.${ver}.sif bash -l -c \\\"${ctestcmd}\\\""
|
||||
sh """\
|
||||
echo \"echo \\\"*** Job started at .......: \\\$(date -R)\\\"\" >> ${jobscript}
|
||||
echo \"echo \\\"*** Job ID ...............: \\\${SLURM_JOB_ID}\\\"\" >> ${jobscript}
|
||||
echo \"echo \\\"*** Compute node .........: \\\$(hostname -f)\\\"\" >> ${jobscript}
|
||||
echo \"unset http_proxy\" >> ${jobscript}
|
||||
echo \"unset HTTP_PROXY\" >> ${jobscript}
|
||||
echo \"${containercmd}\" >> ${jobscript}
|
||||
"""
|
||||
sh "cat ${jobscript}"
|
||||
sh "test/ci/slurm-submit.sh \"FairMQ \${JOB_BASE_NAME} ${label}\" ${jobscript}"
|
||||
}
|
||||
|
||||
sh '''\
|
||||
echo "export BUILDDIR=$PWD/build" >> Dart.cfg
|
||||
echo "export SOURCEDIR=$PWD" >> Dart.cfg
|
||||
echo "export PATH=\\\$SIMPATH/bin:\\\$PATH" >> Dart.cfg
|
||||
echo "export GIT_BRANCH=$JOB_BASE_NAME" >> Dart.cfg
|
||||
echo "echo \\\$PATH" >> Dart.cfg
|
||||
'''
|
||||
|
||||
if (os =~ /macOS10.14/) {
|
||||
sh "echo \"export EXCLUDE_UNSTABLE_DDS_TESTS=1\" >> Dart.cfg"
|
||||
}
|
||||
|
||||
sh 'cat Dart.cfg'
|
||||
|
||||
callback.call(spec, label)
|
||||
|
||||
deleteDir()
|
||||
githubNotify(context: "${prefix}/${label}", description: 'Success', status: 'SUCCESS')
|
||||
} catch (e) {
|
||||
def tarball = "${prefix}_${label}_dds_logs.tar.gz"
|
||||
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test/ .DDS/"
|
||||
archiveArtifacts tarball
|
||||
|
||||
deleteDir()
|
||||
githubNotify(context: "${prefix}/${label}", description: 'Error', status: 'ERROR')
|
||||
throw e
|
||||
@@ -78,23 +61,12 @@ pipeline{
|
||||
stage("Run CI Matrix") {
|
||||
steps{
|
||||
script {
|
||||
def build_jobs = jobMatrix('build', [
|
||||
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
|
||||
[os: 'macOS10.14', arch: 'x86_64', compiler: 'AppleClang11.0', fairsoft: 'fairmq_dev'],
|
||||
[os: 'macOS10.15', arch: 'x86_64', compiler: 'AppleClang12.0', fairsoft: 'fairmq_dev'],
|
||||
]) { spec, label ->
|
||||
sh './Dart.sh alfa_ci Dart.cfg'
|
||||
}
|
||||
def builds = jobMatrix('alfa-ci', 'build', [
|
||||
[os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10'],
|
||||
[os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12'],
|
||||
])
|
||||
|
||||
def profile_jobs = jobMatrix('codecov', [
|
||||
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
|
||||
]) { spec, label ->
|
||||
withCredentials([string(credentialsId: 'fairmq_codecov_token', variable: 'CODECOV_TOKEN')]) {
|
||||
sh './Dart.sh codecov Dart.cfg'
|
||||
}
|
||||
}
|
||||
|
||||
parallel(build_jobs + profile_jobs)
|
||||
parallel(builds)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,81 +0,0 @@
|
||||
#!groovy
|
||||
|
||||
def specToLabel(Map spec) {
|
||||
return "${spec.os}-${spec.arch}-${spec.compiler}-FairSoft_${spec.fairsoft}"
|
||||
}
|
||||
|
||||
def buildMatrix(List specs, Closure callback) {
|
||||
def nodes = [:]
|
||||
for (spec in specs) {
|
||||
def label = specToLabel(spec)
|
||||
def fairsoft = spec.fairsoft
|
||||
def os = spec.os
|
||||
def compiler = spec.compiler
|
||||
nodes[label] = {
|
||||
node(label) {
|
||||
try {
|
||||
deleteDir()
|
||||
checkout scm
|
||||
|
||||
sh """\
|
||||
echo "export SIMPATH=\${SIMPATH_PREFIX}${fairsoft}" >> Dart.cfg
|
||||
echo "export FAIRSOFT_VERSION=${fairsoft}" >> Dart.cfg
|
||||
"""
|
||||
if (os =~ /Debian/ && compiler =~ /gcc9/) {
|
||||
sh '''\
|
||||
echo "source /etc/profile.d/modules.sh" >> Dart.cfg
|
||||
echo "module use /cvmfs/it.gsi.de/modulefiles" >> Dart.cfg
|
||||
echo "module load compiler/gcc/9.1.0" >> Dart.cfg
|
||||
'''
|
||||
}
|
||||
if (os =~ /MacOS/) {
|
||||
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=clang++'\" >> Dart.cfg"
|
||||
} else {
|
||||
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'\" >> Dart.cfg"
|
||||
}
|
||||
sh '''\
|
||||
echo "export BUILDDIR=$PWD/build" >> Dart.cfg
|
||||
echo "export SOURCEDIR=$PWD" >> Dart.cfg
|
||||
echo "export PATH=\\\$SIMPATH/bin:\\\$PATH" >> Dart.cfg
|
||||
echo "export GIT_BRANCH=dev" >> Dart.cfg
|
||||
echo "echo \\\$PATH" >> Dart.cfg
|
||||
'''
|
||||
sh 'cat Dart.cfg'
|
||||
|
||||
callback.call(spec, label)
|
||||
|
||||
deleteDir()
|
||||
} catch (e) {
|
||||
def tarball = "${label}_dds_logs.tar.gz"
|
||||
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test/ .DDS/"
|
||||
archiveArtifacts tarball
|
||||
|
||||
deleteDir()
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
pipeline{
|
||||
agent none
|
||||
triggers { cron('H 2 * * *') }
|
||||
stages {
|
||||
stage("Run Nightly Build/Test Matrix") {
|
||||
steps{
|
||||
script {
|
||||
parallel(buildMatrix([
|
||||
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
|
||||
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'],
|
||||
[os: 'MacOS10.14', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'],
|
||||
]) { spec, label ->
|
||||
sh './Dart.sh Nightly Dart.cfg'
|
||||
sh './Dart.sh Profile Dart.cfg'
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@@ -7,10 +7,10 @@
|
||||
################################################################################
|
||||
|
||||
add_library(Example11Lib STATIC
|
||||
"Sampler.cxx"
|
||||
"Sampler.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
"Sampler.cxx"
|
||||
"Sampler.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
)
|
||||
|
||||
target_link_libraries(Example11Lib PUBLIC FairMQ)
|
||||
@@ -40,12 +40,12 @@ set_tests_properties(Example.1-1.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true P
|
||||
# install
|
||||
|
||||
install(
|
||||
TARGETS
|
||||
fairmq-ex-1-1-sampler
|
||||
fairmq-ex-1-1-sink
|
||||
TARGETS
|
||||
fairmq-ex-1-1-sampler
|
||||
fairmq-ex-1-1-sink
|
||||
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
)
|
||||
|
||||
# configure run script with different executable paths for build and for install directories
|
||||
@@ -54,7 +54,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install)
|
||||
|
||||
install(
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-1-1.sh
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-1-1.sh
|
||||
)
|
||||
|
@@ -7,12 +7,12 @@
|
||||
################################################################################
|
||||
|
||||
add_library(Example1N1Lib STATIC
|
||||
"Sampler.cxx"
|
||||
"Sampler.h"
|
||||
"Processor.cxx"
|
||||
"Processor.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
"Sampler.cxx"
|
||||
"Sampler.h"
|
||||
"Processor.cxx"
|
||||
"Processor.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
)
|
||||
|
||||
target_link_libraries(Example1N1Lib PUBLIC FairMQ)
|
||||
@@ -48,13 +48,13 @@ set_tests_properties(Example.1-n-1.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true
|
||||
# install
|
||||
|
||||
install(
|
||||
TARGETS
|
||||
fairmq-ex-1-n-1-sampler
|
||||
fairmq-ex-1-n-1-processor
|
||||
fairmq-ex-1-n-1-sink
|
||||
TARGETS
|
||||
fairmq-ex-1-n-1-sampler
|
||||
fairmq-ex-1-n-1-processor
|
||||
fairmq-ex-1-n-1-sink
|
||||
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
)
|
||||
|
||||
# configure run script with different executable paths for build and for install directories
|
||||
@@ -64,12 +64,12 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-n-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install)
|
||||
|
||||
install(
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-1-n-1.sh
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-1-n-1.sh
|
||||
)
|
||||
|
||||
install(
|
||||
FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-1-n-1.json
|
||||
DESTINATION ${PROJECT_INSTALL_DATADIR}
|
||||
FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-1-n-1.json
|
||||
DESTINATION ${PROJECT_INSTALL_DATADIR}
|
||||
)
|
||||
|
@@ -34,7 +34,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-builtin-devices.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install)
|
||||
|
||||
install(
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-builtin-devices.sh
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-builtin-devices.sh
|
||||
)
|
||||
|
@@ -7,10 +7,10 @@
|
||||
################################################################################
|
||||
|
||||
add_library(ExampleCopyPushLib STATIC
|
||||
"Sampler.cxx"
|
||||
"Sampler.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
"Sampler.cxx"
|
||||
"Sampler.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
)
|
||||
|
||||
target_link_libraries(ExampleCopyPushLib PUBLIC FairMQ)
|
||||
@@ -41,12 +41,12 @@ set_tests_properties(Example.CopyPush.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL t
|
||||
# install
|
||||
|
||||
install(
|
||||
TARGETS
|
||||
fairmq-ex-copypush-sampler
|
||||
fairmq-ex-copypush-sink
|
||||
TARGETS
|
||||
fairmq-ex-copypush-sampler
|
||||
fairmq-ex-copypush-sink
|
||||
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
)
|
||||
|
||||
# configure run script with different executable paths for build and for install directories
|
||||
@@ -55,7 +55,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-copypush.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install)
|
||||
|
||||
install(
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-copypush.sh
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-copypush.sh
|
||||
)
|
||||
|
@@ -7,10 +7,10 @@
|
||||
################################################################################
|
||||
|
||||
add_library(ExampleMultipartLib STATIC
|
||||
"Sampler.cxx"
|
||||
"Sampler.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
"Sampler.cxx"
|
||||
"Sampler.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
)
|
||||
|
||||
target_link_libraries(ExampleMultipartLib PUBLIC FairMQ)
|
||||
@@ -45,12 +45,12 @@ endif()
|
||||
# install
|
||||
|
||||
install(
|
||||
TARGETS
|
||||
fairmq-ex-multipart-sampler
|
||||
fairmq-ex-multipart-sink
|
||||
TARGETS
|
||||
fairmq-ex-multipart-sampler
|
||||
fairmq-ex-multipart-sink
|
||||
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
)
|
||||
|
||||
# configure run script with different executable paths for build and for install directories
|
||||
@@ -59,7 +59,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install)
|
||||
|
||||
install(
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-multipart.sh
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-multipart.sh
|
||||
)
|
||||
|
@@ -7,12 +7,12 @@
|
||||
################################################################################
|
||||
|
||||
add_library(ExampleMultipleChannelsLib STATIC
|
||||
"Sampler.cxx"
|
||||
"Sampler.h"
|
||||
"Broadcaster.cxx"
|
||||
"Broadcaster.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
"Sampler.cxx"
|
||||
"Sampler.h"
|
||||
"Broadcaster.cxx"
|
||||
"Broadcaster.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
)
|
||||
|
||||
target_link_libraries(ExampleMultipleChannelsLib PUBLIC FairMQ)
|
||||
@@ -42,13 +42,13 @@ set_tests_properties(Example.MultipleChannels.zeromq PROPERTIES TIMEOUT "30" RUN
|
||||
# install
|
||||
|
||||
install(
|
||||
TARGETS
|
||||
fairmq-ex-multiple-channels-sampler
|
||||
fairmq-ex-multiple-channels-broadcaster
|
||||
fairmq-ex-multiple-channels-sink
|
||||
TARGETS
|
||||
fairmq-ex-multiple-channels-sampler
|
||||
fairmq-ex-multiple-channels-broadcaster
|
||||
fairmq-ex-multiple-channels-sink
|
||||
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
)
|
||||
|
||||
# configure run script with different executable paths for build and for install directories
|
||||
@@ -57,7 +57,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-channels.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install)
|
||||
|
||||
install(
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-multiple-channels.sh
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-multiple-channels.sh
|
||||
)
|
||||
|
@@ -7,12 +7,12 @@
|
||||
################################################################################
|
||||
|
||||
add_library(ExampleMultipleTransportsLib STATIC
|
||||
"Sampler1.cxx"
|
||||
"Sampler1.h"
|
||||
"Sampler2.cxx"
|
||||
"Sampler2.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
"Sampler1.cxx"
|
||||
"Sampler1.h"
|
||||
"Sampler2.cxx"
|
||||
"Sampler2.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
)
|
||||
|
||||
target_link_libraries(ExampleMultipleTransportsLib PUBLIC FairMQ)
|
||||
@@ -41,13 +41,13 @@ set_tests_properties(Example.MultipleTransports PROPERTIES TIMEOUT "30" RUN_SERI
|
||||
# install
|
||||
|
||||
install(
|
||||
TARGETS
|
||||
fairmq-ex-multiple-transports-sampler1
|
||||
fairmq-ex-multiple-transports-sampler2
|
||||
fairmq-ex-multiple-transports-sink
|
||||
TARGETS
|
||||
fairmq-ex-multiple-transports-sampler1
|
||||
fairmq-ex-multiple-transports-sampler2
|
||||
fairmq-ex-multiple-transports-sink
|
||||
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
)
|
||||
|
||||
# configure run script with different executable paths for install directories
|
||||
@@ -56,7 +56,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-transports.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install)
|
||||
|
||||
install(
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-multiple-transports.sh
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-multiple-transports.sh
|
||||
)
|
||||
|
@@ -29,15 +29,15 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh
|
||||
# install
|
||||
|
||||
install(
|
||||
TARGETS
|
||||
fairmq-ex-readout-readout
|
||||
fairmq-ex-readout-builder
|
||||
fairmq-ex-readout-processor
|
||||
fairmq-ex-readout-sender
|
||||
fairmq-ex-readout-receiver
|
||||
TARGETS
|
||||
fairmq-ex-readout-readout
|
||||
fairmq-ex-readout-builder
|
||||
fairmq-ex-readout-processor
|
||||
fairmq-ex-readout-sender
|
||||
fairmq-ex-readout-receiver
|
||||
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
)
|
||||
|
||||
# configure run script with different executable paths for build and for install directories
|
||||
@@ -47,13 +47,13 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install)
|
||||
|
||||
install(
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-readout.sh
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-readout.sh
|
||||
)
|
||||
|
||||
install(
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-readout-processing.sh
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-readout-processing.sh
|
||||
)
|
||||
|
@@ -7,10 +7,10 @@
|
||||
################################################################################
|
||||
|
||||
add_library(ExampleRegionLib STATIC
|
||||
"Sampler.cxx"
|
||||
"Sampler.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
"Sampler.cxx"
|
||||
"Sampler.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
)
|
||||
|
||||
target_link_libraries(ExampleRegionLib PUBLIC FairMQ)
|
||||
@@ -40,12 +40,12 @@ set_tests_properties(Example.Region.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL tru
|
||||
# install
|
||||
|
||||
install(
|
||||
TARGETS
|
||||
fairmq-ex-region-sampler
|
||||
fairmq-ex-region-sink
|
||||
TARGETS
|
||||
fairmq-ex-region-sampler
|
||||
fairmq-ex-region-sink
|
||||
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
)
|
||||
|
||||
# configure run script with different executable paths for build and for install directories
|
||||
@@ -54,7 +54,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-region.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-region.sh_install)
|
||||
|
||||
install(
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-region.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-region.sh
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-region.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-region.sh
|
||||
)
|
||||
|
@@ -7,10 +7,10 @@
|
||||
################################################################################
|
||||
|
||||
add_library(ExampleReqRepLib STATIC
|
||||
"Client.cxx"
|
||||
"Client.h"
|
||||
"Server.cxx"
|
||||
"Server.h"
|
||||
"Client.cxx"
|
||||
"Client.h"
|
||||
"Server.cxx"
|
||||
"Server.h"
|
||||
)
|
||||
|
||||
target_link_libraries(ExampleReqRepLib PUBLIC FairMQ)
|
||||
@@ -41,12 +41,12 @@ set_tests_properties(Example.ReqRep.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL tru
|
||||
# install
|
||||
|
||||
install(
|
||||
TARGETS
|
||||
fairmq-ex-req-rep-client
|
||||
fairmq-ex-req-rep-server
|
||||
TARGETS
|
||||
fairmq-ex-req-rep-client
|
||||
fairmq-ex-req-rep-server
|
||||
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
)
|
||||
|
||||
# configure run script with different executable paths for build and for install directories
|
||||
@@ -55,7 +55,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-req-rep.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-req-rep.sh_install)
|
||||
|
||||
install(
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-req-rep.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-req-rep.sh
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-req-rep.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-req-rep.sh
|
||||
)
|
||||
|
@@ -486,7 +486,6 @@ void FairMQDevice::RunWrapper()
|
||||
}
|
||||
|
||||
PostRun();
|
||||
|
||||
} catch (const out_of_range& oor) {
|
||||
LOG(error) << "out of range: " << oor.what();
|
||||
LOG(error) << "incorrect/incomplete channel configuration?";
|
||||
@@ -504,17 +503,12 @@ void FairMQDevice::HandleSingleChannelInput()
|
||||
{
|
||||
bool proceed = true;
|
||||
|
||||
if (fMsgInputs.size() > 0)
|
||||
{
|
||||
while (!NewStatePending() && proceed)
|
||||
{
|
||||
if (fMsgInputs.size() > 0) {
|
||||
while (!NewStatePending() && proceed) {
|
||||
proceed = HandleMsgInput(fInputChannelKeys.at(0), fMsgInputs.begin()->second, 0);
|
||||
}
|
||||
}
|
||||
else if (fMultipartInputs.size() > 0)
|
||||
{
|
||||
while (!NewStatePending() && proceed)
|
||||
{
|
||||
} else if (fMultipartInputs.size() > 0) {
|
||||
while (!NewStatePending() && proceed) {
|
||||
proceed = HandleMultipartInput(fInputChannelKeys.at(0), fMultipartInputs.begin()->second, 0);
|
||||
}
|
||||
}
|
||||
@@ -524,75 +518,55 @@ void FairMQDevice::HandleMultipleChannelInput()
|
||||
{
|
||||
// check if more than one transport is used
|
||||
fMultitransportInputs.clear();
|
||||
for (const auto& k : fInputChannelKeys)
|
||||
{
|
||||
for (const auto& k : fInputChannelKeys) {
|
||||
fair::mq::Transport t = fChannels.at(k).at(0).fTransportType;
|
||||
if (fMultitransportInputs.find(t) == fMultitransportInputs.end())
|
||||
{
|
||||
if (fMultitransportInputs.find(t) == fMultitransportInputs.end()) {
|
||||
fMultitransportInputs.insert(pair<fair::mq::Transport, vector<string>>(t, vector<string>()));
|
||||
fMultitransportInputs.at(t).push_back(k);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
fMultitransportInputs.at(t).push_back(k);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& mi : fMsgInputs)
|
||||
{
|
||||
for (auto& i : fChannels.at(mi.first))
|
||||
{
|
||||
for (const auto& mi : fMsgInputs) {
|
||||
for (auto& i : fChannels.at(mi.first)) {
|
||||
i.fMultipart = false;
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& mi : fMultipartInputs)
|
||||
{
|
||||
for (auto& i : fChannels.at(mi.first))
|
||||
{
|
||||
for (const auto& mi : fMultipartInputs) {
|
||||
for (auto& i : fChannels.at(mi.first)) {
|
||||
i.fMultipart = true;
|
||||
}
|
||||
}
|
||||
|
||||
// if more than one transport is used, handle poll of each in a separate thread
|
||||
if (fMultitransportInputs.size() > 1)
|
||||
{
|
||||
if (fMultitransportInputs.size() > 1) {
|
||||
HandleMultipleTransportInput();
|
||||
}
|
||||
else // otherwise poll directly
|
||||
{
|
||||
} else { // otherwise poll directly
|
||||
bool proceed = true;
|
||||
|
||||
FairMQPollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys));
|
||||
|
||||
while (!NewStatePending() && proceed)
|
||||
{
|
||||
while (!NewStatePending() && proceed) {
|
||||
poller->Poll(200);
|
||||
|
||||
// check which inputs are ready and call their data handlers if they are.
|
||||
for (const auto& ch : fInputChannelKeys)
|
||||
{
|
||||
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i)
|
||||
{
|
||||
if (poller->CheckInput(ch, i))
|
||||
{
|
||||
if (fChannels.at(ch).at(i).fMultipart)
|
||||
{
|
||||
for (const auto& ch : fInputChannelKeys) {
|
||||
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
|
||||
if (poller->CheckInput(ch, i)) {
|
||||
if (fChannels.at(ch).at(i).fMultipart) {
|
||||
proceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
proceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
|
||||
}
|
||||
|
||||
if (!proceed)
|
||||
{
|
||||
if (!proceed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!proceed)
|
||||
{
|
||||
if (!proceed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -606,64 +580,49 @@ void FairMQDevice::HandleMultipleTransportInput()
|
||||
|
||||
fMultitransportProceed = true;
|
||||
|
||||
for (const auto& i : fMultitransportInputs)
|
||||
{
|
||||
for (const auto& i : fMultitransportInputs) {
|
||||
threads.emplace_back(thread(&FairMQDevice::PollForTransport, this, fTransports.at(i.first).get(), i.second));
|
||||
}
|
||||
|
||||
for (thread& t : threads)
|
||||
{
|
||||
for (thread& t : threads) {
|
||||
t.join();
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const vector<string>& channelKeys)
|
||||
{
|
||||
try
|
||||
{
|
||||
try {
|
||||
FairMQPollerPtr poller(factory->CreatePoller(fChannels, channelKeys));
|
||||
|
||||
while (!NewStatePending() && fMultitransportProceed)
|
||||
{
|
||||
while (!NewStatePending() && fMultitransportProceed) {
|
||||
poller->Poll(500);
|
||||
|
||||
for (const auto& ch : channelKeys)
|
||||
{
|
||||
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i)
|
||||
{
|
||||
if (poller->CheckInput(ch, i))
|
||||
{
|
||||
for (const auto& ch : channelKeys) {
|
||||
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
|
||||
if (poller->CheckInput(ch, i)) {
|
||||
lock_guard<mutex> lock(fMultitransportMutex);
|
||||
|
||||
if (!fMultitransportProceed)
|
||||
{
|
||||
if (!fMultitransportProceed) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (fChannels.at(ch).at(i).fMultipart)
|
||||
{
|
||||
if (fChannels.at(ch).at(i).fMultipart) {
|
||||
fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
fMultitransportProceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
|
||||
}
|
||||
|
||||
if (!fMultitransportProceed)
|
||||
{
|
||||
if (!fMultitransportProceed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!fMultitransportProceed)
|
||||
{
|
||||
if (!fMultitransportProceed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (exception& e)
|
||||
{
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state.";
|
||||
throw runtime_error(tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state."));
|
||||
}
|
||||
@@ -673,12 +632,9 @@ bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback&
|
||||
{
|
||||
unique_ptr<FairMQMessage> input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage());
|
||||
|
||||
if (Receive(input, chName, i) >= 0)
|
||||
{
|
||||
if (Receive(input, chName, i) >= 0) {
|
||||
return callback(input, i);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -687,12 +643,9 @@ bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipa
|
||||
{
|
||||
FairMQParts input;
|
||||
|
||||
if (Receive(input, chName, i) >= 0)
|
||||
{
|
||||
return callback(input, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (Receive(input, chName, i) >= 0) {
|
||||
return callback(input, i);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@@ -421,28 +421,64 @@ class FairMQDevice
|
||||
virtual void Reset() {}
|
||||
|
||||
public:
|
||||
/// @brief Request a device state transition
|
||||
/// @param transition state transition
|
||||
///
|
||||
/// The state transition may not happen immediately, but when the current state evaluates the
|
||||
/// pending transition event and terminates. In other words, the device states are scheduled cooperatively.
|
||||
bool ChangeState(const fair::mq::Transition transition) { return fStateMachine.ChangeState(transition); }
|
||||
/// @brief Request a device state transition
|
||||
/// @param transition state transition
|
||||
///
|
||||
/// The state transition may not happen immediately, but when the current state evaluates the
|
||||
/// pending transition event and terminates. In other words, the device states are scheduled cooperatively.
|
||||
bool ChangeState(const std::string& transition) { return fStateMachine.ChangeState(fair::mq::GetTransition(transition)); }
|
||||
|
||||
/// @brief waits for the next state (any) to occur
|
||||
fair::mq::State WaitForNextState() { return fStateQueue.WaitForNext(); }
|
||||
/// @brief waits for the specified state to occur
|
||||
/// @param state state to wait for
|
||||
void WaitForState(fair::mq::State state) { fStateQueue.WaitForState(state); }
|
||||
/// @brief waits for the specified state to occur
|
||||
/// @param state state to wait for
|
||||
void WaitForState(const std::string& state) { WaitForState(fair::mq::GetState(state)); }
|
||||
|
||||
void TransitionTo(const fair::mq::State state);
|
||||
|
||||
/// @brief Subscribe with a callback to state changes
|
||||
/// @param key id to identify your subscription
|
||||
/// @param callback callback (called with the new state as the parameter)
|
||||
///
|
||||
/// The callback is called at the beginning of a new state.
|
||||
/// The callback is called from the thread the state is running in.
|
||||
void SubscribeToStateChange(const std::string& key, std::function<void(const fair::mq::State)> callback) { fStateMachine.SubscribeToStateChange(key, callback); }
|
||||
/// @brief Unsubscribe from state changes
|
||||
/// @param key id (that was used when subscribing)
|
||||
void UnsubscribeFromStateChange(const std::string& key) { fStateMachine.UnsubscribeFromStateChange(key); }
|
||||
|
||||
/// @brief Subscribe with a callback to incoming state transitions
|
||||
/// @param key id to identify your subscription
|
||||
/// @param callback callback (called with the incoming transition as the parameter)
|
||||
/// The callback is called when new transition is initiated.
|
||||
/// The callback is called from the thread that initiates the transition (via ChangeState).
|
||||
void SubscribeToNewTransition(const std::string& key, std::function<void(const fair::mq::Transition)> callback) { fStateMachine.SubscribeToNewTransition(key, callback); }
|
||||
/// @brief Unsubscribe from state transitions
|
||||
/// @param key id (that was used when subscribing)
|
||||
void UnsubscribeFromNewTransition(const std::string& key) { fStateMachine.UnsubscribeFromNewTransition(key); }
|
||||
|
||||
/// Returns true is a new state has been requested, signaling the current handler to stop.
|
||||
/// @brief Returns true if a new state has been requested, signaling the current handler to stop.
|
||||
bool NewStatePending() const { return fStateMachine.NewStatePending(); }
|
||||
|
||||
/// @brief Returns the current state
|
||||
fair::mq::State GetCurrentState() const { return fStateMachine.GetCurrentState(); }
|
||||
/// @brief Returns the name of the current state as a string
|
||||
std::string GetCurrentStateName() const { return fStateMachine.GetCurrentStateName(); }
|
||||
|
||||
/// @brief Returns name of the given state as a string
|
||||
/// @param state state
|
||||
static std::string GetStateName(const fair::mq::State state) { return fair::mq::GetStateName(state); }
|
||||
/// @brief Returns name of the given transition as a string
|
||||
/// @param transition transition
|
||||
static std::string GetTransitionName(const fair::mq::Transition transition) { return fair::mq::GetTransitionName(transition); }
|
||||
|
||||
static constexpr const char* DefaultId = "";
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public License (LGPL) version 3, *
|
||||
@@ -17,8 +17,9 @@
|
||||
#include <fairlogger/Logger.h>
|
||||
|
||||
#include <boost/property_tree/ptree.hpp>
|
||||
|
||||
#include <utility> // make_pair
|
||||
#include <string_view>
|
||||
#include <utility> // make_pair
|
||||
#include <cstring>
|
||||
|
||||
using boost::property_tree::ptree;
|
||||
using namespace std;
|
||||
@@ -83,7 +84,16 @@ Properties SuboptParser(const vector<string>& channelConfig, const string& devic
|
||||
string argString(token);
|
||||
char* subopts = &argString[0];
|
||||
char* value = nullptr;
|
||||
// Find either a : or a =. If we find the former first, we consider what is before it
|
||||
// the channel name
|
||||
char* firstSep = strpbrk(subopts, ":=");
|
||||
if (firstSep && *firstSep == ':') {
|
||||
channelName = std::string_view(subopts, firstSep - subopts);
|
||||
channelProperties.put("name", channelName);
|
||||
subopts = firstSep + 1;
|
||||
}
|
||||
while (subopts && *subopts != 0 && *subopts != ' ') {
|
||||
char* cur = subopts;
|
||||
int subopt = getsubopt(&subopts, (char**)channelOptionKeys, &value);
|
||||
if (subopt == NAME) {
|
||||
channelName = value;
|
||||
@@ -94,6 +104,8 @@ Properties SuboptParser(const vector<string>& channelConfig, const string& devic
|
||||
socketsArray.push_back(make_pair("", socketProperties));
|
||||
} else if (subopt >= 0 && value != nullptr) {
|
||||
channelProperties.put(channelOptionKeys[subopt], value);
|
||||
} else if (subopt == -1) {
|
||||
LOG(warn) << "Ignoring unknown argument in --channel-config: " << cur;
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -21,7 +21,7 @@
|
||||
#define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@"
|
||||
#define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ"
|
||||
#define FAIRMQ_LICENSE "LGPL-3.0"
|
||||
#define FAIRMQ_COPYRIGHT "2012-2020 GSI"
|
||||
#define FAIRMQ_COPYRIGHT "2012-2021 GSI"
|
||||
#define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@"
|
||||
|
||||
#endif // FAIR_MQ_VERSION_H
|
||||
|
@@ -17,19 +17,24 @@
|
||||
|
||||
#include "../FairMQDevice.h"
|
||||
#include "../FairMQLogger.h"
|
||||
#include <fairmq/tools/Strings.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <string>
|
||||
#include <fstream>
|
||||
#include <stdexcept>
|
||||
|
||||
// template<typename OutputPolicy>
|
||||
class FairMQSink : public FairMQDevice //, public OutputPolicy
|
||||
class FairMQSink : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
FairMQSink()
|
||||
: fMultipart(false)
|
||||
, fMaxIterations(0)
|
||||
, fNumIterations(0)
|
||||
, fMaxFileSize(0)
|
||||
, fBytesWritten(0)
|
||||
, fInChannelName()
|
||||
, fOutFilename()
|
||||
{}
|
||||
|
||||
~FairMQSink() {}
|
||||
@@ -38,13 +43,21 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy
|
||||
bool fMultipart;
|
||||
uint64_t fMaxIterations;
|
||||
uint64_t fNumIterations;
|
||||
uint64_t fMaxFileSize;
|
||||
uint64_t fBytesWritten;
|
||||
std::string fInChannelName;
|
||||
std::string fOutFilename;
|
||||
std::fstream fOutputFile;
|
||||
|
||||
void InitTask() override
|
||||
{
|
||||
fMultipart = fConfig->GetProperty<bool>("multipart");
|
||||
fMultipart = fConfig->GetProperty<bool>("multipart");
|
||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||
fMaxFileSize = fConfig->GetProperty<uint64_t>("max-file-size");
|
||||
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
|
||||
fOutFilename = fConfig->GetProperty<std::string>("out-filename");
|
||||
|
||||
fBytesWritten = 0;
|
||||
}
|
||||
|
||||
void Run() override
|
||||
@@ -52,41 +65,83 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy
|
||||
// store the channel reference to avoid traversing the map on every loop iteration
|
||||
FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
|
||||
|
||||
LOG(info) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages.";
|
||||
LOG(info) << "Starting sink and expecting to receive " << fMaxIterations << " messages.";
|
||||
auto tStart = std::chrono::high_resolution_clock::now();
|
||||
|
||||
if (!fOutFilename.empty()) {
|
||||
LOG(debug) << "Incoming messages will be written to file: " << fOutFilename;
|
||||
if (fMaxFileSize != 0) {
|
||||
LOG(debug) << "File output will stop after " << fMaxFileSize << " bytes";
|
||||
} else {
|
||||
LOG(debug) << "ATTENTION: --max-file-size is 0 - output file will continue to grow until sink is stopped";
|
||||
}
|
||||
|
||||
fOutputFile.open(fOutFilename, std::ios::out | std::ios::binary);
|
||||
if (!fOutputFile) {
|
||||
LOG(error) << "Could not open '" << fOutFilename;
|
||||
throw std::runtime_error(fair::mq::tools::ToString("Could not open '", fOutFilename));
|
||||
}
|
||||
}
|
||||
|
||||
while (!NewStatePending()) {
|
||||
if (fMultipart) {
|
||||
FairMQParts parts;
|
||||
|
||||
if (dataInChannel.Receive(parts) >= 0) {
|
||||
if (fMaxIterations > 0) {
|
||||
if (fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached.";
|
||||
break;
|
||||
}
|
||||
if (dataInChannel.Receive(parts) < 0) {
|
||||
continue;
|
||||
}
|
||||
if (fOutputFile.is_open()) {
|
||||
for (const auto& part : parts) {
|
||||
WriteToFile(static_cast<const char*>(part->GetData()), part->GetSize());
|
||||
}
|
||||
fNumIterations++;
|
||||
}
|
||||
} else {
|
||||
FairMQMessagePtr msg(dataInChannel.NewMessage());
|
||||
|
||||
if (dataInChannel.Receive(msg) >= 0) {
|
||||
if (fMaxIterations > 0) {
|
||||
if (fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached.";
|
||||
break;
|
||||
}
|
||||
}
|
||||
fNumIterations++;
|
||||
if (dataInChannel.Receive(msg) < 0) {
|
||||
continue;
|
||||
}
|
||||
if (fOutputFile.is_open()) {
|
||||
WriteToFile(static_cast<const char*>(msg->GetData()), msg->GetSize());
|
||||
}
|
||||
}
|
||||
|
||||
if (fMaxFileSize > 0 && fBytesWritten >= fMaxFileSize) {
|
||||
LOG(info) << "Written " << fBytesWritten << " bytes, stopping...";
|
||||
break;
|
||||
}
|
||||
if (fMaxIterations > 0) {
|
||||
if (fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached.";
|
||||
break;
|
||||
}
|
||||
}
|
||||
fNumIterations++;
|
||||
}
|
||||
|
||||
if (fOutputFile.is_open()) {
|
||||
fOutputFile.flush();
|
||||
fOutputFile.close();
|
||||
}
|
||||
|
||||
auto tEnd = std::chrono::high_resolution_clock::now();
|
||||
auto ms = std::chrono::duration<double, std::milli>(tEnd - tStart).count();
|
||||
LOG(info) << "Received " << fNumIterations << " messages in " << ms << "ms.";
|
||||
if (!fOutFilename.empty()) {
|
||||
auto sec = std::chrono::duration<double>(tEnd - tStart).count();
|
||||
LOG(info) << "Closed '" << fOutFilename << "' after writing " << fBytesWritten << " bytes."
|
||||
<< "(" << (fBytesWritten / (1000. * 1000.)) / sec << " MB/s)";
|
||||
}
|
||||
|
||||
LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in "
|
||||
<< std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms.";
|
||||
LOG(info) << "Leaving RUNNING state.";
|
||||
}
|
||||
|
||||
void WriteToFile(const char* ptr, size_t size)
|
||||
{
|
||||
fOutputFile.write(ptr, size);
|
||||
if (fOutputFile.bad()) {
|
||||
LOG(error) << "failed writing to file";
|
||||
throw std::runtime_error("failed writing to file");
|
||||
}
|
||||
fBytesWritten += size;
|
||||
}
|
||||
};
|
||||
|
||||
|
@@ -71,6 +71,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
|
||||
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization.")
|
||||
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
|
||||
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
|
||||
("shm-no-cleanup", po::value<bool >()->default_value(false), "Shared memory: do not cleanup the memory when last device leaves.")
|
||||
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")
|
||||
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
|
||||
("session", po::value<string >()->default_value("default"), "Session name.")
|
||||
|
@@ -15,6 +15,8 @@ void addCustomOptions(bpo::options_description& options)
|
||||
{
|
||||
options.add_options()
|
||||
("in-channel", bpo::value<std::string>()->default_value("data"), "Name of the input channel")
|
||||
("out-filename", bpo::value<std::string>()->default_value(""), "Write incoming message buffers to the specified file")
|
||||
("max-file-size", bpo::value<uint64_t>()->default_value(2000000000), "Maximum file size for the file output (0 - unlimited)")
|
||||
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
|
||||
("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads");
|
||||
}
|
||||
|
@@ -11,6 +11,7 @@
|
||||
#include <picosha2.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <functional> // std::equal_to
|
||||
|
||||
@@ -103,6 +104,15 @@ struct DeviceCounter
|
||||
std::atomic<unsigned int> fCount;
|
||||
};
|
||||
|
||||
struct EventCounter
|
||||
{
|
||||
EventCounter(uint64_t c)
|
||||
: fCount(c)
|
||||
{}
|
||||
|
||||
std::atomic<uint64_t> fCount;
|
||||
};
|
||||
|
||||
struct RegionCounter
|
||||
{
|
||||
RegionCounter(uint16_t c)
|
||||
@@ -185,15 +195,25 @@ struct RegionBlock
|
||||
|
||||
// find id for unique shmem name:
|
||||
// a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)).
|
||||
inline std::string buildShmIdFromSessionIdAndUserId(const std::string& sessionId)
|
||||
inline std::string makeShmIdStr(const std::string& sessionId)
|
||||
{
|
||||
std::string seed((std::to_string(geteuid()) + sessionId));
|
||||
// generate a 8-digit hex value out of sha256 hash
|
||||
std::vector<unsigned char> hash(4);
|
||||
picosha2::hash256(seed.begin(), seed.end(), hash.begin(), hash.end());
|
||||
std::string shmId = picosha2::bytes_to_hex_string(hash.begin(), hash.end());
|
||||
|
||||
return shmId;
|
||||
return picosha2::bytes_to_hex_string(hash.begin(), hash.end());
|
||||
}
|
||||
|
||||
inline uint64_t makeShmIdUint64(const std::string& sessionId)
|
||||
{
|
||||
std::string shmId = makeShmIdStr(sessionId);
|
||||
uint64_t id = 0;
|
||||
std::stringstream ss;
|
||||
ss << std::hex << shmId;
|
||||
ss >> id;
|
||||
|
||||
return id;
|
||||
}
|
||||
|
||||
struct SegmentSize : public boost::static_visitor<size_t>
|
||||
|
@@ -64,7 +64,9 @@ class Manager
|
||||
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
|
||||
, fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
|
||||
, fRegionEventsSubscriptionActive(false)
|
||||
, fNumObservedEvents(0)
|
||||
, fDeviceCounter(nullptr)
|
||||
, fEventCounter(nullptr)
|
||||
, fShmSegments(nullptr)
|
||||
, fShmRegions(nullptr)
|
||||
, fInterrupted(false)
|
||||
@@ -76,6 +78,7 @@ class Manager
|
||||
, fHeartbeatThread()
|
||||
, fSendHeartbeats(true)
|
||||
, fThrowOnBadAlloc(config ? config->GetProperty<bool>("shm-throw-bad-alloc", true) : true)
|
||||
, fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
|
||||
@@ -102,6 +105,16 @@ class Manager
|
||||
|
||||
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||
|
||||
fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first;
|
||||
|
||||
if (fEventCounter) {
|
||||
LOG(debug) << "event counter found: " << fEventCounter->fCount;
|
||||
} else {
|
||||
LOG(debug) << "no event counter found, creating one and initializing with 0";
|
||||
fEventCounter = fManagementSegment.construct<EventCounter>(unique_instance)(0);
|
||||
LOG(debug) << "initialized event counter with: " << fEventCounter->fCount;
|
||||
}
|
||||
|
||||
try {
|
||||
auto it = fShmSegments->find(fSegmentId);
|
||||
if (it == fShmSegments->end()) {
|
||||
@@ -114,6 +127,7 @@ class Manager
|
||||
fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit);
|
||||
}
|
||||
ss << "Created ";
|
||||
(fEventCounter->fCount)++;
|
||||
} else {
|
||||
// found segment with the given id, opening
|
||||
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
|
||||
@@ -137,7 +151,8 @@ class Manager
|
||||
<< " Allocation algorithm: " << allocationAlgorithm;
|
||||
LOG(debug) << ss.str();
|
||||
} catch(interprocess_exception& bie) {
|
||||
LOG(error) << "something went wrong: " << bie.what();
|
||||
LOG(error) << "Failed to create/open shared memory segment (" << "fmq_" << fShmId << "_m_" << fSegmentId << "): " << bie.what();
|
||||
throw std::runtime_error(tools::ToString("Failed to create/open shared memory segment (", "fmq_", fShmId, "_m_", fSegmentId, "): ", bie.what()));
|
||||
}
|
||||
|
||||
if (mlockSegment) {
|
||||
@@ -276,6 +291,8 @@ class Manager
|
||||
r.first->second->StartReceivingAcks();
|
||||
result.first = &(r.first->second->fRegion);
|
||||
result.second = id;
|
||||
|
||||
(fEventCounter->fCount)++;
|
||||
}
|
||||
fRegionEventsCV.notify_all();
|
||||
|
||||
@@ -327,6 +344,7 @@ class Manager
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
fShmRegions->at(id).fDestroyed = true;
|
||||
(fEventCounter->fCount)++;
|
||||
}
|
||||
fRegionEventsCV.notify_all();
|
||||
}
|
||||
@@ -415,14 +433,16 @@ class Manager
|
||||
while (fRegionEventsSubscriptionActive) {
|
||||
auto infos = GetRegionInfoUnsafe();
|
||||
for (const auto& i : infos) {
|
||||
auto el = fObservedRegionEvents.find(i.id);
|
||||
auto el = fObservedRegionEvents.find({i.id, i.managed});
|
||||
if (el == fObservedRegionEvents.end()) {
|
||||
fRegionEventCallback(i);
|
||||
fObservedRegionEvents.emplace(i.id, i.event);
|
||||
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
|
||||
++fNumObservedEvents;
|
||||
} else {
|
||||
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
|
||||
fRegionEventCallback(i);
|
||||
el->second = i.event;
|
||||
++fNumObservedEvents;
|
||||
} else {
|
||||
// LOG(debug) << "ignoring event for id" << i.id << ":";
|
||||
// LOG(debug) << "incoming event: " << i.event;
|
||||
@@ -430,7 +450,7 @@ class Manager
|
||||
}
|
||||
}
|
||||
}
|
||||
fRegionEventsCV.wait(lock);
|
||||
fRegionEventsCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -590,7 +610,7 @@ class Manager
|
||||
(fDeviceCounter->fCount)--;
|
||||
|
||||
if (fDeviceCounter->fCount == 0) {
|
||||
LOG(debug) << "Last segment user, removing segment.";
|
||||
LOG(debug) << "Last segment user, " << (fNoCleanup ? "skipping removal (--shm-no-cleanup is true)." : "removing segment.");
|
||||
lastRemoved = true;
|
||||
} else {
|
||||
LOG(debug) << "Other segment users present (" << fDeviceCounter->fCount << "), skipping removal.";
|
||||
@@ -599,7 +619,7 @@ class Manager
|
||||
LOG(error) << "Manager could not acquire lock: " << e.what();
|
||||
}
|
||||
|
||||
if (lastRemoved) {
|
||||
if (lastRemoved && !fNoCleanup) {
|
||||
Monitor::Cleanup(ShmId{fShmId});
|
||||
}
|
||||
}
|
||||
@@ -617,9 +637,11 @@ class Manager
|
||||
std::thread fRegionEventThread;
|
||||
bool fRegionEventsSubscriptionActive;
|
||||
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
|
||||
std::unordered_map<uint16_t, RegionEvent> fObservedRegionEvents;
|
||||
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents;
|
||||
uint64_t fNumObservedEvents;
|
||||
|
||||
DeviceCounter* fDeviceCounter;
|
||||
EventCounter* fEventCounter;
|
||||
Uint16SegmentInfoHashMap* fShmSegments;
|
||||
Uint16RegionInfoHashMap* fShmRegions;
|
||||
std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions;
|
||||
@@ -637,6 +659,7 @@ class Manager
|
||||
std::condition_variable fHeartbeatsCV;
|
||||
|
||||
bool fThrowOnBadAlloc;
|
||||
bool fNoCleanup;
|
||||
};
|
||||
|
||||
} // namespace fair::mq::shmem
|
||||
|
@@ -211,11 +211,31 @@ class Message final : public fair::mq::Message
|
||||
return true;
|
||||
} else if (newSize <= fMeta.fSize) {
|
||||
try {
|
||||
fLocalPtr = fManager.ShrinkInPlace(newSize, fLocalPtr, fMeta.fSegmentId);
|
||||
fMeta.fSize = newSize;
|
||||
return true;
|
||||
try {
|
||||
fLocalPtr = fManager.ShrinkInPlace(newSize, fLocalPtr, fMeta.fSegmentId);
|
||||
fMeta.fSize = newSize;
|
||||
return true;
|
||||
} catch (boost::interprocess::bad_alloc& e) {
|
||||
// if shrinking fails (can happen due to boost alignment requirements):
|
||||
// unused size >= 1000000 bytes: reallocate fully
|
||||
// unused size < 1000000 bytes: simply reset the size and keep the rest of the buffer until message destruction
|
||||
if (fMeta.fSize - newSize >= 1000000) {
|
||||
char* newPtr = fManager.Allocate(newSize, fAlignment);
|
||||
if (newPtr) {
|
||||
std::memcpy(newPtr, fLocalPtr, newSize);
|
||||
fManager.Deallocate(fMeta.fHandle, fMeta.fSegmentId);
|
||||
fLocalPtr = newPtr;
|
||||
fMeta.fHandle = fManager.GetHandleFromAddress(fLocalPtr, fMeta.fSegmentId);
|
||||
} else {
|
||||
LOG(debug) << "could not set used size: " << e.what();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
fMeta.fSize = newSize;
|
||||
return true;
|
||||
}
|
||||
} catch (boost::interprocess::interprocess_exception& e) {
|
||||
LOG(info) << "could not set used size: " << e.what();
|
||||
LOG(debug) << "could not set used size: " << e.what();
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
@@ -257,7 +277,7 @@ class Message final : public fair::mq::Message
|
||||
Manager& fManager;
|
||||
bool fQueued;
|
||||
MetaHeader fMeta;
|
||||
size_t fAlignment; // TODO: put this to debug mode
|
||||
size_t fAlignment;
|
||||
mutable Region* fRegionPtr;
|
||||
mutable char* fLocalPtr;
|
||||
|
||||
|
@@ -393,7 +393,7 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused)))
|
||||
|
||||
void Monitor::PrintDebugInfo(const SessionId& sessionId)
|
||||
{
|
||||
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
|
||||
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
|
||||
PrintDebugInfo(shmId);
|
||||
}
|
||||
|
||||
@@ -429,7 +429,7 @@ unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(cons
|
||||
}
|
||||
unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(const SessionId& sessionId)
|
||||
{
|
||||
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
|
||||
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
|
||||
return GetDebugInfo(shmId);
|
||||
}
|
||||
|
||||
@@ -536,7 +536,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
|
||||
|
||||
std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const SessionId& sessionId, bool verbose /* = true */)
|
||||
{
|
||||
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
|
||||
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
|
||||
if (verbose) {
|
||||
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
|
||||
}
|
||||
@@ -553,7 +553,7 @@ std::vector<std::pair<std::string, bool>> Monitor::CleanupFull(const ShmId& shmI
|
||||
|
||||
std::vector<std::pair<std::string, bool>> Monitor::CleanupFull(const SessionId& sessionId, bool verbose /* = true */)
|
||||
{
|
||||
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
|
||||
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
|
||||
if (verbose) {
|
||||
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
|
||||
}
|
||||
|
@@ -152,7 +152,7 @@ struct Region
|
||||
}
|
||||
}
|
||||
|
||||
LOG(debug) << "AcksSender for " << fName << " leaving " << "(blocks left to free: " << fBlocksToFree.size() << ", "
|
||||
LOG(trace) << "AcksSender for " << fName << " leaving " << "(blocks left to free: " << fBlocksToFree.size() << ", "
|
||||
<< " blocks left to send: " << blocksToSend << ").";
|
||||
}
|
||||
|
||||
@@ -195,7 +195,7 @@ struct Region
|
||||
}
|
||||
}
|
||||
|
||||
LOG(debug) << "AcksReceiver for " << fName << " leaving (remaining queue size: " << fQueue->get_num_msg() << ").";
|
||||
LOG(trace) << "AcksReceiver for " << fName << " leaving (remaining queue size: " << fQueue->get_num_msg() << ").";
|
||||
}
|
||||
|
||||
void ReleaseBlock(const RegionBlock& block)
|
||||
|
@@ -28,6 +28,7 @@
|
||||
#include <memory> // unique_ptr, make_unique
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <stdexcept>
|
||||
|
||||
namespace fair::mq::shmem
|
||||
{
|
||||
@@ -35,10 +36,8 @@ namespace fair::mq::shmem
|
||||
class TransportFactory final : public fair::mq::TransportFactory
|
||||
{
|
||||
public:
|
||||
TransportFactory(const std::string& id = "", const ProgOptions* config = nullptr)
|
||||
: fair::mq::TransportFactory(id)
|
||||
, fDeviceId(id)
|
||||
, fShmId()
|
||||
TransportFactory(const std::string& deviceId = "", const ProgOptions* config = nullptr)
|
||||
: fair::mq::TransportFactory(deviceId)
|
||||
, fZmqCtx(zmq_ctx_new())
|
||||
, fManager(nullptr)
|
||||
{
|
||||
@@ -69,8 +68,8 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
throw SharedMemoryError(tools::ToString("Provided shared memory allocation algorithm '", allocationAlgorithm, "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'"));
|
||||
}
|
||||
|
||||
fShmId = buildShmIdFromSessionIdAndUserId(sessionName);
|
||||
LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";
|
||||
std::string shmId = makeShmIdStr(sessionName);
|
||||
LOG(debug) << "Generated shmid '" << shmId << "' out of session id '" << sessionName << "'.";
|
||||
|
||||
try {
|
||||
if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) {
|
||||
@@ -82,10 +81,13 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
||||
fManager = std::make_unique<Manager>(fShmId, fDeviceId, segmentSize, config);
|
||||
fManager = std::make_unique<Manager>(shmId, deviceId, segmentSize, config);
|
||||
} catch (boost::interprocess::interprocess_exception& e) {
|
||||
LOG(error) << "Could not initialize shared memory transport: " << e.what();
|
||||
throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));
|
||||
} catch (const std::exception& e) {
|
||||
LOG(error) << "Could not initialize shared memory transport: " << e.what();
|
||||
throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -200,8 +202,6 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
}
|
||||
|
||||
private:
|
||||
std::string fDeviceId;
|
||||
std::string fShmId;
|
||||
void* fZmqCtx;
|
||||
std::unique_ptr<Manager> fManager;
|
||||
};
|
||||
|
@@ -111,7 +111,7 @@ int main(int argc, char** argv)
|
||||
}
|
||||
|
||||
if (shmId == "") {
|
||||
shmId = buildShmIdFromSessionIdAndUserId(sessionName);
|
||||
shmId = makeShmIdStr(sessionName);
|
||||
}
|
||||
|
||||
if (cleanup) {
|
||||
|
@@ -217,6 +217,7 @@ add_testsuite(Properties
|
||||
SOURCES
|
||||
${CMAKE_CURRENT_BINARY_DIR}/runner.cxx
|
||||
properties/_properties.cxx
|
||||
properties/_suboptparser.cxx
|
||||
|
||||
LINKS FairMQ
|
||||
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}
|
||||
|
41
test/ci/slurm-submit.sh
Executable file
41
test/ci/slurm-submit.sh
Executable file
@@ -0,0 +1,41 @@
|
||||
#! /bin/bash
|
||||
|
||||
label="$1"
|
||||
jobsh="$2"
|
||||
|
||||
if [ -z "$ALFACI_SLURM_CPUS" ]
|
||||
then
|
||||
ALFACI_SLURM_CPUS=32
|
||||
fi
|
||||
if [ -z "$ALFACI_SLURM_EXTRA_OPTS" ]
|
||||
then
|
||||
ALFACI_SLURM_EXTRA_OPTS="--hint=compute_bound"
|
||||
fi
|
||||
if [ -z "$ALFACI_SLURM_TIMEOUT" ]
|
||||
then
|
||||
ALFACI_SLURM_TIMEOUT=30
|
||||
fi
|
||||
if [ -z "$ALFACI_SLURM_QUEUE" ]
|
||||
then
|
||||
ALFACI_SLURM_QUEUE=main
|
||||
fi
|
||||
|
||||
echo "*** Slurm request options :"
|
||||
echo "*** Working directory ..: $PWD"
|
||||
echo "*** Queue ..............: $ALFACI_SLURM_QUEUE"
|
||||
echo "*** CPUs ...............: $ALFACI_SLURM_CPUS"
|
||||
echo "*** Wall Time ..........: $ALFACI_SLURM_TIMEOUT min"
|
||||
echo "*** Job Name ...........: ${label}"
|
||||
echo "*** Extra Options ......: ${ALFACI_SLURM_EXTRA_OPTS}"
|
||||
echo "*** Submitting job at ....: $(date -R)"
|
||||
(
|
||||
set -x
|
||||
srun -p $ALFACI_SLURM_QUEUE -c $ALFACI_SLURM_CPUS -n 1 \
|
||||
-t $ALFACI_SLURM_TIMEOUT \
|
||||
--job-name="${label}" \
|
||||
${ALFACI_SLURM_EXTRA_OPTS} \
|
||||
bash "${jobsh}"
|
||||
)
|
||||
retval=$?
|
||||
echo "*** Exit Code ............: $retval"
|
||||
exit "$retval"
|
@@ -40,34 +40,34 @@ void RunPushPullWithMsgResize(const string& transport, const string& _address)
|
||||
pull.Connect(address);
|
||||
|
||||
{
|
||||
FairMQMessagePtr outMsg(push.NewMessage(1000));
|
||||
ASSERT_EQ(outMsg->GetSize(), 1000);
|
||||
memcpy(outMsg->GetData(), "ABC", 3);
|
||||
ASSERT_EQ(outMsg->SetUsedSize(500), true);
|
||||
ASSERT_EQ(outMsg->SetUsedSize(500), true);
|
||||
ASSERT_EQ(outMsg->SetUsedSize(700), false);
|
||||
ASSERT_EQ(outMsg->GetSize(), 500);
|
||||
FairMQMessagePtr outMsg(push.NewMessage(6));
|
||||
ASSERT_EQ(outMsg->GetSize(), 6);
|
||||
memcpy(outMsg->GetData(), "ABCDEF", 6);
|
||||
ASSERT_EQ(outMsg->SetUsedSize(5), true);
|
||||
ASSERT_EQ(outMsg->SetUsedSize(5), true);
|
||||
ASSERT_EQ(outMsg->SetUsedSize(7), false);
|
||||
ASSERT_EQ(outMsg->GetSize(), 5);
|
||||
// check if the data is still intact
|
||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
|
||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
|
||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
|
||||
ASSERT_EQ(outMsg->SetUsedSize(250), true);
|
||||
ASSERT_EQ(outMsg->GetSize(), 250);
|
||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[3], 'D');
|
||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[4], 'E');
|
||||
ASSERT_EQ(outMsg->SetUsedSize(2), true);
|
||||
ASSERT_EQ(outMsg->GetSize(), 2);
|
||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
|
||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
|
||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
|
||||
FairMQMessagePtr msgCopy(push.NewMessage());
|
||||
msgCopy->Copy(*outMsg);
|
||||
ASSERT_EQ(msgCopy->GetSize(), 250);
|
||||
ASSERT_EQ(msgCopy->GetSize(), 2);
|
||||
|
||||
ASSERT_EQ(push.Send(outMsg), 250);
|
||||
ASSERT_EQ(push.Send(outMsg), 2);
|
||||
|
||||
FairMQMessagePtr inMsg(pull.NewMessage());
|
||||
ASSERT_EQ(pull.Receive(inMsg), 250);
|
||||
ASSERT_EQ(inMsg->GetSize(), 250);
|
||||
ASSERT_EQ(pull.Receive(inMsg), 2);
|
||||
ASSERT_EQ(inMsg->GetSize(), 2);
|
||||
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[0], 'A');
|
||||
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[1], 'B');
|
||||
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[2], 'C');
|
||||
}
|
||||
|
||||
{
|
||||
|
36
test/properties/_suboptparser.cxx
Normal file
36
test/properties/_suboptparser.cxx
Normal file
@@ -0,0 +1,36 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <fairmq/Properties.h>
|
||||
#include <fairmq/SuboptParser.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
|
||||
namespace {
|
||||
|
||||
using namespace std;
|
||||
using namespace fair::mq;
|
||||
|
||||
auto contains(Properties const& parsed, string_view key, string_view value) -> bool
|
||||
{
|
||||
return PropertyHelper::ConvertPropertyToString(parsed.at(string(key))) == value;
|
||||
}
|
||||
|
||||
TEST(SuboptParser, ChannelNameSelector)
|
||||
{
|
||||
Properties parsed(SuboptParser({"name=foo-data,address=tcp://0.0.0.0:6000,type=push",
|
||||
"bar-data:address=tcp://0.0.0.0:7000,type=pull"},
|
||||
"foo"));
|
||||
ASSERT_TRUE(contains(parsed, "chans.foo-data.0.address", "tcp://0.0.0.0:6000"));
|
||||
ASSERT_TRUE(contains(parsed, "chans.foo-data.0.type", "push"));
|
||||
ASSERT_TRUE(contains(parsed, "chans.bar-data.0.address", "tcp://0.0.0.0:7000"));
|
||||
ASSERT_TRUE(contains(parsed, "chans.bar-data.0.type", "pull"));
|
||||
}
|
||||
|
||||
} // namespace
|
@@ -50,12 +50,12 @@ void RegionEventSubscriptions(const string& transport)
|
||||
|
||||
ASSERT_EQ(factory->SubscribedToRegionEvents(), false);
|
||||
factory->SubscribeToRegionEvents([&](FairMQRegionInfo info) {
|
||||
LOG(warn) << ">>>" << info.event;
|
||||
LOG(warn) << "managed: " << info.managed;
|
||||
LOG(warn) << "id: " << info.id;
|
||||
LOG(warn) << "ptr: " << info.ptr;
|
||||
LOG(warn) << "size: " << info.size;
|
||||
LOG(warn) << "flags: " << info.flags;
|
||||
LOG(info) << ">>> " << info.event << ": "
|
||||
<< (info.managed ? "managed" : "unmanaged")
|
||||
<< ", id: " << info.id
|
||||
<< ", ptr: " << info.ptr
|
||||
<< ", size: " << info.size
|
||||
<< ", flags: " << info.flags;
|
||||
if (info.event == FairMQRegionEvent::created) {
|
||||
if (info.id == id1) {
|
||||
ASSERT_EQ(info.size, size1);
|
||||
|
Reference in New Issue
Block a user