Compare commits

...

69 Commits

Author SHA1 Message Date
Alexey Rybalchenko
9bf908fb52 shm: revert some changes from c85d6e0 that introduced a race 2021-05-20 00:40:58 +02:00
Alexey Rybalchenko
021c1b1c4d shm: add monitor method to retrieve free segment memory 2021-05-18 14:05:12 +02:00
Alexey Rybalchenko
aaf74ad93f reduce noise in examples 2021-05-13 23:00:35 +02:00
Alexey Rybalchenko
a7dbeadd1c runDevice: remove const from getDevice parameter 2021-05-13 23:00:35 +02:00
Alexey Rybalchenko
e6f67b3658 Fix Ofi interface 2021-05-07 21:59:52 +02:00
Alexey Rybalchenko
091d0824d1 ofi: fix Events() signature 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
857aa84fa3 add mlock/zero options to unmanaged region 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
c85d6e079c shm: reduce shm contention when dealing with ack queues 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
4e466514d2 region example: fix msg counter 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
8b4056e408 Update docs 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
b67b80e0ad shmmonitor: add severity setting 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
2c89b24857 shm: eliminate race/deadlock in region subscriptions 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
c6a6a5f21b Check transport type of msg and corresponding region 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
9defa71622 Add GetType() to UnmanagedRegion 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
ed2dcedf03 Add operator<< for fair::mq::Transport 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
a3d56b9aeb configurable transport for region example script 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
8a2641d842 shm: check result of region acquisition 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
2ca62d06db shm region cache: fix multiple sessions issue 2021-05-07 14:20:00 +02:00
Alexey Rybalchenko
87e0ca5450 add region cache test 2021-05-07 14:20:00 +02:00
Gvozden Neskovic
ef5b3c782e improve message counter cache line use 2021-05-07 14:20:00 +02:00
Gvozden Neskovic
f7ba3052aa use thread local cache to avoid interprocess lock on shm GetData 2021-05-07 14:20:00 +02:00
Dennis Klein
a90dbf64de Fix -Wunused-result
Fixes #281
2021-05-07 13:18:12 +02:00
Dennis Klein
9724f184f4 Fallback to Boost.Filesystem on GCC 8 2021-05-07 13:13:16 +02:00
Dennis Klein
057ba03776 PluginManager: Do not load built-in plugins via dlopen/dlsym
fixes #351
2021-05-05 03:52:12 +02:00
Giulio Eulisse
6dfea32aee Improve Events API
If the call is interrupted by a signal, this will throw, which we clearly do not want. Simplifying the API to let the user decide what to do on error is probably the best option.
2021-05-04 22:54:19 +02:00
Dennis Klein
868fe02ee9 CI: Submit results to CDash for each build step 2021-04-08 16:22:47 +02:00
Dennis Klein
a2016a9361 CI: Add alice-centos-7 environment 2021-04-08 16:22:47 +02:00
Dennis Klein
ea9aede652 Fallback to <boost/filesystem> on GCC 7 2021-04-08 16:22:47 +02:00
Alexey Rybalchenko
77bf12c8e8 docs patch 2021-04-08 12:38:18 +02:00
Alexey Rybalchenko
f3bc9e05a8 shmmonitor: update docs 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
5facc441b8 shmmonitor: add --list-all 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
2602f53585 Add tools: StrStartsWith, StrEndsWith 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
0976465338 shm: reduce delay between monitor daemon launch & HBs 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
9144258b89 shmmonitor: daemon output to file if FAIRMQ_SHMMONITOR_VERBOSE is true 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
be55565617 shmmonitor: use fairlogger 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
d7e2fbecea shmmonitor: refactor to separate monitoring from output 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
72175e5757 shmmonitor: optimize startup to avoid repeated start 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
effba534f0 shmmonitor: add session name and creator id to the output 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
efd42075a9 shmmonitor: enable read only access 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
5228407932 shmmonitor: distinguish daemon from monitor mode (orthogonal) 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
30e81d58f8 shmmonitor: allow getting shmids based on session/userid 2021-04-08 10:11:23 +02:00
Dennis Klein
2c7c46f2fd Remove codacy badge 2021-03-26 10:06:10 +01:00
Dennis Klein
0a5122bb24 Remove codecov badge 2021-03-26 10:06:10 +01:00
Dennis Klein
fc49687879 builtin devices: Reorganize 2021-03-26 10:06:10 +01:00
Dennis Klein
66a4df0667 fairmq-uuid-gen: Move to tools directory 2021-03-26 10:06:10 +01:00
Dennis Klein
978191fa6c Introduce <fairmq/runDevice.h> 2021-03-26 10:06:10 +01:00
Dennis Klein
cef6d0afcd Introduce <fairmq/Device.h> 2021-03-26 10:06:10 +01:00
Dennis Klein
47ec550792 control plugin: Move to subdirectory for consistency 2021-03-26 10:06:10 +01:00
Dennis Klein
b4aeb320e5 CI: Collect DDS logs on error 2021-03-26 10:06:10 +01:00
Dennis Klein
107248be0a Reorganize includes for consistency 2021-03-26 10:06:10 +01:00
Dennis Klein
68ceaba501 CI: Filter and process warnings and errors 2021-03-26 10:06:10 +01:00
Dennis Klein
4b6cf8b181 Fix aggregate initialization issue before C++20
Use value initialization to prevent

error: temporary of type ... has protected destructor

see https://stackoverflow.com/a/56745475
2021-03-26 10:06:10 +01:00
Dennis Klein
21d6cf9830 CI: Run clang-tidy 2021-03-26 10:06:10 +01:00
Alexey Rybalchenko
bffe74c5cf shm: handle shrink failure gracefully 2021-03-23 13:00:35 +01:00
Dennis Klein
72f319e276 CI: Adapt to new alfaci build hosts 2021-03-23 11:06:33 +01:00
Alexey Rybalchenko
62438bd99e shm: Improve error message when segment cannot be opened 2021-03-18 09:02:08 +01:00
Alexey Rybalchenko
c8ad684b18 Add --shm-no-cleanup option
When true, device will skip the segment cleanup even when it is the last
segment user
2021-03-18 09:02:08 +01:00
Alexey Rybalchenko
a5ec83208d Update docs 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
fc2241ece7 Fix incorrect channel index passed to OnData callback 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
1f26883b75 Formatting 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
edbdc57332 shm: make shmId also available as uint64_t 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
0fd2fcadc2 shm: Make sure no event notifications are missed 2021-03-11 12:14:00 +01:00
Alexey Rybalchenko
9b48b31a75 shm: Make sure all region events are fired 2021-03-11 12:14:00 +01:00
Dennis Klein
cb4335e59f Add test coverage for --channel-config name selector 2021-03-05 02:02:14 +01:00
Giulio Eulisse
ce4584b3d8 Provide a better syntax for --channel-config
The current syntax is ambiguous because it treats assignments
(like address=127.0.0.1) and selectors (name=my-channel) using
the symbol equal `"`.

This allows:

my-channel:address=127.0.0.1

as alternative syntax, which clearly separates the role of my-channel
from the associated properties.
2021-03-05 02:02:14 +01:00
Alexey Rybalchenko
bbc1dd4600 Add optional file output to FairMQSink 2021-03-01 15:33:45 +01:00
Dennis Klein
8327810942 Warn on unknown --channel-config args 2021-03-01 08:37:57 +01:00
Dennis Klein
c37742e3b4 Update Copyright string 2021-03-01 08:37:57 +01:00
Alexey Rybalchenko
93dff3c5a7 Fix regression in shmmonitor 2021-02-19 09:54:29 +01:00
190 changed files with 3480 additions and 4141 deletions

View File

@@ -1,3 +1,3 @@
---
Checks: '*,-google-*,-fuchsia-*,-cert-*,-llvm-header-guard,-readability-named-parameter,-misc-non-private-member-variables-in-classes,-*-magic-numbers,-llvm-include-order,-hicpp-no-array-decay,-performance-unnecessary-value-param,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-modernize-use-trailing-return-type,-readability-redundant-member-init'
Checks: 'cppcoreguidelines-*,misc-unused-alias-decls,misc-unused-parameters,modernize-deprecated-headers,modernize-raw-string-literal,modernize-redundant-void-arg,modernize-use-bool-literals,modernize-use-default-member-init,modernize-use-emplace,modernize-use-equals-default,modernize-use-equals-delete,modernize-use-noexcept,modernize-use-nullptr,modernize-use-override,modernize-use-using,performance-faster-string-find,performance-for-range-copy,performance-unnecessary-copy-initialization,readability-avoid-const-params-in-decls,readability-braces-around-statements,readability-container-size-empty,readability-delete-null-pointer,readability-redundant-member-init,readability-redundant-string-init,readability-static-accessed-through-instance,readability-string-compare'
HeaderFilterRegex: '/(fairmq/)'

View File

@@ -1,88 +1,69 @@
################################################################################
# Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
Set(CTEST_SOURCE_DIRECTORY $ENV{SOURCEDIR})
Set(CTEST_BINARY_DIRECTORY $ENV{BUILDDIR})
Set(CTEST_SITE $ENV{SITE})
Set(CTEST_BUILD_NAME $ENV{LABEL})
Set(CTEST_CMAKE_GENERATOR "Unix Makefiles")
Set(CTEST_PROJECT_NAME "FairMQ")
Find_Program(CTEST_GIT_COMMAND NAMES git)
Set(CTEST_UPDATE_COMMAND "${CTEST_GIT_COMMAND}")
cmake_host_system_information(RESULT fqdn QUERY FQDN)
Set(BUILD_COMMAND "make")
Set(CTEST_BUILD_COMMAND "${BUILD_COMMAND} -j$ENV{number_of_processors}")
set(CTEST_SOURCE_DIRECTORY .)
set(CTEST_BINARY_DIRECTORY build)
set(CTEST_CMAKE_GENERATOR "Ninja")
set(CTEST_USE_LAUNCHERS ON)
set(CTEST_CONFIGURATION_TYPE "RelWithDebInfo")
String(TOUPPER $ENV{ctest_model} _Model)
Set(configure_options "-DCMAKE_BUILD_TYPE=$ENV{ctest_model}")
Set(CTEST_USE_LAUNCHERS 1)
Set(configure_options "${configure_options};-DCTEST_USE_LAUNCHERS=${CTEST_USE_LAUNCHERS}")
Set(configure_options "${configure_options};-DDISABLE_COLOR=ON")
Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}")
# Set(configure_options "${configure_options};-DBUILD_OFI_TRANSPORT=ON")
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
Set(configure_options "${configure_options};-DBUILD_SDK=ON")
Set(configure_options "${configure_options};-DBUILD_SDK_COMMANDS=ON")
Set(EXTRA_FLAGS $ENV{EXTRA_FLAGS})
If(EXTRA_FLAGS)
Set(configure_options "${configure_options};${EXTRA_FLAGS}")
EndIf()
If($ENV{ctest_model} MATCHES Profile)
Find_Program(GCOV_COMMAND gcov)
If(GCOV_COMMAND)
Message("Found GCOV: ${GCOV_COMMAND}")
Set(CTEST_COVERAGE_COMMAND ${GCOV_COMMAND})
set(CTEST_COVERAGE_EXTRA_FLAGS "-p")
EndIf(GCOV_COMMAND)
EndIf()
If($ENV{ctest_model} MATCHES Nightly OR $ENV{ctest_model} MATCHES Profile)
Ctest_Empty_Binary_Directory(${CTEST_BINARY_DIRECTORY})
EndIf()
Ctest_Start($ENV{ctest_model})
Ctest_Configure(BUILD "${CTEST_BINARY_DIRECTORY}"
OPTIONS "${configure_options}"
)
Ctest_Build(BUILD "${CTEST_BINARY_DIRECTORY}")
unset(exclude_tests)
if($ENV{EXCLUDE_UNSTABLE_DDS_TESTS})
set(exclude_tests EXCLUDE ".*\\.localhost$")
if(NOT NCPUS)
if(ENV{SLURM_CPUS_PER_TASK})
set(NCPUS $ENV{SLURM_CPUS_PER_TASK})
else()
include(ProcessorCount)
ProcessorCount(NCPUS)
if(NCPUS EQUAL 0)
set(NCPUS 1)
endif()
endif()
endif()
Ctest_Test(BUILD "${CTEST_BINARY_DIRECTORY}"
# PARALLEL_LEVEL $ENV{number_of_processors}
${exclude_tests}
PARALLEL_LEVEL $ENV{number_of_processors}
RETURN_VALUE _ctest_test_ret_val
)
If(GCOV_COMMAND)
Ctest_Coverage(BUILD "${CTEST_BINARY_DIRECTORY}" LABELS coverage)
EndIf()
if ("$ENV{CTEST_SITE}" STREQUAL "")
set(CTEST_SITE "${fqdn}")
else()
set(CTEST_SITE $ENV{CTEST_SITE})
endif()
If("$ENV{do_codecov_upload}")
Execute_Process(COMMAND curl https://codecov.io/bash -o codecov_uploader.sh
WORKING_DIRECTORY ${CTEST_BINARY_DIRECTORY}
TIMEOUT 60)
Execute_Process(COMMAND bash ./codecov_uploader.sh -X gcov
WORKING_DIRECTORY ${CTEST_BINARY_DIRECTORY}
TIMEOUT 60)
EndIf()
if ("$ENV{LABEL}" STREQUAL "")
set(CTEST_BUILD_NAME "build")
else()
set(CTEST_BUILD_NAME $ENV{LABEL})
endif()
Ctest_Submit()
ctest_start(Continuous)
if (_ctest_test_ret_val)
list(APPEND options
"-DDISABLE_COLOR=ON"
"-DBUILD_SDK_COMMANDS=ON"
"-DBUILD_SDK=ON"
"-DBUILD_DDS_PLUGIN=ON")
if(RUN_STATIC_ANALYSIS)
list(APPEND options "-DRUN_STATIC_ANALYSIS=ON")
endif()
list(JOIN options ";" optionsstr)
ctest_configure(OPTIONS "${optionsstr}")
ctest_submit()
ctest_build(FLAGS "-j${NCPUS}")
ctest_submit()
ctest_test(BUILD "${CTEST_BINARY_DIRECTORY}"
PARALLEL_LEVEL 1
SCHEDULE_RANDOM ON
RETURN_VALUE _ctest_test_ret_val)
ctest_submit()
if(_ctest_test_ret_val)
Message(FATAL_ERROR "Some tests failed.")
endif()

120
Jenkinsfile vendored
View File

@@ -1,69 +1,69 @@
#!groovy
def specToLabel(Map spec) {
return "${spec.os}-${spec.arch}-${spec.compiler}-FairSoft_${spec.fairsoft}"
}
def jobMatrix(String prefix, List specs, Closure callback) {
def jobMatrix(String type, List specs) {
def nodes = [:]
for (spec in specs) {
def label = specToLabel(spec)
def node_tag = label
if (spec.os =~ /macOS/) {
node_tag = spec.os
}
def fairsoft = spec.fairsoft
job = "${spec.os}-${spec.ver}-${spec.arch}-${spec.compiler}"
def label = "${type}/${job}"
def selector = "${spec.os}-${spec.ver}-${spec.arch}"
def os = spec.os
def compiler = spec.compiler
nodes["${prefix}/${label}"] = {
node(node_tag) {
githubNotify(context: "${prefix}/${label}", description: 'Building ...', status: 'PENDING')
def ver = spec.ver
def check = spec.check
nodes[label] = {
node(selector) {
githubNotify(context: "${label}", description: 'Building ...', status: 'PENDING')
try {
deleteDir()
checkout scm
sh """\
echo "export SIMPATH=\${SIMPATH_PREFIX}${fairsoft}" >> Dart.cfg
echo "export FAIRSOFT_VERSION=${fairsoft}" >> Dart.cfg
"""
if (os =~ /Debian/ && compiler =~ /gcc9/) {
sh '''\
echo "source /etc/profile.d/modules.sh" >> Dart.cfg
echo "module use /cvmfs/it.gsi.de/modulefiles" >> Dart.cfg
echo "module load compiler/gcc/9.1.0" >> Dart.cfg
'''
}
if (os =~ /[Mm]acOS/) {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=clang++'\" >> Dart.cfg"
def jobscript = 'job.sh'
def ctestcmd = "ctest -S FairMQTest.cmake -V --output-on-failure"
sh "echo \"set -e\" >> ${jobscript}"
sh "echo \"export LABEL=\\\"\${JOB_BASE_NAME} ${label}\\\"\" >> ${jobscript}"
if (selector =~ /^macos/) {
sh """\
echo \"export DDS_ROOT=\\\"\\\$(brew --prefix dds)\\\"\" >> ${jobscript}
echo \"${ctestcmd}\" >> ${jobscript}
"""
sh "cat ${jobscript}"
sh "bash ${jobscript}"
} else {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'\" >> Dart.cfg"
def static_analysis = "OFF"
if (selector =~ /^fedora/) {
static_analysis = "ON"
}
def containercmd = "singularity exec -B/shared ${env.SINGULARITY_CONTAINER_ROOT}/fairmq/${os}.${ver}.sif bash -l -c \\\"${ctestcmd} -DRUN_STATIC_ANALYSIS=${static_analysis}\\\""
sh """\
echo \"echo \\\"*** Job started at .......: \\\$(date -R)\\\"\" >> ${jobscript}
echo \"echo \\\"*** Job ID ...............: \\\${SLURM_JOB_ID}\\\"\" >> ${jobscript}
echo \"echo \\\"*** Compute node .........: \\\$(hostname -f)\\\"\" >> ${jobscript}
echo \"unset http_proxy\" >> ${jobscript}
echo \"unset HTTP_PROXY\" >> ${jobscript}
echo \"${containercmd}\" >> ${jobscript}
"""
sh "cat ${jobscript}"
sh "test/ci/slurm-submit.sh \"FairMQ \${JOB_BASE_NAME} ${label}\" ${jobscript}"
withChecks('Static Analysis') {
if (static_analysis == "ON") {
recordIssues(enabledForFailure: true,
tools: [gcc(pattern: 'build/Testing/Temporary/*.log')],
filters: [excludeFile('extern/*'), excludeFile('usr/*')],
skipBlames: true)
}
}
}
sh '''\
echo "export BUILDDIR=$PWD/build" >> Dart.cfg
echo "export SOURCEDIR=$PWD" >> Dart.cfg
echo "export PATH=\\\$SIMPATH/bin:\\\$PATH" >> Dart.cfg
echo "export GIT_BRANCH=$JOB_BASE_NAME" >> Dart.cfg
echo "echo \\\$PATH" >> Dart.cfg
'''
if (os =~ /macOS10.14/) {
sh "echo \"export EXCLUDE_UNSTABLE_DDS_TESTS=1\" >> Dart.cfg"
}
sh 'cat Dart.cfg'
callback.call(spec, label)
deleteDir()
githubNotify(context: "${prefix}/${label}", description: 'Success', status: 'SUCCESS')
githubNotify(context: "${label}", description: 'Success', status: 'SUCCESS')
} catch (e) {
def tarball = "${prefix}_${label}_dds_logs.tar.gz"
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test/ .DDS/"
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test .DDS/"
archiveArtifacts tarball
deleteDir()
githubNotify(context: "${prefix}/${label}", description: 'Error', status: 'ERROR')
githubNotify(context: "${label}", description: 'Error', status: 'ERROR')
throw e
}
}
@@ -75,26 +75,16 @@ def jobMatrix(String prefix, List specs, Closure callback) {
pipeline{
agent none
stages {
stage("Run CI Matrix") {
stage("CI") {
steps{
script {
def build_jobs = jobMatrix('build', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
[os: 'macOS10.14', arch: 'x86_64', compiler: 'AppleClang11.0', fairsoft: 'fairmq_dev'],
[os: 'macOS10.15', arch: 'x86_64', compiler: 'AppleClang12.0', fairsoft: 'fairmq_dev'],
]) { spec, label ->
sh './Dart.sh alfa_ci Dart.cfg'
}
def builds = jobMatrix('build', [
[os: 'alice-centos', ver: '7', arch: 'x86_64', compiler: 'gcc-7'],
[os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10'],
[os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12'],
])
def profile_jobs = jobMatrix('codecov', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
]) { spec, label ->
withCredentials([string(credentialsId: 'fairmq_codecov_token', variable: 'CODECOV_TOKEN')]) {
sh './Dart.sh codecov Dart.cfg'
}
}
parallel(build_jobs + profile_jobs)
parallel(builds)
}
}
}

View File

@@ -1,81 +0,0 @@
#!groovy
def specToLabel(Map spec) {
return "${spec.os}-${spec.arch}-${spec.compiler}-FairSoft_${spec.fairsoft}"
}
def buildMatrix(List specs, Closure callback) {
def nodes = [:]
for (spec in specs) {
def label = specToLabel(spec)
def fairsoft = spec.fairsoft
def os = spec.os
def compiler = spec.compiler
nodes[label] = {
node(label) {
try {
deleteDir()
checkout scm
sh """\
echo "export SIMPATH=\${SIMPATH_PREFIX}${fairsoft}" >> Dart.cfg
echo "export FAIRSOFT_VERSION=${fairsoft}" >> Dart.cfg
"""
if (os =~ /Debian/ && compiler =~ /gcc9/) {
sh '''\
echo "source /etc/profile.d/modules.sh" >> Dart.cfg
echo "module use /cvmfs/it.gsi.de/modulefiles" >> Dart.cfg
echo "module load compiler/gcc/9.1.0" >> Dart.cfg
'''
}
if (os =~ /MacOS/) {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=clang++'\" >> Dart.cfg"
} else {
sh "echo \"export EXTRA_FLAGS='-DCMAKE_CXX_COMPILER=g++'\" >> Dart.cfg"
}
sh '''\
echo "export BUILDDIR=$PWD/build" >> Dart.cfg
echo "export SOURCEDIR=$PWD" >> Dart.cfg
echo "export PATH=\\\$SIMPATH/bin:\\\$PATH" >> Dart.cfg
echo "export GIT_BRANCH=dev" >> Dart.cfg
echo "echo \\\$PATH" >> Dart.cfg
'''
sh 'cat Dart.cfg'
callback.call(spec, label)
deleteDir()
} catch (e) {
def tarball = "${label}_dds_logs.tar.gz"
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test/ .DDS/"
archiveArtifacts tarball
deleteDir()
throw e
}
}
}
}
return nodes
}
pipeline{
agent none
triggers { cron('H 2 * * *') }
stages {
stage("Run Nightly Build/Test Matrix") {
steps{
script {
parallel(buildMatrix([
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
[os: 'MacOS10.13', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'],
[os: 'MacOS10.14', arch: 'x86_64', compiler: 'AppleLLVM10.0.0', fairsoft: 'fairmq_dev'],
]) { spec, label ->
sh './Dart.sh Nightly Dart.cfg'
sh './Dart.sh Profile Dart.cfg'
})
}
}
}
}
}

View File

@@ -1,5 +1,5 @@
<!-- {#mainpage} -->
# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![build status](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/master)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![test coverage master branch](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master/graph/badge.svg)](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master) [![Coverity Badge](https://alfa-ci.gsi.de/shields/coverity/scan/fairrootgroup-fairmq.svg)](https://scan.coverity.com/projects/fairrootgroup-fairmq) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/6b648d95d68d4c4eae833b84f84d299c)](https://www.codacy.com/app/dennisklein/FairMQ?utm_source=github.com&amp;utm_medium=referral&amp;utm_content=FairRootGroup/FairMQ&amp;utm_campaign=Badge_Grade)
# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![build status](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/dev)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![Coverity Badge](https://alfa-ci.gsi.de/shields/coverity/scan/fairrootgroup-fairmq.svg)](https://scan.coverity.com/projects/fairrootgroup-fairmq)
C++ Message Queuing Library and Framework

View File

@@ -203,6 +203,13 @@ macro(set_fairmq_defaults)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ${CCACHE})
endif()
endif()
if( CMAKE_CXX_COMPILER_ID STREQUAL "GNU"
AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 9)
set(FAIRMQ_HAS_STD_FILESYSTEM 0)
else()
set(FAIRMQ_HAS_STD_FILESYSTEM 1)
endif()
endmacro()
function(join VALUES GLUE OUTPUT)

View File

@@ -6,20 +6,11 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(Example11Lib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
add_executable(fairmq-ex-1-1-sampler sampler.cxx)
target_link_libraries(fairmq-ex-1-1-sampler PRIVATE FairMQ)
target_link_libraries(Example11Lib PUBLIC FairMQ)
add_executable(fairmq-ex-1-1-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-1-1-sampler PRIVATE Example11Lib)
add_executable(fairmq-ex-1-1-sink runSink.cxx)
target_link_libraries(fairmq-ex-1-1-sink PRIVATE Example11Lib)
add_executable(fairmq-ex-1-1-sink sink.cxx)
target_link_libraries(fairmq-ex-1-1-sink PRIVATE FairMQ)
add_custom_target(Example11 DEPENDS fairmq-ex-1-1-sampler fairmq-ex-1-1-sink)
@@ -40,12 +31,12 @@ set_tests_properties(Example.1-1.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true P
# install
install(
TARGETS
fairmq-ex-1-1-sampler
fairmq-ex-1-1-sink
TARGETS
fairmq-ex-1-1-sampler
fairmq-ex-1-1-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -54,7 +45,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-1.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-1.sh
)

View File

@@ -3,7 +3,7 @@
A simple topology of two devices - **Sampler** and **Sink**. **Sampler** sends data to **Sink** via the **PUSH-PULL** pattern.
`runSampler.cxx` and `runSink.cxx` configure and run the devices.
`sampler.cxx` and `sink.cxx` configure and run the devices.
The executables take two command line parameters: `--id` and `--channel-config`. The value of `--id` should be a unique identifier and the value for `--channel-config` is the configuration of the communication channel. .

View File

@@ -1,59 +0,0 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Sampler.h"
using namespace std;
namespace example_1_1
{
Sampler::Sampler()
: fText()
, fMaxIterations(0)
, fNumIterations(0)
{}
void Sampler::InitTask()
{
// Get the fText and fMaxIterations values from the command line options (via fConfig)
fText = fConfig->GetProperty<string>("text");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sampler::ConditionalRun()
{
// create a copy of the data with new(), that will be deleted after the transfer is complete
string* text = new string(fText);
// create message object with a pointer to the data buffer, its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg(NewMessage(
const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<string*>(object); },
text));
LOG(info) << "Sending \"" << fText << "\"";
// in case of error or transfer interruption, return false to go to the Ready state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data") < 0) {
return false;
} else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
return true;
}
Sampler::~Sampler() {}
} // namespace example_1_1

View File

@@ -1,42 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE11SAMPLER_H
#define FAIRMQEXAMPLE11SAMPLER_H
#include <string>
#include "FairMQDevice.h"
namespace example_1_1
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler();
protected:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
void InitTask() override;
bool ConditionalRun() override;
};
} // namespace example_1_1
#endif /* FAIRMQEXAMPLE11SAMPLER_H */

View File

@@ -1,54 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
using namespace std;
namespace example_1_1
{
Sink::Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
// register a handler for data arriving on "data" channel
OnData("data", &Sink::HandleData);
}
void Sink::InitTask()
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
// handler is called whenever a message arrives on "data", with a reference to the message and a
// sub-channel index (here 0)
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// return true if you want the handler to be called again (otherwise return false go to the
// Ready state)
return true;
}
Sink::~Sink() {}
} // namespace example_1_1

View File

@@ -1,40 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE11SINK_H
#define FAIRMQEXAMPLE11SINK_H
#include "FairMQDevice.h"
namespace example_1_1
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
protected:
virtual void InitTask();
bool HandleData(FairMQMessagePtr&, int);
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_1_1
#endif /* FAIRMQEXAMPLE11SINK_H */

View File

@@ -1,24 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_1_1::Sampler();
}

View File

@@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_1_1::Sink();
}

75
examples/1-1/sampler.cxx Normal file
View File

@@ -0,0 +1,75 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Sampler : public FairMQDevice
{
public:
Sampler()
: fMaxIterations(0)
, fNumIterations(0)
{}
protected:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
void InitTask() override
{
// Get the fText and fMaxIterations values from the command line options (via fConfig)
fText = fConfig->GetProperty<std::string>("text");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool ConditionalRun() override
{
// create a copy of the data with new(), that will be deleted after the transfer is complete
std::string* text = new std::string(fText);
// create message object with a pointer to the data buffer, its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg(NewMessage(
const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));
LOG(info) << "Sending \"" << fText << "\"";
// in case of error or transfer interruption, return false to go to the Ready state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data") < 0) {
return false;
} else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
return true;
}
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

62
examples/1-1/sink.cxx Normal file
View File

@@ -0,0 +1,62 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
// register a handler for data arriving on "data" channel
OnData("data", &Sink::HandleData);
}
protected:
void InitTask() override
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// return true if you want the handler to be called again (otherwise return false go to the
// Ready state)
return true;
}
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@@ -6,25 +6,14 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(Example1N1Lib STATIC
"Sampler.cxx"
"Sampler.h"
"Processor.cxx"
"Processor.h"
"Sink.cxx"
"Sink.h"
)
add_executable(fairmq-ex-1-n-1-sampler sampler.cxx)
target_link_libraries(fairmq-ex-1-n-1-sampler PRIVATE FairMQ)
target_link_libraries(Example1N1Lib PUBLIC FairMQ)
add_executable(fairmq-ex-1-n-1-processor processor.cxx)
target_link_libraries(fairmq-ex-1-n-1-processor PRIVATE FairMQ)
add_executable(fairmq-ex-1-n-1-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-1-n-1-sampler PRIVATE Example1N1Lib)
add_executable(fairmq-ex-1-n-1-processor runProcessor.cxx)
target_link_libraries(fairmq-ex-1-n-1-processor PRIVATE Example1N1Lib)
add_executable(fairmq-ex-1-n-1-sink runSink.cxx)
target_link_libraries(fairmq-ex-1-n-1-sink PRIVATE Example1N1Lib)
add_executable(fairmq-ex-1-n-1-sink sink.cxx)
target_link_libraries(fairmq-ex-1-n-1-sink PRIVATE FairMQ)
add_custom_target(Example1N1 DEPENDS fairmq-ex-1-n-1-sampler fairmq-ex-1-n-1-processor fairmq-ex-1-n-1-sink)
@@ -48,13 +37,13 @@ set_tests_properties(Example.1-n-1.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true
# install
install(
TARGETS
fairmq-ex-1-n-1-sampler
fairmq-ex-1-n-1-processor
fairmq-ex-1-n-1-sink
TARGETS
fairmq-ex-1-n-1-sampler
fairmq-ex-1-n-1-processor
fairmq-ex-1-n-1-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -64,12 +53,12 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-1-n-1.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-n-1.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-1-n-1.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-1-n-1.sh
)
install(
FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-1-n-1.json
DESTINATION ${PROJECT_INSTALL_DATADIR}
FILES ${CMAKE_CURRENT_BINARY_DIR}/ex-1-n-1.json
DESTINATION ${PROJECT_INSTALL_DATADIR}
)

View File

@@ -1,53 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Processor.h"
#include <string>
using namespace std;
namespace example_1_n_1
{
Processor::Processor()
{
OnData("data1", &Processor::HandleData);
}
bool Processor::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received data, processing...";
// Modify the received string
string* text = new std::string(static_cast<char*>(msg->GetData()), msg->GetSize());
*text += " (modified by " + fId + ")";
// create message object with a pointer to the data buffer,
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<string*>(object); },
text));
// Send out the output message
if (Send(msg2, "data2") < 0)
{
return false;
}
return true;
}
Processor::~Processor()
{
}
} // namespace example_1_n_1

View File

@@ -1,29 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLE1N1PROCESSOR_H_
#define FAIRMQEXAMPLE1N1PROCESSOR_H_
#include "FairMQDevice.h"
namespace example_1_n_1
{
class Processor : public FairMQDevice
{
public:
Processor();
virtual ~Processor();
protected:
bool HandleData(FairMQMessagePtr&, int);
};
} // namespace example_1_n_1
#endif /* FAIRMQEXAMPLE1N1PROCESSOR_H_ */

View File

@@ -1,68 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Sampler.h"
using namespace std;
namespace example_1_n_1
{
Sampler::Sampler()
: fText()
, fMaxIterations(0)
, fNumIterations(0)
{
}
void Sampler::InitTask()
{
// Get the fText and fMaxIterations values from the command line options (via fConfig)
fText = fConfig->GetProperty<string>("text");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sampler::ConditionalRun()
{
// Initializing message with NewStaticMessage will avoid copy
// but won't delete the data after the sending is completed.
FairMQMessagePtr msg(NewStaticMessage(fText));
LOG(info) << "Sending \"" << fText << "\"";
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0)
{
return false;
}
else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
this_thread::sleep_for(chrono::seconds(1));
return true;
}
Sampler::~Sampler()
{
}
} // namespace example_1_n_1

View File

@@ -1,42 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE1N1SAMPLER_H_
#define FAIRMQEXAMPLE1N1SAMPLER_H_
#include <string>
#include "FairMQDevice.h"
namespace example_1_n_1
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler();
protected:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
virtual void InitTask();
virtual bool ConditionalRun();
};
} // namespace example_1_n_1
#endif /* FAIRMQEXAMPLE1N1SAMPLER_H_ */

View File

@@ -1,55 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
using namespace std;
namespace example_1_n_1
{
Sink::Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
// register a handler for data arriving on "data2" channel
OnData("data2", &Sink::HandleData);
}
void Sink::InitTask()
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
// handler is called whenever a message arrives on "data2", with a reference to the message and a sub-channel index (here 0)
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// return true if want to be called again (otherwise return false go to IDLE state)
return true;
}
Sink::~Sink()
{
}
} // namespace example_1_n_1

View File

@@ -1,40 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE1N1SINK_H
#define FAIRMQEXAMPLE1N1SINK_H
#include "FairMQDevice.h"
namespace example_1_n_1
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
protected:
virtual void InitTask();
bool HandleData(FairMQMessagePtr&, int);
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_1_n_1
#endif /* FAIRMQEXAMPLE1N1SINK_H */

View File

@@ -0,0 +1,58 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Processor : public FairMQDevice
{
public:
Processor()
{
OnData("data1", &Processor::HandleData);
}
protected:
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received data, processing...";
// Modify the received string
std::string* text = new std::string(static_cast<char*>(msg->GetData()), msg->GetSize());
*text += " (modified by " + fId + ")";
// create message object with a pointer to the data buffer,
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));
// Send out the output message
if (Send(msg2, "data2") < 0) {
return false;
}
return true;
}
};
void addCustomOptions(bpo::options_description& /*options*/)
{
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Processor>();
}

View File

@@ -1,21 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Processor.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/)
{
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_1_n_1::Processor();
}

View File

@@ -1,24 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_1_n_1::Sampler();
}

View File

@@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_1_n_1::Sink();
}

View File

@@ -0,0 +1,71 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
#include <thread> // this_thread::sleep_for
#include <chrono>
namespace bpo = boost::program_options;
class Sampler : public FairMQDevice
{
public:
Sampler()
: fMaxIterations(0)
, fNumIterations(0)
{}
protected:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
void InitTask() override
{
// Get the fText and fMaxIterations values from the command line options (via fConfig)
fText = fConfig->GetProperty<std::string>("text");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool ConditionalRun() override
{
// Initializing message with NewStaticMessage will avoid copy
// but won't delete the data after the sending is completed.
FairMQMessagePtr msg(NewStaticMessage(fText));
LOG(info) << "Sending \"" << fText << "\"";
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0) {
return false;
} else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
return true;
}
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

61
examples/1-n-1/sink.cxx Normal file
View File

@@ -0,0 +1,61 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
// register a handler for data arriving on "data2" channel
OnData("data2", &Sink::HandleData);
}
protected:
virtual void InitTask() override
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// return true if want to be called again (otherwise return false go to IDLE state)
return true;
}
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@@ -34,7 +34,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-builtin-devices.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-builtin-devices.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-builtin-devices.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-builtin-devices.sh
)

View File

@@ -6,21 +6,12 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(ExampleCopyPushLib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
target_link_libraries(ExampleCopyPushLib PUBLIC FairMQ)
add_executable(fairmq-ex-copypush-sampler sampler.cxx)
target_link_libraries(fairmq-ex-copypush-sampler PRIVATE FairMQ)
add_executable(fairmq-ex-copypush-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-copypush-sampler PRIVATE ExampleCopyPushLib)
add_executable(fairmq-ex-copypush-sink runSink.cxx)
target_link_libraries(fairmq-ex-copypush-sink PRIVATE ExampleCopyPushLib)
add_executable(fairmq-ex-copypush-sink sink.cxx)
target_link_libraries(fairmq-ex-copypush-sink PRIVATE FairMQ)
add_custom_target(ExampleCopyPush DEPENDS fairmq-ex-copypush-sampler fairmq-ex-copypush-sink)
@@ -41,12 +32,12 @@ set_tests_properties(Example.CopyPush.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL t
# install
install(
TARGETS
fairmq-ex-copypush-sampler
fairmq-ex-copypush-sink
TARGETS
fairmq-ex-copypush-sampler
fairmq-ex-copypush-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -55,7 +46,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-copypush.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-copypush.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-copypush.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-copypush.sh
)

View File

@@ -1,68 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Sampler.h"
using namespace std;
namespace example_copypush
{
Sampler::Sampler()
: fNumDataChannels(0)
, fCounter(0)
, fMaxIterations(0)
, fNumIterations(0)
{
}
void Sampler::InitTask()
{
fNumDataChannels = fChannels.at("data").size();
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sampler::ConditionalRun()
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage(fCounter++));
for (int i = 0; i < fNumDataChannels - 1; ++i)
{
FairMQMessagePtr msgCopy(NewMessage());
msgCopy->Copy(*msg);
Send(msgCopy, "data", i);
}
Send(msg, "data", fNumDataChannels - 1);
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
this_thread::sleep_for(chrono::seconds(1));
return true;
}
Sampler::~Sampler()
{
}
} // namespace example_copypush

View File

@@ -1,43 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLECOPYPUSHSAMPLER_H
#define FAIRMQEXAMPLECOPYPUSHSAMPLER_H
#include "FairMQDevice.h"
#include <stdint.h> // uint64_t
namespace example_copypush
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler();
protected:
virtual void InitTask();
virtual bool ConditionalRun();
int fNumDataChannels;
uint64_t fCounter;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_copypush
#endif /* FAIRMQEXAMPLECOPYPUSHSAMPLER_H */

View File

@@ -1,51 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
namespace example_copypush
{
Sink::Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
OnData("data", &Sink::HandleData);
}
void Sink::InitTask()
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received message: \"" << *(static_cast<uint64_t*>(msg->GetData())) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// return true if want to be called again (otherwise go to IDLE state)
return true;
}
Sink::~Sink()
{
}
} // namespace example_copypush

View File

@@ -1,42 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLECOPYPUSHSINK_H
#define FAIRMQEXAMPLECOPYPUSHSINK_H
#include "FairMQDevice.h"
#include <stdint.h> // uint64_t
namespace example_copypush
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
protected:
virtual void InitTask();
bool HandleData(FairMQMessagePtr&, int);
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_copypush
#endif /* FAIRMQEXAMPLECOPYPUSHSINK_H */

View File

@@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_copypush::Sampler();
}

View File

@@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_copypush::Sink();
}

View File

@@ -0,0 +1,74 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <chrono>
#include <cstdint> // uint64_t
#include <thread> // this_thread::sleep_for
namespace bpo = boost::program_options;
class Sampler : public FairMQDevice
{
public:
Sampler()
: fNumDataChannels(0)
, fCounter(0)
, fMaxIterations(0)
, fNumIterations(0)
{}
protected:
void InitTask() override
{
fNumDataChannels = fChannels.at("data").size();
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool ConditionalRun() override
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage(fCounter++));
for (int i = 0; i < fNumDataChannels - 1; ++i) {
FairMQMessagePtr msgCopy(NewMessage());
msgCopy->Copy(*msg);
Send(msgCopy, "data", i);
}
Send(msg, "data", fNumDataChannels - 1);
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
return true;
}
int fNumDataChannels;
uint64_t fCounter;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

View File

@@ -0,0 +1,60 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <cstdint> // uint64_t
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
: fMaxIterations(0)
, fNumIterations(0)
{
OnData("data", &Sink::HandleData);
}
protected:
void InitTask() override
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received message: \"" << *(static_cast<uint64_t*>(msg->GetData())) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// return true if want to be called again (otherwise go to IDLE state)
return true;
}
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@@ -6,25 +6,14 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(ExampleDDSLib STATIC
"Sampler.cxx"
"Sampler.h"
"Processor.cxx"
"Processor.h"
"Sink.cxx"
"Sink.h"
)
add_executable(fairmq-ex-dds-sampler sampler.cxx)
target_link_libraries(fairmq-ex-dds-sampler PRIVATE FairMQ)
target_link_libraries(ExampleDDSLib PUBLIC FairMQ)
add_executable(fairmq-ex-dds-processor processor.cxx)
target_link_libraries(fairmq-ex-dds-processor PRIVATE FairMQ)
add_executable(fairmq-ex-dds-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-dds-sampler PRIVATE ExampleDDSLib)
add_executable(fairmq-ex-dds-processor runProcessor.cxx)
target_link_libraries(fairmq-ex-dds-processor PRIVATE ExampleDDSLib)
add_executable(fairmq-ex-dds-sink runSink.cxx)
target_link_libraries(fairmq-ex-dds-sink PRIVATE ExampleDDSLib)
add_executable(fairmq-ex-dds-sink sink.cxx)
target_link_libraries(fairmq-ex-dds-sink PRIVATE FairMQ)
add_custom_target(ExampleDDS DEPENDS fairmq-ex-dds-sampler fairmq-ex-dds-processor fairmq-ex-dds-sink)

View File

@@ -1,52 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Processor.h"
#include "FairMQLogger.h"
using namespace std;
namespace example_dds
{
Processor::Processor()
{
OnData("data1", &Processor::HandleData);
}
bool Processor::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received data, processing...";
// Modify the received string
string* text = new std::string(static_cast<char*>(msg->GetData()), msg->GetSize());
*text += " (modified by " + fId + ")";
// create message object with a pointer to the data buffer,
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<string*>(object); },
text));
// Send out the output message
if (Send(msg2, "data2") < 0)
{
return false;
}
return true;
}
Processor::~Processor()
{
}
} // namespace example_dds

View File

@@ -1,29 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEDDSPROCESSOR_H
#define FAIRMQEXAMPLEDDSPROCESSOR_H
#include "FairMQDevice.h"
namespace example_dds
{
class Processor : public FairMQDevice
{
public:
Processor();
virtual ~Processor();
protected:
bool HandleData(FairMQMessagePtr&, int);
};
}
#endif /* FAIRMQEXAMPLEDDSPROCESSOR_H */

View File

@@ -1,64 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Sampler.h"
using namespace std;
namespace example_dds
{
Sampler::Sampler()
{
}
void Sampler::InitTask()
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
fCounter = 0;
}
bool Sampler::ConditionalRun()
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("Data"));
LOG(info) << "Sending \"Data\"";
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0) {
return false;
}
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Sent " << fCounter << " messages. Finished.";
return false;
}
}
return true;
}
Sampler::~Sampler()
{
}
}

View File

@@ -1,40 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEDDSSAMPLER_H
#define FAIRMQEXAMPLEDDSSAMPLER_H
#include "FairMQDevice.h"
namespace example_dds
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler();
void InitTask() override;
protected:
bool ConditionalRun() override;
private:
uint64_t fIterations;
uint64_t fCounter;
};
} // namespace example_dds
#endif /* FAIRMQEXAMPLEDDSSAMPLER_H */

View File

@@ -1,54 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
using namespace std;
namespace example_dds
{
Sink::Sink()
{
// register a handler for data arriving on "data2" channel
OnData("data2", &Sink::HandleData);
}
void Sink::InitTask()
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
fCounter = 0;
}
// handler is called whenever a message arrives on "data2", with a reference to the message and a sub-channel index (here 0)
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Received " << fCounter << " messages. Finished.";
return false;
}
}
// return true if want to be called again (otherwise go to IDLE state)
return true;
}
Sink::~Sink()
{
}
} // namespace example_dds

View File

@@ -1,40 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEDDSSINK_H
#define FAIRMQEXAMPLEDDSSINK_H
#include "FairMQDevice.h"
namespace example_dds
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
void InitTask() override;
protected:
bool HandleData(FairMQMessagePtr&, int);
private:
uint64_t fIterations;
uint64_t fCounter;
};
} // namespace example_dds
#endif /* FAIRMQEXAMPLEDDSSINK_H */

View File

@@ -0,0 +1,58 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Processor : public FairMQDevice
{
public:
Processor()
{
OnData("data1", &Processor::HandleData);
}
protected:
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received data, processing...";
// Modify the received string
std::string* text = new std::string(static_cast<char*>(msg->GetData()), msg->GetSize());
*text += " (modified by " + fId + ")";
// create message object with a pointer to the data buffer,
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));
// Send out the output message
if (Send(msg2, "data2") < 0) {
return false;
}
return true;
}
};
void addCustomOptions(bpo::options_description& /*options*/)
{
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Processor>();
}

View File

@@ -1,21 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Processor.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/)
{
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_dds::Processor();
}

View File

@@ -1,25 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_dds::Sampler();
}

View File

@@ -1,25 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_dds::Sink();
}

71
examples/dds/sampler.cxx Normal file
View File

@@ -0,0 +1,71 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <cstdint> // uint64_t
namespace bpo = boost::program_options;
class Sampler : public FairMQDevice
{
public:
Sampler()
: fIterations(0)
, fCounter(0)
{}
void InitTask() override
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
}
protected:
bool ConditionalRun() override
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("Data"));
LOG(info) << "Sending \"Data\"";
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0) {
return false;
}
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Sent " << fCounter << " messages. Finished.";
return false;
}
}
return true;
}
private:
uint64_t fIterations;
uint64_t fCounter;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

62
examples/dds/sink.cxx Normal file
View File

@@ -0,0 +1,62 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
: fIterations(0)
, fCounter(0)
{
OnData("data2", &Sink::HandleData);
}
void InitTask() override
{
fIterations = fConfig->GetValue<uint64_t>("iterations");
}
protected:
bool HandleData(FairMQMessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fIterations > 0) {
++fCounter;
if (fCounter >= fIterations) {
LOG(info) << "Received " << fCounter << " messages. Finished.";
return false;
}
}
// return true if want to be called again (otherwise go to IDLE state)
return true;
}
private:
uint64_t fIterations;
uint64_t fCounter;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()(
"iterations,i",
bpo::value<uint64_t>()->default_value(1000),
"Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@@ -6,20 +6,11 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(ExampleMultipartLib STATIC
"Sampler.cxx"
"Sampler.h"
"Sink.cxx"
"Sink.h"
)
add_executable(fairmq-ex-multipart-sampler sampler.cxx)
target_link_libraries(fairmq-ex-multipart-sampler PRIVATE FairMQ)
target_link_libraries(ExampleMultipartLib PUBLIC FairMQ)
add_executable(fairmq-ex-multipart-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-multipart-sampler PRIVATE ExampleMultipartLib)
add_executable(fairmq-ex-multipart-sink runSink.cxx)
target_link_libraries(fairmq-ex-multipart-sink PRIVATE ExampleMultipartLib)
add_executable(fairmq-ex-multipart-sink sink.cxx)
target_link_libraries(fairmq-ex-multipart-sink PRIVATE FairMQ)
add_custom_target(ExampleMultipart DEPENDS fairmq-ex-multipart-sampler fairmq-ex-multipart-sink)
@@ -45,12 +36,12 @@ endif()
# install
install(
TARGETS
fairmq-ex-multipart-sampler
fairmq-ex-multipart-sink
TARGETS
fairmq-ex-multipart-sampler
fairmq-ex-multipart-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -59,7 +50,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multipart.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multipart.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multipart.sh
)

View File

@@ -9,6 +9,8 @@
#ifndef FAIRMQEXAMPLEMULTIPARTHEADER_H
#define FAIRMQEXAMPLEMULTIPARTHEADER_H
#include <cstdint>
namespace example_multipart
{
@@ -19,4 +21,4 @@ struct Header
}
#endif /* FAIRMQEXAMPLEMULTIPARTHEADER_H */
#endif /* FAIRMQEXAMPLEMULTIPARTHEADER_H */

View File

@@ -1,86 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Sampler.h"
#include "Header.h"
using namespace std;
namespace example_multipart
{
Sampler::Sampler()
: fMaxIterations(5)
, fNumIterations(0)
{
}
void Sampler::InitTask()
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sampler::ConditionalRun()
{
Header header;
header.stopFlag = 0;
// Set stopFlag to 1 for last message.
if (fMaxIterations > 0 && fNumIterations == fMaxIterations - 1) {
header.stopFlag = 1;
}
LOG(info) << "Sending header with stopFlag: " << header.stopFlag;
FairMQParts parts;
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
parts.AddPart(NewSimpleMessage(header));
parts.AddPart(NewMessage(1000));
// create more data parts, testing the FairMQParts in-place constructor
FairMQParts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) };
assert(auxData.Size() == 3);
parts.AddPart(std::move(auxData));
assert(auxData.Size() == 0);
assert(parts.Size() == 5);
parts.AddPart(NewMessage());
assert(parts.Size() == 6);
parts.AddPart(NewMessage(100));
assert(parts.Size() == 7);
LOG(info) << "Sending body of size: " << parts.At(1)->GetSize();
Send(parts, "data");
// Go out of the sending loop if the stopFlag was sent.
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// Wait a second to keep the output readable.
this_thread::sleep_for(chrono::seconds(1));
return true;
}
} // namespace example_multipart

View File

@@ -1,40 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEDDSSAMPLER_H
#define FAIRMQEXAMPLEDDSSAMPLER_H
#include "FairMQDevice.h"
namespace example_multipart
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler() {}
protected:
virtual void InitTask();
virtual bool ConditionalRun();
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_multipart
#endif /* FAIRMQEXAMPLEDDSSAMPLER_H */

View File

@@ -1,52 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
#include "Header.h"
using namespace std;
namespace example_multipart
{
bool Sink::HandleData(FairMQParts& parts, int /*index*/)
{
LOG(info) << "Received message with " << parts.Size() << " parts";
Header header;
header.stopFlag = (static_cast<Header*>(parts.At(0)->GetData()))->stopFlag;
LOG(info) << "Received part 1 (header) with stopFlag: " << header.stopFlag;
LOG(info) << "Received part 2 of size: " << parts.At(1)->GetSize();
assert(parts.At(1)->GetSize() == 1000);
LOG(info) << "Received part 3 of size: " << parts.At(2)->GetSize();
assert(parts.At(2)->GetSize() == 500);
LOG(info) << "Received part 4 of size: " << parts.At(3)->GetSize();
assert(parts.At(3)->GetSize() == 600);
LOG(info) << "Received part 5 of size: " << parts.At(4)->GetSize();
assert(parts.At(4)->GetSize() == 700);
LOG(info) << "Received part 6 of size: " << parts.At(5)->GetSize();
assert(parts.At(5)->GetSize() == 0);
LOG(info) << "Received part 7 of size: " << parts.At(6)->GetSize();
assert(parts.At(6)->GetSize() == 100);
if (header.stopFlag == 1) {
LOG(info) << "stopFlag is 1, going IDLE";
return false;
}
return true;
}
}

View File

@@ -1,34 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEMULTIPARTSINK_H
#define FAIRMQEXAMPLEMULTIPARTSINK_H
#include "FairMQDevice.h"
namespace example_multipart
{
class Sink : public FairMQDevice
{
public:
Sink() { OnData("data", &Sink::HandleData); }
virtual ~Sink() {}
protected:
bool HandleData(FairMQParts&, int);
};
}
#endif /* FAIRMQEXAMPLEMULTIPARTSINK_H */

View File

@@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(5), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multipart::Sampler();
}

View File

@@ -0,0 +1,97 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Header.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <chrono>
#include <cstdint>
#include <thread> // this_thread::sleep_for
namespace bpo = boost::program_options;
class Sampler : public FairMQDevice
{
public:
Sampler()
: fMaxIterations(5)
, fNumIterations(0)
{}
protected:
void InitTask() override
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool ConditionalRun() override
{
example_multipart::Header header;
header.stopFlag = 0;
// Set stopFlag to 1 for last message.
if (fMaxIterations > 0 && fNumIterations == fMaxIterations - 1) {
header.stopFlag = 1;
}
LOG(info) << "Sending header with stopFlag: " << header.stopFlag;
FairMQParts parts;
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
parts.AddPart(NewSimpleMessage(header));
parts.AddPart(NewMessage(1000));
// create more data parts, testing the FairMQParts in-place constructor
FairMQParts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) };
assert(auxData.Size() == 3);
parts.AddPart(std::move(auxData));
assert(auxData.Size() == 0);
assert(parts.Size() == 5);
parts.AddPart(NewMessage());
assert(parts.Size() == 6);
parts.AddPart(NewMessage(100));
assert(parts.Size() == 7);
LOG(info) << "Sending body of size: " << parts.At(1)->GetSize();
Send(parts, "data");
// Go out of the sending loop if the stopFlag was sent.
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
// Wait a second to keep the output readable.
std::this_thread::sleep_for(std::chrono::seconds(1));
return true;
}
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(5), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

View File

@@ -0,0 +1,63 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Header.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
{
OnData("data", &Sink::HandleData);
}
protected:
bool HandleData(FairMQParts& parts, int)
{
LOG(info) << "Received message with " << parts.Size() << " parts";
example_multipart::Header header;
header.stopFlag = (static_cast<example_multipart::Header*>(parts.At(0)->GetData()))->stopFlag;
LOG(info) << "Received part 1 (header) with stopFlag: " << header.stopFlag;
LOG(info) << "Received part 2 of size: " << parts.At(1)->GetSize();
assert(parts.At(1)->GetSize() == 1000);
LOG(info) << "Received part 3 of size: " << parts.At(2)->GetSize();
assert(parts.At(2)->GetSize() == 500);
LOG(info) << "Received part 4 of size: " << parts.At(3)->GetSize();
assert(parts.At(3)->GetSize() == 600);
LOG(info) << "Received part 5 of size: " << parts.At(4)->GetSize();
assert(parts.At(4)->GetSize() == 700);
LOG(info) << "Received part 6 of size: " << parts.At(5)->GetSize();
assert(parts.At(5)->GetSize() == 0);
LOG(info) << "Received part 7 of size: " << parts.At(6)->GetSize();
assert(parts.At(6)->GetSize() == 100);
if (header.stopFlag == 1) {
LOG(info) << "stopFlag is 1, going IDLE";
return false;
}
return true;
}
};
void addCustomOptions(bpo::options_description& /*options*/)
{
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@@ -1,48 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Broadcaster.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Broadcaster.h"
using namespace std;
namespace example_multiple_channels
{
Broadcaster::Broadcaster()
{
}
bool Broadcaster::ConditionalRun()
{
this_thread::sleep_for(chrono::seconds(1));
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("OK"));
LOG(info) << "Sending OK";
Send(msg, "broadcast");
return true;
}
Broadcaster::~Broadcaster()
{
}
}

View File

@@ -1,35 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Broadcaster.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEMULTIPLECHANNELSBROADCASTER_H
#define FAIRMQEXAMPLEMULTIPLECHANNELSBROADCASTER_H
#include "FairMQDevice.h"
namespace example_multiple_channels
{
class Broadcaster : public FairMQDevice
{
public:
Broadcaster();
virtual ~Broadcaster();
protected:
virtual bool ConditionalRun();
};
}
#endif /* FAIRMQEXAMPLEMULTIPLECHANNELSBROADCASTER_H */

View File

@@ -6,25 +6,14 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(ExampleMultipleChannelsLib STATIC
"Sampler.cxx"
"Sampler.h"
"Broadcaster.cxx"
"Broadcaster.h"
"Sink.cxx"
"Sink.h"
)
add_executable(fairmq-ex-multiple-channels-sampler sampler.cxx)
target_link_libraries(fairmq-ex-multiple-channels-sampler PRIVATE FairMQ)
target_link_libraries(ExampleMultipleChannelsLib PUBLIC FairMQ)
add_executable(fairmq-ex-multiple-channels-broadcaster broadcaster.cxx)
target_link_libraries(fairmq-ex-multiple-channels-broadcaster PRIVATE FairMQ)
add_executable(fairmq-ex-multiple-channels-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-multiple-channels-sampler PRIVATE ExampleMultipleChannelsLib)
add_executable(fairmq-ex-multiple-channels-broadcaster runBroadcaster.cxx)
target_link_libraries(fairmq-ex-multiple-channels-broadcaster PRIVATE ExampleMultipleChannelsLib)
add_executable(fairmq-ex-multiple-channels-sink runSink.cxx)
target_link_libraries(fairmq-ex-multiple-channels-sink PRIVATE ExampleMultipleChannelsLib)
add_executable(fairmq-ex-multiple-channels-sink sink.cxx)
target_link_libraries(fairmq-ex-multiple-channels-sink PRIVATE FairMQ)
add_custom_target(ExampleMultipleChannels DEPENDS fairmq-ex-multiple-channels-sampler fairmq-ex-multiple-channels-broadcaster fairmq-ex-multiple-channels-sink)
@@ -42,13 +31,13 @@ set_tests_properties(Example.MultipleChannels.zeromq PROPERTIES TIMEOUT "30" RUN
# install
install(
TARGETS
fairmq-ex-multiple-channels-sampler
fairmq-ex-multiple-channels-broadcaster
fairmq-ex-multiple-channels-sink
TARGETS
fairmq-ex-multiple-channels-sampler
fairmq-ex-multiple-channels-broadcaster
fairmq-ex-multiple-channels-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -57,7 +46,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-channels.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-channels.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-channels.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-channels.sh
)

View File

@@ -1,82 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <memory> // unique_ptr
#include <thread> // this_thread::sleep_for
#include <chrono>
#include "Sampler.h"
#include "FairMQPoller.h"
using namespace std;
namespace example_multiple_channels
{
Sampler::Sampler()
: fText()
, fMaxIterations(0)
, fNumIterations(0)
{
}
void Sampler::InitTask()
{
fText = fConfig->GetProperty<string>("text");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
void Sampler::Run()
{
FairMQPollerPtr poller(NewPoller("data", "broadcast"));
while (!NewStatePending())
{
poller->Poll(100);
if (poller->CheckInput("broadcast", 0))
{
FairMQMessagePtr msg(NewMessage());
if (Receive(msg, "broadcast") > 0)
{
LOG(info) << "Received broadcast: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
}
}
if (poller->CheckOutput("data", 0))
{
this_thread::sleep_for(chrono::seconds(1));
FairMQMessagePtr msg(NewSimpleMessage(fText));
if (Send(msg, "data") > 0)
{
LOG(info) << "Sent \"" << fText << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
break;
}
}
}
}
}
Sampler::~Sampler()
{
}
}

View File

@@ -1,42 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEMULTIPLECHANNELSSAMPLER_H
#define FAIRMQEXAMPLEMULTIPLECHANNELSSAMPLER_H
#include <string>
#include "FairMQDevice.h"
namespace example_multiple_channels
{
class Sampler : public FairMQDevice
{
public:
Sampler();
virtual ~Sampler();
protected:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
virtual void Run();
virtual void InitTask();
};
}
#endif /* FAIRMQEXAMPLEMULTIPLECHANNELSSAMPLER_H */

View File

@@ -1,71 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
using namespace std;
namespace example_multiple_channels
{
Sink::Sink()
: fReceivedData(false)
, fReceivedBroadcast(false)
, fMaxIterations(0)
, fNumIterations(0)
{
OnData("broadcast", &Sink::HandleBroadcast);
OnData("data", &Sink::HandleData);
}
void Sink::InitTask()
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sink::HandleBroadcast(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received broadcast: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedBroadcast = true;
return CheckIterations();
}
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received message: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedData = true;
return CheckIterations();
}
bool Sink::CheckIterations()
{
if (fMaxIterations > 0)
{
if (fReceivedData && fReceivedBroadcast && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached & Received messages from both sources. Leaving RUNNING state.";
return false;
}
}
return true;
}
Sink::~Sink()
{
}
}

View File

@@ -1,44 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEMULTIPLECHANNELSSINK_H
#define FAIRMQEXAMPLEMULTIPLECHANNELSSINK_H
#include "FairMQDevice.h"
namespace example_multiple_channels
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
protected:
bool HandleBroadcast(FairMQMessagePtr&, int);
bool HandleData(FairMQMessagePtr&, int);
bool CheckIterations();
virtual void InitTask();
private:
bool fReceivedData;
bool fReceivedBroadcast;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
}
#endif /* FAIRMQEXAMPLEMULTIPLECHANNELSSINK_H */

View File

@@ -0,0 +1,46 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <thread> // this_thread::sleep_for
#include <chrono>
namespace bpo = boost::program_options;
class Broadcaster : public FairMQDevice
{
public:
Broadcaster() {}
protected:
bool ConditionalRun() override
{
std::this_thread::sleep_for(std::chrono::seconds(1));
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("OK"));
LOG(info) << "Sending OK";
Send(msg, "broadcast");
return true;
}
};
void addCustomOptions(bpo::options_description& /*options*/)
{
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Broadcaster>();
}

View File

@@ -1,21 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Broadcaster.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/)
{
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multiple_channels::Broadcaster();
}

View File

@@ -1,24 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multiple_channels::Sampler();
}

View File

@@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multiple_channels::Sink();
}

View File

@@ -0,0 +1,81 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <FairMQPoller.h>
#include <fairmq/runDevice.h>
#include <chrono>
#include <string>
#include <thread> // this_thread::sleep_for
namespace bpo = boost::program_options;
class Sampler : public FairMQDevice
{
public:
Sampler()
: fMaxIterations(0)
, fNumIterations(0)
{}
void InitTask() override
{
fText = fConfig->GetProperty<std::string>("text");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
void Run() override
{
FairMQPollerPtr poller(NewPoller("data", "broadcast"));
while (!NewStatePending()) {
poller->Poll(100);
if (poller->CheckInput("broadcast", 0)) {
FairMQMessagePtr msg(NewMessage());
if (Receive(msg, "broadcast") > 0) {
LOG(info) << "Received broadcast: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
}
}
if (poller->CheckOutput("data", 0)) {
std::this_thread::sleep_for(std::chrono::seconds(1));
FairMQMessagePtr msg(NewSimpleMessage(fText));
if (Send(msg, "data") > 0) {
LOG(info) << "Sent \"" << fText << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
break;
}
}
}
}
}
protected:
std::string fText;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

View File

@@ -0,0 +1,79 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <cstdint> // uint64_t
#include <string>
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
: fReceivedData(false)
, fReceivedBroadcast(false)
, fMaxIterations(0)
, fNumIterations(0)
{
OnData("broadcast", &Sink::HandleBroadcast);
OnData("data", &Sink::HandleData);
}
void InitTask() override
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleBroadcast(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received broadcast: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedBroadcast = true;
return CheckIterations();
}
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
{
LOG(info) << "Received message: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedData = true;
return CheckIterations();
}
bool CheckIterations()
{
if (fMaxIterations > 0) {
if (fReceivedData && fReceivedBroadcast && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached & Received messages from both sources. Leaving RUNNING state.";
return false;
}
}
return true;
}
private:
bool fReceivedData;
bool fReceivedBroadcast;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@@ -6,25 +6,14 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_library(ExampleMultipleTransportsLib STATIC
"Sampler1.cxx"
"Sampler1.h"
"Sampler2.cxx"
"Sampler2.h"
"Sink.cxx"
"Sink.h"
)
add_executable(fairmq-ex-multiple-transports-sampler1 sampler1.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sampler1 PRIVATE FairMQ)
target_link_libraries(ExampleMultipleTransportsLib PUBLIC FairMQ)
add_executable(fairmq-ex-multiple-transports-sampler2 sampler2.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sampler2 PRIVATE FairMQ)
add_executable(fairmq-ex-multiple-transports-sampler1 runSampler1.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sampler1 PRIVATE ExampleMultipleTransportsLib)
add_executable(fairmq-ex-multiple-transports-sampler2 runSampler2.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sampler2 PRIVATE ExampleMultipleTransportsLib)
add_executable(fairmq-ex-multiple-transports-sink runSink.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sink PRIVATE ExampleMultipleTransportsLib)
add_executable(fairmq-ex-multiple-transports-sink sink.cxx)
target_link_libraries(fairmq-ex-multiple-transports-sink PRIVATE FairMQ)
add_custom_target(ExampleMultipleTransports DEPENDS fairmq-ex-multiple-transports-sampler1 fairmq-ex-multiple-transports-sampler2 fairmq-ex-multiple-transports-sink)
@@ -41,13 +30,13 @@ set_tests_properties(Example.MultipleTransports PROPERTIES TIMEOUT "30" RUN_SERI
# install
install(
TARGETS
fairmq-ex-multiple-transports-sampler1
fairmq-ex-multiple-transports-sampler2
fairmq-ex-multiple-transports-sink
TARGETS
fairmq-ex-multiple-transports-sampler1
fairmq-ex-multiple-transports-sampler2
fairmq-ex-multiple-transports-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for install directories
@@ -56,7 +45,7 @@ set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multiple-transports.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-transports.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-multiple-transports.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-multiple-transports.sh
)

View File

@@ -1,80 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Sampler1.h"
using namespace std;
namespace example_multiple_transports
{
Sampler1::Sampler1()
: fAckListener()
, fMaxIterations(0)
, fNumIterations(0)
{
}
void Sampler1::InitTask()
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
void Sampler1::PreRun()
{
fAckListener = thread(&Sampler1::ListenForAcks, this);
}
bool Sampler1::ConditionalRun()
{
// Creates a message using the transport of channel data1
FairMQMessagePtr msg(NewMessageFor("data1", 0, 1000000));
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0)
{
return false;
}
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
return true;
}
void Sampler1::PostRun()
{
fAckListener.join();
}
void Sampler1::ListenForAcks()
{
uint64_t numAcks = 0;
while (!NewStatePending())
{
FairMQMessagePtr ack(NewMessageFor("ack", 0));
if (Receive(ack, "ack") < 0)
{
break;
}
++numAcks;
}
LOG(info) << "Acknowledged " << numAcks << " messages";
}
Sampler1::~Sampler1()
{
}
} // namespace example_multiple_transports

View File

@@ -1,39 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEMULTIPLETRANSPORTSSAMPLER1_H
#define FAIRMQEXAMPLEMULTIPLETRANSPORTSSAMPLER1_H
#include <thread>
#include "FairMQDevice.h"
namespace example_multiple_transports
{
class Sampler1 : public FairMQDevice
{
public:
Sampler1();
virtual ~Sampler1();
protected:
virtual void InitTask();
virtual void PreRun();
virtual bool ConditionalRun();
virtual void PostRun();
void ListenForAcks();
std::thread fAckListener;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_multiple_transports
#endif /* FAIRMQEXAMPLEMULTIPLETRANSPORTSSAMPLER1_H */

View File

@@ -1,51 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Sampler2.h"
using namespace std;
namespace example_multiple_transports
{
Sampler2::Sampler2()
: fMaxIterations(0)
, fNumIterations(0)
{
}
void Sampler2::InitTask()
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool Sampler2::ConditionalRun()
{
FairMQMessagePtr msg(NewMessage(1000));
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data2") < 0)
{
return false;
}
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
return true;
}
Sampler2::~Sampler2()
{
}
} // namespace example_multiple_transports

View File

@@ -1,34 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEMULTIPLETRANSPORTSSAMPLER2_H
#define FAIRMQEXAMPLEMULTIPLETRANSPORTSSAMPLER2_H
#include "FairMQDevice.h"
namespace example_multiple_transports
{
class Sampler2 : public FairMQDevice
{
public:
Sampler2();
virtual ~Sampler2();
protected:
virtual void InitTask();
virtual bool ConditionalRun();
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
} // namespace example_multiple_transports
#endif /* FAIRMQEXAMPLEMULTIPLETRANSPORTSSAMPLER2_H */

View File

@@ -1,78 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sink.h"
using namespace std;
namespace example_multiple_transports
{
Sink::Sink()
: fMaxIterations(0)
, fNumIterations1(0)
, fNumIterations2(0)
{
// register a handler for data arriving on "data" channel
OnData("data1", &Sink::HandleData1);
OnData("data2", &Sink::HandleData2);
}
void Sink::InitTask()
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool Sink::HandleData1(FairMQMessagePtr& /*msg*/, int /*index*/)
{
fNumIterations1++;
// Creates a message using the transport of channel ack
FairMQMessagePtr ack(NewMessageFor("ack", 0));
if (Send(ack, "ack") < 0)
{
return false;
}
// return true if want to be called again (otherwise go to IDLE state)
return CheckIterations();
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool Sink::HandleData2(FairMQMessagePtr& /*msg*/, int /*index*/)
{
fNumIterations2++;
// return true if want to be called again (otherwise go to IDLE state)
return CheckIterations();
}
bool Sink::CheckIterations()
{
if (fMaxIterations > 0)
{
if (fNumIterations1 >= fMaxIterations && fNumIterations2 >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached & Received messages from both sources. Leaving RUNNING state.";
return false;
}
}
return true;
}
Sink::~Sink()
{
}
} // namespace example_multiple_transports

View File

@@ -1,43 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLEMULTIPLETRANSPORTSSINK_H
#define FAIRMQEXAMPLEMULTIPLETRANSPORTSSINK_H
#include "FairMQDevice.h"
namespace example_multiple_transports
{
class Sink : public FairMQDevice
{
public:
Sink();
virtual ~Sink();
protected:
virtual void InitTask();
bool HandleData1(FairMQMessagePtr&, int);
bool HandleData2(FairMQMessagePtr&, int);
bool CheckIterations();
private:
uint64_t fMaxIterations;
uint64_t fNumIterations1;
uint64_t fNumIterations2;
};
} // namespace example_multiple_transports
#endif /* FAIRMQEXAMPLEMULTIPLETRANSPORTSSINK_H */

View File

@@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler1.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multiple_transports::Sampler1();
}

View File

@@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sampler2.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multiple_transports::Sampler2();
}

View File

@@ -1,23 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "Sink.h"
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new example_multiple_transports::Sink();
}

View File

@@ -0,0 +1,89 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <thread>
namespace bpo = boost::program_options;
class Sampler1 : public FairMQDevice
{
public:
Sampler1()
: fAckListener()
, fMaxIterations(0)
, fNumIterations(0)
{}
protected:
void InitTask() override
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
void PreRun() override
{
fAckListener = std::thread(&Sampler1::ListenForAcks, this);
}
bool ConditionalRun() override
{
// Creates a message using the transport of channel data1
FairMQMessagePtr msg(NewMessageFor("data1", 0, 1000000));
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data1") < 0) {
return false;
}
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
return true;
}
void PostRun() override
{
fAckListener.join();
}
void ListenForAcks()
{
uint64_t numAcks = 0;
while (!NewStatePending()) {
FairMQMessagePtr ack(NewMessageFor("ack", 0));
if (Receive(ack, "ack") < 0) {
break;
}
++numAcks;
}
LOG(info) << "Acknowledged " << numAcks << " messages";
}
std::thread fAckListener;
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler1>();
}

View File

@@ -0,0 +1,63 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <cstdint> // uint64_t
namespace bpo = boost::program_options;
class Sampler2 : public FairMQDevice
{
public:
Sampler2()
: fMaxIterations(0)
, fNumIterations(0)
{}
protected:
void InitTask() override
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewMessage(1000));
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data2") < 0) {
return false;
}
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
return true;
}
private:
uint64_t fMaxIterations;
uint64_t fNumIterations;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler2>();
}

View File

@@ -0,0 +1,82 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
class Sink : public FairMQDevice
{
public:
Sink()
: fMaxIterations(0)
, fNumIterations1(0)
, fNumIterations2(0)
{
// register a handler for data arriving on "data" channel
OnData("data1", &Sink::HandleData1);
OnData("data2", &Sink::HandleData2);
}
protected:
void InitTask() override
{
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool HandleData1(FairMQMessagePtr& /*msg*/, int /*index*/)
{
fNumIterations1++;
// Creates a message using the transport of channel ack
FairMQMessagePtr ack(NewMessageFor("ack", 0));
if (Send(ack, "ack") < 0) {
return false;
}
// return true if want to be called again (otherwise go to IDLE state)
return CheckIterations();
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool HandleData2(FairMQMessagePtr& /*msg*/, int /*index*/)
{
fNumIterations2++;
// return true if want to be called again (otherwise go to IDLE state)
return CheckIterations();
}
bool CheckIterations()
{
if (fMaxIterations > 0) {
if (fNumIterations1 >= fMaxIterations && fNumIterations2 >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached & Received messages from both sources. Leaving RUNNING state.";
return false;
}
}
return true;
}
private:
uint64_t fMaxIterations;
uint64_t fNumIterations1;
uint64_t fNumIterations2;
};
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@@ -6,13 +6,13 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_executable(fairmq-ex-n-m-synchronizer runSynchronizer.cxx)
add_executable(fairmq-ex-n-m-synchronizer synchronizer.cxx)
target_link_libraries(fairmq-ex-n-m-synchronizer PRIVATE FairMQ)
add_executable(fairmq-ex-n-m-sender runSender.cxx)
add_executable(fairmq-ex-n-m-sender sender.cxx)
target_link_libraries(fairmq-ex-n-m-sender PRIVATE FairMQ)
add_executable(fairmq-ex-n-m-receiver runReceiver.cxx)
add_executable(fairmq-ex-n-m-receiver receiver.cxx)
target_link_libraries(fairmq-ex-n-m-receiver PRIVATE FairMQ)
add_custom_target(ExampleNM DEPENDS fairmq-ex-n-m-synchronizer fairmq-ex-n-m-sender fairmq-ex-n-m-receiver)

View File

@@ -8,8 +8,8 @@
#include "Header.h"
#include <FairMQDevice.h>
#include <runFairMQDevice.h>
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
#include <iomanip>
@@ -116,4 +116,7 @@ void addCustomOptions(bpo::options_description& options)
("max-timeframes", bpo::value<int>()->default_value(0), "Maximum number of timeframes to receive (0 - unlimited)");
}
FairMQDevice* getDevice(const FairMQProgOptions& /* config */) { return new Receiver(); }
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
{
return std::make_unique<Receiver>();
}

View File

@@ -8,8 +8,8 @@
#include "Header.h"
#include <FairMQDevice.h>
#include <runFairMQDevice.h>
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
@@ -76,4 +76,7 @@ void addCustomOptions(bpo::options_description& options)
("subtimeframe-size", bpo::value<int>()->default_value(1000), "Subtimeframe size in bytes")
("num-receivers", bpo::value<int>()->required(), "Number of EPNs");
}
FairMQDevice* getDevice(const FairMQProgOptions& /* config */) { return new Sender(); }
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
{
return std::make_unique<Sender>();
}

View File

@@ -6,8 +6,8 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <FairMQDevice.h>
#include <runFairMQDevice.h>
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
#include <cstdint>
@@ -43,4 +43,7 @@ class Synchronizer : public FairMQDevice
};
void addCustomOptions(bpo::options_description& /* options */) {}
FairMQDevice* getDevice(const FairMQProgOptions& /* config */) { return new Synchronizer(); }
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
{
return std::make_unique<Synchronizer>();
}

View File

@@ -6,16 +6,16 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_executable(fairmq-ex-qc-sampler runSampler.cxx)
add_executable(fairmq-ex-qc-sampler sampler.cxx)
target_link_libraries(fairmq-ex-qc-sampler PRIVATE FairMQ)
add_executable(fairmq-ex-qc-dispatcher runQCDispatcher.cxx)
add_executable(fairmq-ex-qc-dispatcher qCDispatcher.cxx)
target_link_libraries(fairmq-ex-qc-dispatcher PRIVATE FairMQ)
add_executable(fairmq-ex-qc-task runQCTask.cxx)
add_executable(fairmq-ex-qc-task qCTask.cxx)
target_link_libraries(fairmq-ex-qc-task PRIVATE FairMQ)
add_executable(fairmq-ex-qc-sink runSink.cxx)
add_executable(fairmq-ex-qc-sink sink.cxx)
target_link_libraries(fairmq-ex-qc-sink PRIVATE FairMQ)
add_custom_target(ExampleQC DEPENDS fairmq-ex-qc-sampler fairmq-ex-qc-dispatcher fairmq-ex-qc-task fairmq-ex-qc-sink)

View File

@@ -6,8 +6,8 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
class QCDispatcher : public FairMQDevice
{
@@ -56,4 +56,7 @@ class QCDispatcher : public FairMQDevice
};
void addCustomOptions(boost::program_options::options_description& /*options*/) {}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new QCDispatcher(); }
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<QCDispatcher>();
}

View File

@@ -6,8 +6,8 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
class QCTask : public FairMQDevice
{
@@ -23,4 +23,7 @@ class QCTask : public FairMQDevice
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/) {}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new QCTask(); }
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<QCTask>();
}

View File

@@ -6,8 +6,8 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <thread> // this_thread::sleep_for
#include <chrono>
@@ -33,4 +33,7 @@ class Sampler : public FairMQDevice
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description&) {}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new Sampler(); }
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sampler>();
}

View File

@@ -6,8 +6,8 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "runFairMQDevice.h"
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
@@ -22,4 +22,7 @@ class Sink : public FairMQDevice
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description&) {}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new Sink(); }
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Sink>();
}

View File

@@ -6,19 +6,19 @@
# copied verbatim in the file "LICENSE" #
################################################################################
add_executable(fairmq-ex-readout-readout runReadout.cxx)
add_executable(fairmq-ex-readout-readout readout.cxx)
target_link_libraries(fairmq-ex-readout-readout PRIVATE FairMQ)
add_executable(fairmq-ex-readout-builder runBuilder.cxx)
add_executable(fairmq-ex-readout-builder builder.cxx)
target_link_libraries(fairmq-ex-readout-builder PRIVATE FairMQ)
add_executable(fairmq-ex-readout-processor runProcessor.cxx)
add_executable(fairmq-ex-readout-processor processor.cxx)
target_link_libraries(fairmq-ex-readout-processor PRIVATE FairMQ)
add_executable(fairmq-ex-readout-sender runSender.cxx)
add_executable(fairmq-ex-readout-sender sender.cxx)
target_link_libraries(fairmq-ex-readout-sender PRIVATE FairMQ)
add_executable(fairmq-ex-readout-receiver runReceiver.cxx)
add_executable(fairmq-ex-readout-receiver receiver.cxx)
target_link_libraries(fairmq-ex-readout-receiver PRIVATE FairMQ)
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
@@ -29,15 +29,15 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh
# install
install(
TARGETS
fairmq-ex-readout-readout
fairmq-ex-readout-builder
fairmq-ex-readout-processor
fairmq-ex-readout-sender
fairmq-ex-readout-receiver
TARGETS
fairmq-ex-readout-readout
fairmq-ex-readout-builder
fairmq-ex-readout-processor
fairmq-ex-readout-sender
fairmq-ex-readout-receiver
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
)
# configure run script with different executable paths for build and for install directories
@@ -47,13 +47,13 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout.sh
)
install(
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout-processing.sh
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install
DESTINATION ${PROJECT_INSTALL_BINDIR}
RENAME fairmq-start-ex-readout-processing.sh
)

View File

@@ -5,15 +5,13 @@
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEREGIONBUILDER_H
#define FAIRMQEXAMPLEREGIONBUILDER_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <string>
namespace example_readout
{
namespace bpo = boost::program_options;
class Builder : public FairMQDevice
{
@@ -41,6 +39,13 @@ class Builder : public FairMQDevice
std::string fOutputChannelName;
};
} // namespace example_readout
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("output-name", bpo::value<std::string>()->default_value("bs"), "Output channel name");
}
#endif /* FAIRMQEXAMPLEREGIONBUILDER_H */
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Builder>();
}

View File

@@ -5,13 +5,11 @@
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEREGIONPROCESSOR_H
#define FAIRMQEXAMPLEREGIONPROCESSOR_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
namespace example_readout
{
namespace bpo = boost::program_options;
class Processor : public FairMQDevice
{
@@ -32,6 +30,10 @@ class Processor : public FairMQDevice
}
};
} // namespace example_readout
void addCustomOptions(bpo::options_description& /* options */)
{}
#endif /* FAIRMQEXAMPLEREGIONPROCESSOR_H */
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Processor>();
}

View File

@@ -5,17 +5,15 @@
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQEXAMPLEREADOUTREADOUT_H
#define FAIRMQEXAMPLEREADOUTREADOUT_H
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
#include <atomic>
#include <thread>
#include <chrono>
#include "FairMQDevice.h"
namespace example_readout
{
namespace bpo = boost::program_options;
class Readout : public FairMQDevice
{
@@ -86,6 +84,14 @@ class Readout : public FairMQDevice
std::atomic<uint64_t> fNumUnackedMsgs;
};
} // namespace example_readout
void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
}
#endif /* FAIRMQEXAMPLEREADOUTREADOUT_H */
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
{
return std::make_unique<Readout>();
}

Some files were not shown because too many files have changed in this diff Show More