Compare commits

..

12 Commits

Author SHA1 Message Date
Alexey Rybalchenko
bffe74c5cf shm: handle shrink failure gracefully 2021-03-23 13:00:35 +01:00
Dennis Klein
72f319e276 CI: Adapt to new alfaci build hosts 2021-03-23 11:06:33 +01:00
Alexey Rybalchenko
62438bd99e shm: Improve error message when segment cannot be opened 2021-03-18 09:02:08 +01:00
Alexey Rybalchenko
c8ad684b18 Add --shm-no-cleanup option
When true, device will skip the segment cleanup even when it is the last
segment user
2021-03-18 09:02:08 +01:00
Alexey Rybalchenko
a5ec83208d Update docs 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
fc2241ece7 Fix incorrect channel index passed to OnData callback 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
1f26883b75 Formatting 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
edbdc57332 shm: make shmId also available as uint64_t 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
0fd2fcadc2 shm: Make sure no event notifications are missed 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
9b48b31a75 shm: Make sure all region events are fired 2021-03-11 12:14:00 +01:00
Dennis Klein
cb4335e59f Add test coverage for --channel-config name selector 2021-03-05 02:02:14 +01:00
Giulio Eulisse
ce4584b3d8 Provide a better syntax for --channel-config
The current syntax is ambiguous because it treats assignments
(like address=127.0.0.1) and selectors (name=my-channel) using
the symbol equal `"`.

This allows:

my-channel:address=127.0.0.1

as alternative syntax, which clearly separates the role of my-channel
from the associated properties.
2021-03-05 02:02:14 +01:00
29 changed files with 488 additions and 482 deletions

View File

@@ -1,88 +1,63 @@
################################################################################
# Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
Set(CTEST_SOURCE_DIRECTORY $ENV{SOURCEDIR})
Set(CTEST_BINARY_DIRECTORY $ENV{BUILDDIR})
Set(CTEST_SITE $ENV{SITE})
Set(CTEST_BUILD_NAME $ENV{LABEL})
Set(CTEST_CMAKE_GENERATOR "Unix Makefiles")
Set(CTEST_PROJECT_NAME "FairMQ")
Find_Program(CTEST_GIT_COMMAND NAMES git)
Set(CTEST_UPDATE_COMMAND "${CTEST_GIT_COMMAND}")
cmake_host_system_information(RESULT fqdn QUERY FQDN)
Set(BUILD_COMMAND "make")
Set(CTEST_BUILD_COMMAND "${BUILD_COMMAND} -j$ENV{number_of_processors}")
set(CTEST_SOURCE_DIRECTORY .)
set(CTEST_BINARY_DIRECTORY build)
set(CTEST_CMAKE_GENERATOR "Ninja")
set(CTEST_USE_LAUNCHERS ON)
set(CTEST_CONFIGURATION_TYPE "RelWithDebInfo")
String(TOUPPER $ENV{ctest_model} _Model)
Set(configure_options "-DCMAKE_BUILD_TYPE=$ENV{ctest_model}")
Set(CTEST_USE_LAUNCHERS 1)
Set(configure_options "${configure_options};-DCTEST_USE_LAUNCHERS=${CTEST_USE_LAUNCHERS}")
Set(configure_options "${configure_options};-DDISABLE_COLOR=ON")
Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}")
# Set(configure_options "${configure_options};-DBUILD_OFI_TRANSPORT=ON")
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
Set(configure_options "${configure_options};-DBUILD_SDK=ON")
Set(configure_options "${configure_options};-DBUILD_SDK_COMMANDS=ON")
Set(EXTRA_FLAGS $ENV{EXTRA_FLAGS})
If(EXTRA_FLAGS)
Set(configure_options "${configure_options};${EXTRA_FLAGS}")
EndIf()
If($ENV{ctest_model} MATCHES Profile)
Find_Program(GCOV_COMMAND gcov)
If(GCOV_COMMAND)
Message("Found GCOV: ${GCOV_COMMAND}")
Set(CTEST_COVERAGE_COMMAND ${GCOV_COMMAND})
set(CTEST_COVERAGE_EXTRA_FLAGS "-p")
EndIf(GCOV_COMMAND)
EndIf()
If($ENV{ctest_model} MATCHES Nightly OR $ENV{ctest_model} MATCHES Profile)
Ctest_Empty_Binary_Directory(${CTEST_BINARY_DIRECTORY})
EndIf()
Ctest_Start($ENV{ctest_model})
Ctest_Configure(BUILD "${CTEST_BINARY_DIRECTORY}"
OPTIONS "${configure_options}"
)
Ctest_Build(BUILD "${CTEST_BINARY_DIRECTORY}")
unset(exclude_tests)
if($ENV{EXCLUDE_UNSTABLE_DDS_TESTS})
set(exclude_tests EXCLUDE ".*\\.localhost$")
if(NOT NCPUS)
if(ENV{SLURM_CPUS_PER_TASK})
set(NCPUS $ENV{SLURM_CPUS_PER_TASK})
else()
include(ProcessorCount)
ProcessorCount(NCPUS)
if(NCPUS EQUAL 0)
set(NCPUS 1)
endif()
endif()
endif()
Ctest_Test(BUILD "${CTEST_BINARY_DIRECTORY}"
# PARALLEL_LEVEL $ENV{number_of_processors}
${exclude_tests}
PARALLEL_LEVEL $ENV{number_of_processors}
RETURN_VALUE _ctest_test_ret_val
)
If(GCOV_COMMAND)
Ctest_Coverage(BUILD "${CTEST_BINARY_DIRECTORY}" LABELS coverage)
EndIf()
if ("$ENV{CTEST_SITE}" STREQUAL "")
set(CTEST_SITE "${fqdn}")
else()
set(CTEST_SITE $ENV{CTEST_SITE})
endif()
If("$ENV{do_codecov_upload}")
Execute_Process(COMMAND curl https://codecov.io/bash -o codecov_uploader.sh
WORKING_DIRECTORY ${CTEST_BINARY_DIRECTORY}
TIMEOUT 60)
Execute_Process(COMMAND bash ./codecov_uploader.sh -X gcov
WORKING_DIRECTORY ${CTEST_BINARY_DIRECTORY}
TIMEOUT 60)
EndIf()
if ("$ENV{LABEL}" STREQUAL "")
set(CTEST_BUILD_NAME "build")
else()
set(CTEST_BUILD_NAME $ENV{LABEL})
endif()
Ctest_Submit()
ctest_start(Continuous)
if (_ctest_test_ret_val)
list(APPEND options
"-DDISABLE_COLOR=ON"
"-DBUILD_SDK_COMMANDS=ON"
"-DBUILD_SDK=ON"
"-DBUILD_DDS_PLUGIN=ON"
)
list(JOIN options ";" optionsstr)
ctest_configure(OPTIONS "${optionsstr}")
ctest_build(FLAGS "-j${NCPUS}")
ctest_test(BUILD "${CTEST_BINARY_DIRECTORY}"
PARALLEL_LEVEL 1
SCHEDULE_RANDOM ON
RETURN_VALUE _ctest_test_ret_val)
ctest_submit()
if(_ctest_test_ret_val)
Message(FATAL_ERROR "Some tests failed.")
endif()

100
Jenkinsfile vendored
View File

@@ -1,67 +1,50 @@
#!groovy
def specToLabel(Map spec) {
return "${spec.os}-${spec.arch}-${spec.compiler}-FairSoft_${spec.fairsoft}"
}
def jobMatrix(String prefix, List specs, Closure callback) {
def jobMatrix(String prefix, String type, List specs) {
def nodes = [:]
for (spec in specs) {
def label = specToLabel(spec)
def node_tag = label
if (spec.os =~ /macOS/) {
node_tag = spec.os
}
def fairsoft = spec.fairsoft
job = "${spec.os}-${spec.ver}-${spec.arch}-${spec.compiler}"
def label = "${type}/${job}"
def selector = "${spec.os}-${spec.ver}-${spec.arch}"
def os = spec.os
def compiler = spec.compiler
nodes["${prefix}/${label}"] = {
node(node_tag) {
def ver = spec.ver
def check = spec.check
nodes[label] = {
node(selector) {
githubNotify(context: "${prefix}/${label}", description: 'Building ...', status: 'PENDING')
try {
deleteDir()
checkout scm
sh """\
echo "export SIMPATH=\${SIMPATH_PREFIX}${fairsoft}" >> Dart.cfg
echo "export FAIRSOFT_VERSION=${fairsoft}" >> Dart.cfg
"""
if (os =~ /Debian/ && compiler =~ /gcc9/) {
sh '''\
echo "source /etc/profile.d/modules.sh" >> Dart.cfg
echo "module use /cvmfs/it.gsi.de/modulefiles" >> Dart.cfg
echo "module load compiler/gcc/9.1.0" >> Dart.cfg
'''
}
if (os =~ /[Mm]acOS/) {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=clang++'\" >> Dart.cfg"
def jobscript = 'job.sh'
def ctestcmd = "ctest -S FairMQTest.cmake -V --output-on-failure"
sh "echo \"set -e\" >> ${jobscript}"
sh "echo \"export LABEL=\\\"\${JOB_BASE_NAME} ${label}\\\"\" >> ${jobscript}"
if (selector =~ /^macos/) {
sh """\
echo \"export DDS_ROOT=\\\"\\\$(brew --prefix dds)\\\"\" >> ${jobscript}
echo \"${ctestcmd}\" >> ${jobscript}
"""
sh "cat ${jobscript}"
sh "bash ${jobscript}"
} else {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'\" >> Dart.cfg"
def containercmd = "singularity exec -B/shared ${env.SINGULARITY_CONTAINER_ROOT}/fairmq/${os}.${ver}.sif bash -l -c \\\"${ctestcmd}\\\""
sh """\
echo \"echo \\\"*** Job started at .......: \\\$(date -R)\\\"\" >> ${jobscript}
echo \"echo \\\"*** Job ID ...............: \\\${SLURM_JOB_ID}\\\"\" >> ${jobscript}
echo \"echo \\\"*** Compute node .........: \\\$(hostname -f)\\\"\" >> ${jobscript}
echo \"unset http_proxy\" >> ${jobscript}
echo \"unset HTTP_PROXY\" >> ${jobscript}
echo \"${containercmd}\" >> ${jobscript}
"""
sh "cat ${jobscript}"
sh "test/ci/slurm-submit.sh \"FairMQ \${JOB_BASE_NAME} ${label}\" ${jobscript}"
}
sh '''\
echo "export BUILDDIR=$PWD/build" >> Dart.cfg
echo "export SOURCEDIR=$PWD" >> Dart.cfg
echo "export PATH=\\\$SIMPATH/bin:\\\$PATH" >> Dart.cfg
echo "export GIT_BRANCH=$JOB_BASE_NAME" >> Dart.cfg
echo "echo \\\$PATH" >> Dart.cfg
'''
if (os =~ /macOS10.14/) {
sh "echo \"export EXCLUDE_UNSTABLE_DDS_TESTS=1\" >> Dart.cfg"
}
sh 'cat Dart.cfg'
callback.call(spec, label)
deleteDir()
githubNotify(context: "${prefix}/${label}", description: 'Success', status: 'SUCCESS')
} catch (e) {
def tarball = "${prefix}_${label}_dds_logs.tar.gz"
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test/ .DDS/"
archiveArtifacts tarball
deleteDir()
githubNotify(context: "${prefix}/${label}", description: 'Error', status: 'ERROR')
throw e
@@ -78,23 +61,12 @@ pipeline{
stage("Run CI Matrix") {
steps{
script {
def build_jobs = jobMatrix('build', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
[os: 'macOS10.14', arch: 'x86_64', compiler: 'AppleClang11.0', fairsoft: 'fairmq_dev'],
[os: 'macOS10.15', arch: 'x86_64', compiler: 'AppleClang12.0', fairsoft: 'fairmq_dev'],
]) { spec, label ->
sh './Dart.sh alfa_ci Dart.cfg'
}
def builds = jobMatrix('alfa-ci', 'build', [
[os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10'],
[os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12'],
])
def profile_jobs = jobMatrix('codecov', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
]) { spec, label ->
withCredentials([string(credentialsId: 'fairmq_codecov_token', variable: 'CODECOV_TOKEN')]) {
sh './Dart.sh codecov Dart.cfg'
}
}
parallel(build_jobs + profile_jobs)
parallel(builds)
}
}
}

View File

@@ -1,81 +0,0 @@
#!groovy
def specToLabel(Map spec) {
return "${spec.os}-${spec.arch}-${spec.compiler}-FairSoft_${spec.fairsoft}"
}
def buildMatrix(List specs, Closure callback) {
def nodes = [:]
for (spec in specs) {
def label = specToLabel(spec)
def fairsoft = spec.fairsoft
def os = spec.os
def compiler = spec.compiler
nodes[label] = {
node(label) {
try {
deleteDir()
checkout scm
sh """\
echo "export SIMPATH=\${SIMPATH_PREFIX}${fairsoft}" >> Dart.cfg
echo "export FAIRSOFT_VERSION=${fairsoft}" >> Dart.cfg
"""
if (os =~ /Debian/ && compiler =~ /gcc9/) {
sh '''\
echo "source /etc/profile.d/modules.sh" >> Dart.cfg
echo "module use /cvmfs/it.gsi.de/modulefiles" >> Dart.cfg
echo "module load compiler/gcc/9.1.0" >> Dart.cfg
'''
}
if (os =~ /MacOS/) {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=clang++'\" >> Dart.cfg"
} else {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'\" >> Dart.cfg"
}
sh '''\
echo "export BUILDDIR=$PWD/build" >> Dart.cfg
echo "export SOURCEDIR=$PWD" >> Dart.cfg
echo "export PATH=\\\$SIMPATH/bin:\\\$PATH" >> Dart.cfg
echo "export GIT_BRANCH=dev" >> Dart.cfg
echo "echo \\\$PATH" >> Dart.cfg
'''
sh 'cat Dart.cfg'
callback.call(spec, label)
deleteDir()
} catch (e) {
def tarball = "${label}_dds_logs.tar.gz"
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test/ .DDS/"
archiveArtifacts tarball
deleteDir()
throw e
}
}
}
}
return nodes
}
pipeline{
agent none
triggers { cron('H 2 * * *') }
stages {
stage("Run Nightly Build/Test Matrix") {
steps{
script {
parallel(buildMatrix([
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'],
[os: 'MacOS10.14', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'],
]) { spec, label ->
sh './Dart.sh Nightly Dart.cfg'
sh './Dart.sh Profile Dart.cfg'
})
}
}
}
}
}

View File

@@ -7,10 +7,10 @@
################################################################################
add_library(Example11Lib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(Example11Lib PUBLIC FairMQ)
@@ -40,12 +40,12 @@ set_tests_properties(Example.1-1.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true P
# install
install(
TARGETS
fairmq-ex-1-1-sampler
fairmq-ex-1-1-sink
TARGETS
fairmq-ex-1-1-sampler
fairmq-ex-1-1-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -54,7 +54,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-1.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-1.sh
)

View File

@@ -7,12 +7,12 @@
################################################################################
add_library(Example1N1Lib STATIC
"Sampler.cxx"
"Sampler.h"
"Processor.cxx"
"Processor.h"
"Sink.cxx"
"Sink.h"
"Sampler.cxx"
"Sampler.h"
"Processor.cxx"
"Processor.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(Example1N1Lib PUBLIC FairMQ)
@@ -48,13 +48,13 @@ set_tests_properties(Example.1-n-1.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true
# install
install(
TARGETS
fairmq-ex-1-n-1-sampler
fairmq-ex-1-n-1-processor
fairmq-ex-1-n-1-sink
TARGETS
fairmq-ex-1-n-1-sampler
fairmq-ex-1-n-1-processor
fairmq-ex-1-n-1-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -64,12 +64,12 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-n-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-n-1.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-n-1.sh
)
install(
FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-1-n-1.json
DESTINATION ${PROJECT_INSTALL_DATADIR}
FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-1-n-1.json
DESTINATION ${PROJECT_INSTALL_DATADIR}
)

View File

@@ -34,7 +34,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-builtin-devices.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-builtin-devices.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-builtin-devices.sh
)

View File

@@ -7,10 +7,10 @@
################################################################################
add_library(ExampleCopyPushLib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(ExampleCopyPushLib PUBLIC FairMQ)
@@ -41,12 +41,12 @@ set_tests_properties(Example.CopyPush.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL t
# install
install(
TARGETS
fairmq-ex-copypush-sampler
fairmq-ex-copypush-sink
TARGETS
fairmq-ex-copypush-sampler
fairmq-ex-copypush-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -55,7 +55,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-copypush.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-copypush.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-copypush.sh
)

View File

@@ -7,10 +7,10 @@
################################################################################
add_library(ExampleMultipartLib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(ExampleMultipartLib PUBLIC FairMQ)
@@ -45,12 +45,12 @@ endif()
# install
install(
TARGETS
fairmq-ex-multipart-sampler
fairmq-ex-multipart-sink
TARGETS
fairmq-ex-multipart-sampler
fairmq-ex-multipart-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -59,7 +59,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multipart.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multipart.sh
)

View File

@@ -7,12 +7,12 @@
################################################################################
add_library(ExampleMultipleChannelsLib STATIC
"Sampler.cxx"
"Sampler.h"
"Broadcaster.cxx"
"Broadcaster.h"
"Sink.cxx"
"Sink.h"
"Sampler.cxx"
"Sampler.h"
"Broadcaster.cxx"
"Broadcaster.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(ExampleMultipleChannelsLib PUBLIC FairMQ)
@@ -42,13 +42,13 @@ set_tests_properties(Example.MultipleChannels.zeromq PROPERTIES TIMEOUT "30" RUN
# install
install(
TARGETS
fairmq-ex-multiple-channels-sampler
fairmq-ex-multiple-channels-broadcaster
fairmq-ex-multiple-channels-sink
TARGETS
fairmq-ex-multiple-channels-sampler
fairmq-ex-multiple-channels-broadcaster
fairmq-ex-multiple-channels-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -57,7 +57,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-channels.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-channels.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-channels.sh
)

View File

@@ -7,12 +7,12 @@
################################################################################
add_library(ExampleMultipleTransportsLib STATIC
"Sampler1.cxx"
"Sampler1.h"
"Sampler2.cxx"
"Sampler2.h"
"Sink.cxx"
"Sink.h"
"Sampler1.cxx"
"Sampler1.h"
"Sampler2.cxx"
"Sampler2.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(ExampleMultipleTransportsLib PUBLIC FairMQ)
@@ -41,13 +41,13 @@ set_tests_properties(Example.MultipleTransports PROPERTIES TIMEOUT "30" RUN_SERI
# install
install(
TARGETS
fairmq-ex-multiple-transports-sampler1
fairmq-ex-multiple-transports-sampler2
fairmq-ex-multiple-transports-sink
TARGETS
fairmq-ex-multiple-transports-sampler1
fairmq-ex-multiple-transports-sampler2
fairmq-ex-multiple-transports-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for install directories
@@ -56,7 +56,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-transports.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-transports.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-transports.sh
)

View File

@@ -29,15 +29,15 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh
# install
install(
TARGETS
fairmq-ex-readout-readout
fairmq-ex-readout-builder
fairmq-ex-readout-processor
fairmq-ex-readout-sender
fairmq-ex-readout-receiver
TARGETS
fairmq-ex-readout-readout
fairmq-ex-readout-builder
fairmq-ex-readout-processor
fairmq-ex-readout-sender
fairmq-ex-readout-receiver
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -47,13 +47,13 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout.sh
)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout-processing.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout-processing.sh
)

View File

@@ -7,10 +7,10 @@
################################################################################
add_library(ExampleRegionLib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(ExampleRegionLib PUBLIC FairMQ)
@@ -40,12 +40,12 @@ set_tests_properties(Example.Region.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL tru
# install
install(
TARGETS
fairmq-ex-region-sampler
fairmq-ex-region-sink
TARGETS
fairmq-ex-region-sampler
fairmq-ex-region-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -54,7 +54,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-region.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-region.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-region.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-region.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-region.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-region.sh
)

View File

@@ -7,10 +7,10 @@
################################################################################
add_library(ExampleReqRepLib STATIC
"Client.cxx"
"Client.h"
"Server.cxx"
"Server.h"
"Client.cxx"
"Client.h"
"Server.cxx"
"Server.h"
)
target_link_libraries(ExampleReqRepLib PUBLIC FairMQ)
@@ -41,12 +41,12 @@ set_tests_properties(Example.ReqRep.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL tru
# install
install(
TARGETS
fairmq-ex-req-rep-client
fairmq-ex-req-rep-server
TARGETS
fairmq-ex-req-rep-client
fairmq-ex-req-rep-server
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -55,7 +55,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-req-rep.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-req-rep.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-req-rep.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-req-rep.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-req-rep.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-req-rep.sh
)

View File

@@ -486,7 +486,6 @@ void FairMQDevice::RunWrapper()
}
PostRun();
} catch (const out_of_range& oor) {
LOG(error) << "out of range: " << oor.what();
LOG(error) << "incorrect/incomplete channel configuration?";
@@ -504,17 +503,12 @@ void FairMQDevice::HandleSingleChannelInput()
{
bool proceed = true;
if (fMsgInputs.size() > 0)
{
while (!NewStatePending() && proceed)
{
if (fMsgInputs.size() > 0) {
while (!NewStatePending() && proceed) {
proceed = HandleMsgInput(fInputChannelKeys.at(0), fMsgInputs.begin()->second, 0);
}
}
else if (fMultipartInputs.size() > 0)
{
while (!NewStatePending() && proceed)
{
} else if (fMultipartInputs.size() > 0) {
while (!NewStatePending() && proceed) {
proceed = HandleMultipartInput(fInputChannelKeys.at(0), fMultipartInputs.begin()->second, 0);
}
}
@@ -524,75 +518,55 @@ void FairMQDevice::HandleMultipleChannelInput()
{
// check if more than one transport is used
fMultitransportInputs.clear();
for (const auto& k : fInputChannelKeys)
{
for (const auto& k : fInputChannelKeys) {
fair::mq::Transport t = fChannels.at(k).at(0).fTransportType;
if (fMultitransportInputs.find(t) == fMultitransportInputs.end())
{
if (fMultitransportInputs.find(t) == fMultitransportInputs.end()) {
fMultitransportInputs.insert(pair<fair::mq::Transport, vector<string>>(t, vector<string>()));
fMultitransportInputs.at(t).push_back(k);
}
else
{
} else {
fMultitransportInputs.at(t).push_back(k);
}
}
for (const auto& mi : fMsgInputs)
{
for (auto& i : fChannels.at(mi.first))
{
for (const auto& mi : fMsgInputs) {
for (auto& i : fChannels.at(mi.first)) {
i.fMultipart = false;
}
}
for (const auto& mi : fMultipartInputs)
{
for (auto& i : fChannels.at(mi.first))
{
for (const auto& mi : fMultipartInputs) {
for (auto& i : fChannels.at(mi.first)) {
i.fMultipart = true;
}
}
// if more than one transport is used, handle poll of each in a separate thread
if (fMultitransportInputs.size() > 1)
{
if (fMultitransportInputs.size() > 1) {
HandleMultipleTransportInput();
}
else // otherwise poll directly
{
} else { // otherwise poll directly
bool proceed = true;
FairMQPollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys));
while (!NewStatePending() && proceed)
{
while (!NewStatePending() && proceed) {
poller->Poll(200);
// check which inputs are ready and call their data handlers if they are.
for (const auto& ch : fInputChannelKeys)
{
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i)
{
if (poller->CheckInput(ch, i))
{
if (fChannels.at(ch).at(i).fMultipart)
{
for (const auto& ch : fInputChannelKeys) {
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
if (poller->CheckInput(ch, i)) {
if (fChannels.at(ch).at(i).fMultipart) {
proceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
}
else
{
} else {
proceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
}
if (!proceed)
{
if (!proceed) {
break;
}
}
}
if (!proceed)
{
if (!proceed) {
break;
}
}
@@ -606,64 +580,49 @@ void FairMQDevice::HandleMultipleTransportInput()
fMultitransportProceed = true;
for (const auto& i : fMultitransportInputs)
{
for (const auto& i : fMultitransportInputs) {
threads.emplace_back(thread(&FairMQDevice::PollForTransport, this, fTransports.at(i.first).get(), i.second));
}
for (thread& t : threads)
{
for (thread& t : threads) {
t.join();
}
}
void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const vector<string>& channelKeys)
{
try
{
try {
FairMQPollerPtr poller(factory->CreatePoller(fChannels, channelKeys));
while (!NewStatePending() && fMultitransportProceed)
{
while (!NewStatePending() && fMultitransportProceed) {
poller->Poll(500);
for (const auto& ch : channelKeys)
{
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i)
{
if (poller->CheckInput(ch, i))
{
for (const auto& ch : channelKeys) {
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
if (poller->CheckInput(ch, i)) {
lock_guard<mutex> lock(fMultitransportMutex);
if (!fMultitransportProceed)
{
if (!fMultitransportProceed) {
break;
}
if (fChannels.at(ch).at(i).fMultipart)
{
if (fChannels.at(ch).at(i).fMultipart) {
fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
}
else
{
} else {
fMultitransportProceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
}
if (!fMultitransportProceed)
{
if (!fMultitransportProceed) {
break;
}
}
}
if (!fMultitransportProceed)
{
if (!fMultitransportProceed) {
break;
}
}
}
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state.";
throw runtime_error(tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state."));
}
@@ -673,12 +632,9 @@ bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback&
{
unique_ptr<FairMQMessage> input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage());
if (Receive(input, chName, i) >= 0)
{
if (Receive(input, chName, i) >= 0) {
return callback(input, i);
}
else
{
} else {
return false;
}
}
@@ -687,12 +643,9 @@ bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipa
{
FairMQParts input;
if (Receive(input, chName, i) >= 0)
{
return callback(input, 0);
}
else
{
if (Receive(input, chName, i) >= 0) {
return callback(input, i);
} else {
return false;
}
}

View File

@@ -421,28 +421,64 @@ class FairMQDevice
virtual void Reset() {}
public:
/// @brief Request a device state transition
/// @param transition state transition
///
/// The state transition may not happen immediately, but when the current state evaluates the
/// pending transition event and terminates. In other words, the device states are scheduled cooperatively.
bool ChangeState(const fair::mq::Transition transition) { return fStateMachine.ChangeState(transition); }
/// @brief Request a device state transition
/// @param transition state transition
///
/// The state transition may not happen immediately, but when the current state evaluates the
/// pending transition event and terminates. In other words, the device states are scheduled cooperatively.
bool ChangeState(const std::string& transition) { return fStateMachine.ChangeState(fair::mq::GetTransition(transition)); }
/// @brief waits for the next state (any) to occur
fair::mq::State WaitForNextState() { return fStateQueue.WaitForNext(); }
/// @brief waits for the specified state to occur
/// @param state state to wait for
void WaitForState(fair::mq::State state) { fStateQueue.WaitForState(state); }
/// @brief waits for the specified state to occur
/// @param state state to wait for
void WaitForState(const std::string& state) { WaitForState(fair::mq::GetState(state)); }
void TransitionTo(const fair::mq::State state);
/// @brief Subscribe with a callback to state changes
/// @param key id to identify your subscription
/// @param callback callback (called with the new state as the parameter)
///
/// The callback is called at the beginning of a new state.
/// The callback is called from the thread the state is running in.
void SubscribeToStateChange(const std::string& key, std::function<void(const fair::mq::State)> callback) { fStateMachine.SubscribeToStateChange(key, callback); }
/// @brief Unsubscribe from state changes
/// @param key id (that was used when subscribing)
void UnsubscribeFromStateChange(const std::string& key) { fStateMachine.UnsubscribeFromStateChange(key); }
/// @brief Subscribe with a callback to incoming state transitions
/// @param key id to identify your subscription
/// @param callback callback (called with the incoming transition as the parameter)
/// The callback is called when new transition is initiated.
/// The callback is called from the thread that initiates the transition (via ChangeState).
void SubscribeToNewTransition(const std::string& key, std::function<void(const fair::mq::Transition)> callback) { fStateMachine.SubscribeToNewTransition(key, callback); }
/// @brief Unsubscribe from state transitions
/// @param key id (that was used when subscribing)
void UnsubscribeFromNewTransition(const std::string& key) { fStateMachine.UnsubscribeFromNewTransition(key); }
/// Returns true is a new state has been requested, signaling the current handler to stop.
/// @brief Returns true if a new state has been requested, signaling the current handler to stop.
bool NewStatePending() const { return fStateMachine.NewStatePending(); }
/// @brief Returns the current state
fair::mq::State GetCurrentState() const { return fStateMachine.GetCurrentState(); }
/// @brief Returns the name of the current state as a string
std::string GetCurrentStateName() const { return fStateMachine.GetCurrentStateName(); }
/// @brief Returns name of the given state as a string
/// @param state state
static std::string GetStateName(const fair::mq::State state) { return fair::mq::GetStateName(state); }
/// @brief Returns name of the given transition as a string
/// @param transition transition
static std::string GetTransitionName(const fair::mq::Transition transition) { return fair::mq::GetTransitionName(transition); }
static constexpr const char* DefaultId = "";

View File

@@ -17,8 +17,9 @@
#include <fairlogger/Logger.h>
#include <boost/property_tree/ptree.hpp>
#include <utility> // make_pair
#include <string_view>
#include <utility> // make_pair
#include <cstring>
using boost::property_tree::ptree;
using namespace std;
@@ -83,6 +84,14 @@ Properties SuboptParser(const vector<string>& channelConfig, const string& devic
string argString(token);
char* subopts = &argString[0];
char* value = nullptr;
// Find either a : or a =. If we find the former first, we consider what is before it
// the channel name
char* firstSep = strpbrk(subopts, ":=");
if (firstSep && *firstSep == ':') {
channelName = std::string_view(subopts, firstSep - subopts);
channelProperties.put("name", channelName);
subopts = firstSep + 1;
}
while (subopts && *subopts != 0 && *subopts != ' ') {
char* cur = subopts;
int subopt = getsubopt(&subopts, (char**)channelOptionKeys, &value);

View File

@@ -71,6 +71,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization.")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("shm-no-cleanup", po::value<bool >()->default_value(false), "Shared memory: do not cleanup the memory when last device leaves.")
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string >()->default_value("default"), "Session name.")

View File

@@ -11,6 +11,7 @@
#include <picosha2.h>
#include <atomic>
#include <sstream>
#include <string>
#include <functional> // std::equal_to
@@ -103,6 +104,15 @@ struct DeviceCounter
std::atomic<unsigned int> fCount;
};
struct EventCounter
{
EventCounter(uint64_t c)
: fCount(c)
{}
std::atomic<uint64_t> fCount;
};
struct RegionCounter
{
RegionCounter(uint16_t c)
@@ -185,15 +195,25 @@ struct RegionBlock
// find id for unique shmem name:
// a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)).
inline std::string buildShmIdFromSessionIdAndUserId(const std::string& sessionId)
inline std::string makeShmIdStr(const std::string& sessionId)
{
std::string seed((std::to_string(geteuid()) + sessionId));
// generate a 8-digit hex value out of sha256 hash
std::vector<unsigned char> hash(4);
picosha2::hash256(seed.begin(), seed.end(), hash.begin(), hash.end());
std::string shmId = picosha2::bytes_to_hex_string(hash.begin(), hash.end());
return shmId;
return picosha2::bytes_to_hex_string(hash.begin(), hash.end());
}
inline uint64_t makeShmIdUint64(const std::string& sessionId)
{
std::string shmId = makeShmIdStr(sessionId);
uint64_t id = 0;
std::stringstream ss;
ss << std::hex << shmId;
ss >> id;
return id;
}
struct SegmentSize : public boost::static_visitor<size_t>

View File

@@ -64,7 +64,9 @@ class Manager
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
, fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
, fRegionEventsSubscriptionActive(false)
, fNumObservedEvents(0)
, fDeviceCounter(nullptr)
, fEventCounter(nullptr)
, fShmSegments(nullptr)
, fShmRegions(nullptr)
, fInterrupted(false)
@@ -76,6 +78,7 @@ class Manager
, fHeartbeatThread()
, fSendHeartbeats(true)
, fThrowOnBadAlloc(config ? config->GetProperty<bool>("shm-throw-bad-alloc", true) : true)
, fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
{
using namespace boost::interprocess;
@@ -102,6 +105,16 @@ class Manager
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first;
if (fEventCounter) {
LOG(debug) << "event counter found: " << fEventCounter->fCount;
} else {
LOG(debug) << "no event counter found, creating one and initializing with 0";
fEventCounter = fManagementSegment.construct<EventCounter>(unique_instance)(0);
LOG(debug) << "initialized event counter with: " << fEventCounter->fCount;
}
try {
auto it = fShmSegments->find(fSegmentId);
if (it == fShmSegments->end()) {
@@ -114,6 +127,7 @@ class Manager
fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit);
}
ss << "Created ";
(fEventCounter->fCount)++;
} else {
// found segment with the given id, opening
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
@@ -137,7 +151,8 @@ class Manager
<< " Allocation algorithm: " << allocationAlgorithm;
LOG(debug) << ss.str();
} catch(interprocess_exception& bie) {
LOG(error) << "something went wrong: " << bie.what();
LOG(error) << "Failed to create/open shared memory segment (" << "fmq_" << fShmId << "_m_" << fSegmentId << "): " << bie.what();
throw std::runtime_error(tools::ToString("Failed to create/open shared memory segment (", "fmq_", fShmId, "_m_", fSegmentId, "): ", bie.what()));
}
if (mlockSegment) {
@@ -276,6 +291,8 @@ class Manager
r.first->second->StartReceivingAcks();
result.first = &(r.first->second->fRegion);
result.second = id;
(fEventCounter->fCount)++;
}
fRegionEventsCV.notify_all();
@@ -327,6 +344,7 @@ class Manager
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fShmRegions->at(id).fDestroyed = true;
(fEventCounter->fCount)++;
}
fRegionEventsCV.notify_all();
}
@@ -415,14 +433,16 @@ class Manager
while (fRegionEventsSubscriptionActive) {
auto infos = GetRegionInfoUnsafe();
for (const auto& i : infos) {
auto el = fObservedRegionEvents.find(i.id);
auto el = fObservedRegionEvents.find({i.id, i.managed});
if (el == fObservedRegionEvents.end()) {
fRegionEventCallback(i);
fObservedRegionEvents.emplace(i.id, i.event);
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
++fNumObservedEvents;
} else {
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
fRegionEventCallback(i);
el->second = i.event;
++fNumObservedEvents;
} else {
// LOG(debug) << "ignoring event for id" << i.id << ":";
// LOG(debug) << "incoming event: " << i.event;
@@ -430,7 +450,7 @@ class Manager
}
}
}
fRegionEventsCV.wait(lock);
fRegionEventsCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
}
}
@@ -590,7 +610,7 @@ class Manager
(fDeviceCounter->fCount)--;
if (fDeviceCounter->fCount == 0) {
LOG(debug) << "Last segment user, removing segment.";
LOG(debug) << "Last segment user, " << (fNoCleanup ? "skipping removal (--shm-no-cleanup is true)." : "removing segment.");
lastRemoved = true;
} else {
LOG(debug) << "Other segment users present (" << fDeviceCounter->fCount << "), skipping removal.";
@@ -599,7 +619,7 @@ class Manager
LOG(error) << "Manager could not acquire lock: " << e.what();
}
if (lastRemoved) {
if (lastRemoved && !fNoCleanup) {
Monitor::Cleanup(ShmId{fShmId});
}
}
@@ -617,9 +637,11 @@ class Manager
std::thread fRegionEventThread;
bool fRegionEventsSubscriptionActive;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
std::unordered_map<uint16_t, RegionEvent> fObservedRegionEvents;
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents;
uint64_t fNumObservedEvents;
DeviceCounter* fDeviceCounter;
EventCounter* fEventCounter;
Uint16SegmentInfoHashMap* fShmSegments;
Uint16RegionInfoHashMap* fShmRegions;
std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions;
@@ -637,6 +659,7 @@ class Manager
std::condition_variable fHeartbeatsCV;
bool fThrowOnBadAlloc;
bool fNoCleanup;
};
} // namespace fair::mq::shmem

View File

@@ -211,11 +211,31 @@ class Message final : public fair::mq::Message
return true;
} else if (newSize <= fMeta.fSize) {
try {
fLocalPtr = fManager.ShrinkInPlace(newSize, fLocalPtr, fMeta.fSegmentId);
fMeta.fSize = newSize;
return true;
try {
fLocalPtr = fManager.ShrinkInPlace(newSize, fLocalPtr, fMeta.fSegmentId);
fMeta.fSize = newSize;
return true;
} catch (boost::interprocess::bad_alloc& e) {
// if shrinking fails (can happen due to boost alignment requirements):
// unused size >= 1000000 bytes: reallocate fully
// unused size < 1000000 bytes: simply reset the size and keep the rest of the buffer until message destruction
if (fMeta.fSize - newSize >= 1000000) {
char* newPtr = fManager.Allocate(newSize, fAlignment);
if (newPtr) {
std::memcpy(newPtr, fLocalPtr, newSize);
fManager.Deallocate(fMeta.fHandle, fMeta.fSegmentId);
fLocalPtr = newPtr;
fMeta.fHandle = fManager.GetHandleFromAddress(fLocalPtr, fMeta.fSegmentId);
} else {
LOG(debug) << "could not set used size: " << e.what();
return false;
}
}
fMeta.fSize = newSize;
return true;
}
} catch (boost::interprocess::interprocess_exception& e) {
LOG(info) << "could not set used size: " << e.what();
LOG(debug) << "could not set used size: " << e.what();
return false;
}
} else {
@@ -257,7 +277,7 @@ class Message final : public fair::mq::Message
Manager& fManager;
bool fQueued;
MetaHeader fMeta;
size_t fAlignment; // TODO: put this to debug mode
size_t fAlignment;
mutable Region* fRegionPtr;
mutable char* fLocalPtr;

View File

@@ -393,7 +393,7 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused)))
void Monitor::PrintDebugInfo(const SessionId& sessionId)
{
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
PrintDebugInfo(shmId);
}
@@ -429,7 +429,7 @@ unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(cons
}
unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(const SessionId& sessionId)
{
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
return GetDebugInfo(shmId);
}
@@ -536,7 +536,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const SessionId& sessionId, bool verbose /* = true */)
{
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
if (verbose) {
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
}
@@ -553,7 +553,7 @@ std::vector<std::pair<std::string, bool>> Monitor::CleanupFull(const ShmId& shmI
std::vector<std::pair<std::string, bool>> Monitor::CleanupFull(const SessionId& sessionId, bool verbose /* = true */)
{
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
if (verbose) {
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
}

View File

@@ -152,7 +152,7 @@ struct Region
}
}
LOG(debug) << "AcksSender for " << fName << " leaving " << "(blocks left to free: " << fBlocksToFree.size() << ", "
LOG(trace) << "AcksSender for " << fName << " leaving " << "(blocks left to free: " << fBlocksToFree.size() << ", "
<< " blocks left to send: " << blocksToSend << ").";
}
@@ -195,7 +195,7 @@ struct Region
}
}
LOG(debug) << "AcksReceiver for " << fName << " leaving (remaining queue size: " << fQueue->get_num_msg() << ").";
LOG(trace) << "AcksReceiver for " << fName << " leaving (remaining queue size: " << fQueue->get_num_msg() << ").";
}
void ReleaseBlock(const RegionBlock& block)

View File

@@ -28,6 +28,7 @@
#include <memory> // unique_ptr, make_unique
#include <string>
#include <vector>
#include <stdexcept>
namespace fair::mq::shmem
{
@@ -35,10 +36,8 @@ namespace fair::mq::shmem
class TransportFactory final : public fair::mq::TransportFactory
{
public:
TransportFactory(const std::string& id = "", const ProgOptions* config = nullptr)
: fair::mq::TransportFactory(id)
, fDeviceId(id)
, fShmId()
TransportFactory(const std::string& deviceId = "", const ProgOptions* config = nullptr)
: fair::mq::TransportFactory(deviceId)
, fZmqCtx(zmq_ctx_new())
, fManager(nullptr)
{
@@ -69,8 +68,8 @@ class TransportFactory final : public fair::mq::TransportFactory
throw SharedMemoryError(tools::ToString("Provided shared memory allocation algorithm '", allocationAlgorithm, "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'"));
}
fShmId = buildShmIdFromSessionIdAndUserId(sessionName);
LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";
std::string shmId = makeShmIdStr(sessionName);
LOG(debug) << "Generated shmid '" << shmId << "' out of session id '" << sessionName << "'.";
try {
if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) {
@@ -82,10 +81,13 @@ class TransportFactory final : public fair::mq::TransportFactory
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
}
fManager = std::make_unique<Manager>(fShmId, fDeviceId, segmentSize, config);
fManager = std::make_unique<Manager>(shmId, deviceId, segmentSize, config);
} catch (boost::interprocess::interprocess_exception& e) {
LOG(error) << "Could not initialize shared memory transport: " << e.what();
throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));
} catch (const std::exception& e) {
LOG(error) << "Could not initialize shared memory transport: " << e.what();
throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));
}
}
@@ -200,8 +202,6 @@ class TransportFactory final : public fair::mq::TransportFactory
}
private:
std::string fDeviceId;
std::string fShmId;
void* fZmqCtx;
std::unique_ptr<Manager> fManager;
};

View File

@@ -111,7 +111,7 @@ int main(int argc, char** argv)
}
if (shmId == "") {
shmId = buildShmIdFromSessionIdAndUserId(sessionName);
shmId = makeShmIdStr(sessionName);
}
if (cleanup) {

View File

@@ -217,6 +217,7 @@ add_testsuite(Properties
SOURCES
${CMAKE_CURRENT_BINARY_DIR}/runner.cxx
properties/_properties.cxx
properties/_suboptparser.cxx
LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}

41
test/ci/slurm-submit.sh Executable file
View File

@@ -0,0 +1,41 @@
#! /bin/bash
label="$1"
jobsh="$2"
if [ -z "$ALFACI_SLURM_CPUS" ]
then
ALFACI_SLURM_CPUS=32
fi
if [ -z "$ALFACI_SLURM_EXTRA_OPTS" ]
then
ALFACI_SLURM_EXTRA_OPTS="--hint=compute_bound"
fi
if [ -z "$ALFACI_SLURM_TIMEOUT" ]
then
ALFACI_SLURM_TIMEOUT=30
fi
if [ -z "$ALFACI_SLURM_QUEUE" ]
then
ALFACI_SLURM_QUEUE=main
fi
echo "*** Slurm request options :"
echo "*** Working directory ..: $PWD"
echo "*** Queue ..............: $ALFACI_SLURM_QUEUE"
echo "*** CPUs ...............: $ALFACI_SLURM_CPUS"
echo "*** Wall Time ..........: $ALFACI_SLURM_TIMEOUT min"
echo "*** Job Name ...........: ${label}"
echo "*** Extra Options ......: ${ALFACI_SLURM_EXTRA_OPTS}"
echo "*** Submitting job at ....: $(date -R)"
(
set -x
srun -p $ALFACI_SLURM_QUEUE -c $ALFACI_SLURM_CPUS -n 1 \
-t $ALFACI_SLURM_TIMEOUT \
--job-name="${label}" \
${ALFACI_SLURM_EXTRA_OPTS} \
bash "${jobsh}"
)
retval=$?
echo "*** Exit Code ............: $retval"
exit "$retval"

View File

@@ -40,34 +40,34 @@ void RunPushPullWithMsgResize(const string& transport, const string& _address)
pull.Connect(address);
{
FairMQMessagePtr outMsg(push.NewMessage(1000));
ASSERT_EQ(outMsg->GetSize(), 1000);
memcpy(outMsg->GetData(), "ABC", 3);
ASSERT_EQ(outMsg->SetUsedSize(500), true);
ASSERT_EQ(outMsg->SetUsedSize(500), true);
ASSERT_EQ(outMsg->SetUsedSize(700), false);
ASSERT_EQ(outMsg->GetSize(), 500);
FairMQMessagePtr outMsg(push.NewMessage(6));
ASSERT_EQ(outMsg->GetSize(), 6);
memcpy(outMsg->GetData(), "ABCDEF", 6);
ASSERT_EQ(outMsg->SetUsedSize(5), true);
ASSERT_EQ(outMsg->SetUsedSize(5), true);
ASSERT_EQ(outMsg->SetUsedSize(7), false);
ASSERT_EQ(outMsg->GetSize(), 5);
// check if the data is still intact
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
ASSERT_EQ(outMsg->SetUsedSize(250), true);
ASSERT_EQ(outMsg->GetSize(), 250);
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[3], 'D');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[4], 'E');
ASSERT_EQ(outMsg->SetUsedSize(2), true);
ASSERT_EQ(outMsg->GetSize(), 2);
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
FairMQMessagePtr msgCopy(push.NewMessage());
msgCopy->Copy(*outMsg);
ASSERT_EQ(msgCopy->GetSize(), 250);
ASSERT_EQ(msgCopy->GetSize(), 2);
ASSERT_EQ(push.Send(outMsg), 250);
ASSERT_EQ(push.Send(outMsg), 2);
FairMQMessagePtr inMsg(pull.NewMessage());
ASSERT_EQ(pull.Receive(inMsg), 250);
ASSERT_EQ(inMsg->GetSize(), 250);
ASSERT_EQ(pull.Receive(inMsg), 2);
ASSERT_EQ(inMsg->GetSize(), 2);
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[0], 'A');
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[1], 'B');
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[2], 'C');
}
{

View File

@@ -0,0 +1,36 @@
/********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Properties.h>
#include <fairmq/SuboptParser.h>
#include <gtest/gtest.h>
#include <string>
#include <string_view>
namespace {
using namespace std;
using namespace fair::mq;
auto contains(Properties const& parsed, string_view key, string_view value) -> bool
{
return PropertyHelper::ConvertPropertyToString(parsed.at(string(key))) == value;
}
TEST(SuboptParser, ChannelNameSelector)
{
Properties parsed(SuboptParser({"name=foo-data,address=tcp://0.0.0.0:6000,type=push",
"bar-data:address=tcp://0.0.0.0:7000,type=pull"},
"foo"));
ASSERT_TRUE(contains(parsed, "chans.foo-data.0.address", "tcp://0.0.0.0:6000"));
ASSERT_TRUE(contains(parsed, "chans.foo-data.0.type", "push"));
ASSERT_TRUE(contains(parsed, "chans.bar-data.0.address", "tcp://0.0.0.0:7000"));
ASSERT_TRUE(contains(parsed, "chans.bar-data.0.type", "pull"));
}
} // namespace

View File

@@ -50,12 +50,12 @@ void RegionEventSubscriptions(const string& transport)
ASSERT_EQ(factory->SubscribedToRegionEvents(), false);
factory->SubscribeToRegionEvents([&](FairMQRegionInfo info) {
LOG(warn) << ">>>" << info.event;
LOG(warn) << "managed: " << info.managed;
LOG(warn) << "id: " << info.id;
LOG(warn) << "ptr: " << info.ptr;
LOG(warn) << "size: " << info.size;
LOG(warn) << "flags: " << info.flags;
LOG(info) << ">>> " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id
<< ", ptr: " << info.ptr
<< ", size: " << info.size
<< ", flags: " << info.flags;
if (info.event == FairMQRegionEvent::created) {
if (info.id == id1) {
ASSERT_EQ(info.size, size1);