Compare commits

..

48 Commits

Author SHA1 Message Date
Alexey Rybalchenko
091d0824d1 ofi: fix Events() signature 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
857aa84fa3 add mlock/zero options to unmanaged region 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
c85d6e079c shm: reduce shm contention when dealing with ack queues 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
4e466514d2 region example: fix msg counter 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
8b4056e408 Update docs 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
b67b80e0ad shmmonitor: add severity setting 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
2c89b24857 shm: eliminate race/deadlock in region subscriptions 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
c6a6a5f21b Check transport type of msg and corresponding region 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
9defa71622 Add GetType() to UnmanagedRegion 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
ed2dcedf03 Add operator<< for fair::mq::Transport 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
a3d56b9aeb configurable transport for region example script 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
8a2641d842 shm: check result of region acquisition 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
2ca62d06db shm region cache: fix multiple sessions issue 2021-05-07 14:20:00 +02:00
Alexey Rybalchenko
87e0ca5450 add region cache test 2021-05-07 14:20:00 +02:00
Gvozden Neskovic
ef5b3c782e improve message counter cache line use 2021-05-07 14:20:00 +02:00
Gvozden Neskovic
f7ba3052aa use thread local cache to avoid interprocess lock on shm GetData 2021-05-07 14:20:00 +02:00
Dennis Klein
a90dbf64de Fix -Wunused-result
Fixes #281
2021-05-07 13:18:12 +02:00
Dennis Klein
9724f184f4 Fallback to Boost.Filesystem on GCC 8 2021-05-07 13:13:16 +02:00
Dennis Klein
057ba03776 PluginManager: Do not load built-in plugins via dlopen/dlsym
fixes #351
2021-05-05 03:52:12 +02:00
Giulio Eulisse
6dfea32aee Improve Events API
If the call is interrupted by a signal, this will throw, which we clearly do not want. Simplifying the API to let the user decide what to do on error is probably the best option.
2021-05-04 22:54:19 +02:00
Dennis Klein
868fe02ee9 CI: Submit results to CDash for each build step 2021-04-08 16:22:47 +02:00
Dennis Klein
a2016a9361 CI: Add alice-centos-7 environment 2021-04-08 16:22:47 +02:00
Dennis Klein
ea9aede652 Fallback to <boost/filesystem> on GCC 7 2021-04-08 16:22:47 +02:00
Alexey Rybalchenko
77bf12c8e8 docs patch 2021-04-08 12:38:18 +02:00
Alexey Rybalchenko
f3bc9e05a8 shmmonitor: update docs 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
5facc441b8 shmmonitor: add --list-all 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
2602f53585 Add tools: StrStartsWith, StrEndsWith 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
0976465338 shm: reduce delay between monitor daemon launch & HBs 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
9144258b89 shmmonitor: daemon output to file if FAIRMQ_SHMMONITOR_VERBOSE is true 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
be55565617 shmmonitor: use fairlogger 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
d7e2fbecea shmmonitor: refactor to separate monitoring from output 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
72175e5757 shmmonitor: optimize startup to avoid repeated start 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
effba534f0 shmmonitor: add session name and creator id to the output 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
efd42075a9 shmmonitor: enable read only access 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
5228407932 shmmonitor: distinguish daemon from monitor mode (orthogonal) 2021-04-08 10:11:23 +02:00
Alexey Rybalchenko
30e81d58f8 shmmonitor: allow getting shmids based on session/userid 2021-04-08 10:11:23 +02:00
Dennis Klein
2c7c46f2fd Remove codacy badge 2021-03-26 10:06:10 +01:00
Dennis Klein
0a5122bb24 Remove codecov badge 2021-03-26 10:06:10 +01:00
Dennis Klein
fc49687879 builtin devices: Reorganize 2021-03-26 10:06:10 +01:00
Dennis Klein
66a4df0667 fairmq-uuid-gen: Move to tools directory 2021-03-26 10:06:10 +01:00
Dennis Klein
978191fa6c Introduce <fairmq/runDevice.h> 2021-03-26 10:06:10 +01:00
Dennis Klein
cef6d0afcd Introduce <fairmq/Device.h> 2021-03-26 10:06:10 +01:00
Dennis Klein
47ec550792 control plugin: Move to subdirectory for consistency 2021-03-26 10:06:10 +01:00
Dennis Klein
b4aeb320e5 CI: Collect DDS logs on error 2021-03-26 10:06:10 +01:00
Dennis Klein
107248be0a Reorganize includes for consistency 2021-03-26 10:06:10 +01:00
Dennis Klein
68ceaba501 CI: Filter and process warnings and errors 2021-03-26 10:06:10 +01:00
Dennis Klein
4b6cf8b181 Fix aggregate initialization issue before C++20
Use value initialization to prevent

error: temporary of type ... has protected destructor

see https://stackoverflow.com/a/56745475
2021-03-26 10:06:10 +01:00
Dennis Klein
21d6cf9830 CI: Run clang-tidy 2021-03-26 10:06:10 +01:00
61 changed files with 1069 additions and 523 deletions

View File

@@ -1,3 +1,3 @@
---
Checks: '*,-google-*,-fuchsia-*,-cert-*,-llvm-header-guard,-readability-named-parameter,-misc-non-private-member-variables-in-classes,-*-magic-numbers,-llvm-include-order,-hicpp-no-array-decay,-performance-unnecessary-value-param,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-modernize-use-trailing-return-type,-readability-redundant-member-init'
Checks: 'cppcoreguidelines-*,misc-unused-alias-decls,misc-unused-parameters,modernize-deprecated-headers,modernize-raw-string-literal,modernize-redundant-void-arg,modernize-use-bool-literals,modernize-use-default-member-init,modernize-use-emplace,modernize-use-equals-default,modernize-use-equals-delete,modernize-use-noexcept,modernize-use-nullptr,modernize-use-override,modernize-use-using,performance-faster-string-find,performance-for-range-copy,performance-unnecessary-copy-initialization,readability-avoid-const-params-in-decls,readability-braces-around-statements,readability-container-size-empty,readability-delete-null-pointer,readability-redundant-member-init,readability-redundant-string-init,readability-static-accessed-through-instance,readability-string-compare'
HeaderFilterRegex: '/(fairmq/)'

View File

@@ -44,13 +44,19 @@ list(APPEND options
"-DDISABLE_COLOR=ON"
"-DBUILD_SDK_COMMANDS=ON"
"-DBUILD_SDK=ON"
"-DBUILD_DDS_PLUGIN=ON"
)
"-DBUILD_DDS_PLUGIN=ON")
if(RUN_STATIC_ANALYSIS)
list(APPEND options "-DRUN_STATIC_ANALYSIS=ON")
endif()
list(JOIN options ";" optionsstr)
ctest_configure(OPTIONS "${optionsstr}")
ctest_submit()
ctest_build(FLAGS "-j${NCPUS}")
ctest_submit()
ctest_test(BUILD "${CTEST_BINARY_DIRECTORY}"
PARALLEL_LEVEL 1
SCHEDULE_RANDOM ON

36
Jenkinsfile vendored
View File

@@ -1,6 +1,6 @@
#!groovy
def jobMatrix(String prefix, String type, List specs) {
def jobMatrix(String type, List specs) {
def nodes = [:]
for (spec in specs) {
job = "${spec.os}-${spec.ver}-${spec.arch}-${spec.compiler}"
@@ -12,7 +12,7 @@ def jobMatrix(String prefix, String type, List specs) {
nodes[label] = {
node(selector) {
githubNotify(context: "${prefix}/${label}", description: 'Building ...', status: 'PENDING')
githubNotify(context: "${label}", description: 'Building ...', status: 'PENDING')
try {
deleteDir()
checkout scm
@@ -29,7 +29,11 @@ def jobMatrix(String prefix, String type, List specs) {
sh "cat ${jobscript}"
sh "bash ${jobscript}"
} else {
def containercmd = "singularity exec -B/shared ${env.SINGULARITY_CONTAINER_ROOT}/fairmq/${os}.${ver}.sif bash -l -c \\\"${ctestcmd}\\\""
def static_analysis = "OFF"
if (selector =~ /^fedora/) {
static_analysis = "ON"
}
def containercmd = "singularity exec -B/shared ${env.SINGULARITY_CONTAINER_ROOT}/fairmq/${os}.${ver}.sif bash -l -c \\\"${ctestcmd} -DRUN_STATIC_ANALYSIS=${static_analysis}\\\""
sh """\
echo \"echo \\\"*** Job started at .......: \\\$(date -R)\\\"\" >> ${jobscript}
echo \"echo \\\"*** Job ID ...............: \\\${SLURM_JOB_ID}\\\"\" >> ${jobscript}
@@ -40,13 +44,26 @@ def jobMatrix(String prefix, String type, List specs) {
"""
sh "cat ${jobscript}"
sh "test/ci/slurm-submit.sh \"FairMQ \${JOB_BASE_NAME} ${label}\" ${jobscript}"
withChecks('Static Analysis') {
if (static_analysis == "ON") {
recordIssues(enabledForFailure: true,
tools: [gcc(pattern: 'build/Testing/Temporary/*.log')],
filters: [excludeFile('extern/*'), excludeFile('usr/*')],
skipBlames: true)
}
}
}
deleteDir()
githubNotify(context: "${prefix}/${label}", description: 'Success', status: 'SUCCESS')
githubNotify(context: "${label}", description: 'Success', status: 'SUCCESS')
} catch (e) {
def tarball = "${prefix}_${label}_dds_logs.tar.gz"
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test .DDS/"
archiveArtifacts tarball
deleteDir()
githubNotify(context: "${prefix}/${label}", description: 'Error', status: 'ERROR')
githubNotify(context: "${label}", description: 'Error', status: 'ERROR')
throw e
}
}
@@ -58,12 +75,13 @@ def jobMatrix(String prefix, String type, List specs) {
pipeline{
agent none
stages {
stage("Run CI Matrix") {
stage("CI") {
steps{
script {
def builds = jobMatrix('alfa-ci', 'build', [
[os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10'],
[os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12'],
def builds = jobMatrix('build', [
[os: 'alice-centos', ver: '7', arch: 'x86_64', compiler: 'gcc-7'],
[os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10'],
[os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12'],
])
parallel(builds)

View File

@@ -1,5 +1,5 @@
<!-- {#mainpage} -->
# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![build status](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/master)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![test coverage master branch](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master/graph/badge.svg)](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master) [![Coverity Badge](https://alfa-ci.gsi.de/shields/coverity/scan/fairrootgroup-fairmq.svg)](https://scan.coverity.com/projects/fairrootgroup-fairmq) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/6b648d95d68d4c4eae833b84f84d299c)](https://www.codacy.com/app/dennisklein/FairMQ?utm_source=github.com&amp;utm_medium=referral&amp;utm_content=FairRootGroup/FairMQ&amp;utm_campaign=Badge_Grade)
# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![build status](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/dev)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![Coverity Badge](https://alfa-ci.gsi.de/shields/coverity/scan/fairrootgroup-fairmq.svg)](https://scan.coverity.com/projects/fairrootgroup-fairmq)
C++ Message Queuing Library and Framework

View File

@@ -203,6 +203,13 @@ macro(set_fairmq_defaults)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ${CCACHE})
endif()
endif()
# FAIRMQ_HAS_STD_FILESYSTEM is 1 by default and 0 only for GNU compilers
# older than version 9. The value is consumed as a 0/1 flag (compile
# definition and generator expression), so keep it numeric.
set(FAIRMQ_HAS_STD_FILESYSTEM 1)
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 9)
  set(FAIRMQ_HAS_STD_FILESYSTEM 0)
endif()
endmacro()
function(join VALUES GLUE OUTPUT)

View File

@@ -5,12 +5,6 @@
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sampler.h"
@@ -37,17 +31,17 @@ void Sampler::InitTask()
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(info) << "Region event: " << info.event
<< ", managed: " << info.managed
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id
<< ", ptr: " << info.ptr
<< ", size: " << info.size
<< ", flags: " << info.flags;
});
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
0,
10000000,
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
0, // ... and this sub-channel
10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
lock_guard<mutex> lock(fMtx);
fNumUnackedMsgs -= blocks.size();
@@ -55,7 +49,10 @@ void Sampler::InitTask()
if (fMaxIterations > 0) {
LOG(info) << "Received " << blocks.size() << " acks";
}
}
},
"", // path, if a region is backed by a file
0, // flags that are passed for region creation
fair::mq::RegionConfig{true, true} // additional config: { call mlock on the region, zero the region memory }
));
fRegion->SetLinger(fLinger);
}
@@ -75,20 +72,19 @@ bool Sampler::ConditionalRun()
// std::this_thread::sleep_for(std::chrono::seconds(1));
lock_guard<mutex> lock(fMtx);
++fNumUnackedMsgs;
if (Send(msg, "data", 0) > 0) {
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
}
++fNumUnackedMsgs;
return true;
}
void Sampler::ResetTask()
{
// On destruction UnmanagedRegion will try to TODO
fRegion.reset();
{
lock_guard<mutex> lock(fMtx);

View File

@@ -30,8 +30,8 @@ void Sink::InitTask()
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(info) << "Region event: " << info.event
<< ", managed: " << info.managed
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id
<< ", ptr: " << info.ptr
<< ", size: " << info.size

View File

@@ -2,9 +2,14 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="shmem"
msgSize="1000000"
if [[ $1 =~ ^[0-9]+$ ]]; then
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
# Optional second argument: numeric message size, later passed to the
# sampler via --msg-size. The value must be taken from the second positional
# argument (the one being validated), not from the first, which holds the
# transport name.
if [[ $2 =~ ^[0-9]+$ ]]; then
    msgSize=$2
fi
@@ -13,13 +18,13 @@ SAMPLER+=" --id sampler1"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10"
SAMPLER+=" --transport shmem"
SAMPLER+=" --transport $transport"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
SINK="fairmq-ex-region-sink"
SINK+=" --id sink1"
SINK+=" --severity debug"
SINK+=" --transport shmem"
SINK+=" --transport $transport"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK &
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@@ -31,6 +31,7 @@ SAMPLER_PID=$!
SINK="fairmq-ex-region-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --severity debug"
SINK+=" --session $SESSION"
SINK+=" --verbosity veryhigh"
SINK+=" --control static --color false"

View File

@@ -1,5 +1,5 @@
################################################################################
# Copyright (C) 2012-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# Copyright (C) 2012-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
@@ -62,6 +62,7 @@ if(BUILD_FAIRMQ OR BUILD_SDK)
target_link_libraries(${target}
PRIVATE
FairLogger::FairLogger
Threads::Threads
PUBLIC
Boost::boost
)
@@ -145,6 +146,7 @@ if(BUILD_FAIRMQ)
# libFairMQ header files #
##########################
set(FAIRMQ_PUBLIC_HEADER_FILES
Device.h
DeviceRunner.h
EventManager.h
FairMQChannel.h
@@ -169,14 +171,21 @@ if(BUILD_FAIRMQ)
Plugin.h
PluginManager.h
PluginServices.h
runDevice.h
runFairMQDevice.h
shmem/Monitor.h
)
set(FAIRMQ_PRIVATE_HEADER_FILES
devices/BenchmarkSampler.h
devices/Merger.h
devices/Multiplier.h
devices/Proxy.h
devices/Sink.h
devices/Splitter.h
plugins/Builtin.h
plugins/config/Config.h
plugins/Control.h
plugins/control/Control.h
shmem/Message.h
shmem/Poller.h
shmem/UnmanagedRegion.h
@@ -223,7 +232,7 @@ if(BUILD_FAIRMQ)
Properties.cxx
SuboptParser.cxx
plugins/config/Config.cxx
plugins/Control.cxx
plugins/control/Control.cxx
MemoryResources.cxx
shmem/Monitor.cxx
)
@@ -243,7 +252,7 @@ if(BUILD_FAIRMQ)
# configure files #
###################
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/run/startMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startMQBenchmark.sh)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/devices/startMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startMQBenchmark.sh)
#################################
# define libFairMQ build target #
@@ -274,6 +283,7 @@ if(BUILD_FAIRMQ)
if(BUILD_OFI_TRANSPORT)
target_compile_definitions(${_target} PRIVATE BUILD_OFI_TRANSPORT)
endif()
target_compile_definitions(${_target} PUBLIC FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM})
#######################
@@ -347,22 +357,22 @@ if(BUILD_FAIRMQ)
###############
# executables #
###############
add_executable(fairmq-bsampler run/runBenchmarkSampler.cxx)
add_executable(fairmq-bsampler devices/runBenchmarkSampler.cxx)
target_link_libraries(fairmq-bsampler FairMQ)
add_executable(fairmq-merger run/runMerger.cxx)
add_executable(fairmq-merger devices/runMerger.cxx)
target_link_libraries(fairmq-merger FairMQ)
add_executable(fairmq-multiplier run/runMultiplier.cxx)
add_executable(fairmq-multiplier devices/runMultiplier.cxx)
target_link_libraries(fairmq-multiplier FairMQ)
add_executable(fairmq-proxy run/runProxy.cxx)
add_executable(fairmq-proxy devices/runProxy.cxx)
target_link_libraries(fairmq-proxy FairMQ)
add_executable(fairmq-sink run/runSink.cxx)
add_executable(fairmq-sink devices/runSink.cxx)
target_link_libraries(fairmq-sink FairMQ)
add_executable(fairmq-splitter run/runSplitter.cxx)
add_executable(fairmq-splitter devices/runSplitter.cxx)
target_link_libraries(fairmq-splitter FairMQ)
add_executable(fairmq-shmmonitor shmem/Monitor.cxx shmem/Monitor.h shmem/runMonitor.cxx)
@@ -375,6 +385,7 @@ if(BUILD_FAIRMQ)
$<$<PLATFORM_ID:Linux>:rt>
Boost::boost
Boost::date_time
$<$<NOT:${FAIRMQ_HAS_STD_FILESYSTEM}>:Boost::filesystem>
Boost::program_options
FairLogger::FairLogger
PicoSHA2
@@ -382,9 +393,13 @@ if(BUILD_FAIRMQ)
target_include_directories(fairmq-shmmonitor PUBLIC
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}>
)
target_compile_definitions(fairmq-shmmonitor PUBLIC FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM})
add_executable(fairmq-uuid-gen run/runUuidGenerator.cxx)
target_link_libraries(fairmq-uuid-gen FairMQ)
add_executable(fairmq-uuid-gen tools/runUuidGenerator.cxx)
target_link_libraries(fairmq-uuid-gen PUBLIC
Boost::program_options
Tools
)
###########

21
fairmq/Device.h Normal file
View File

@@ -0,0 +1,21 @@
/********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_DEVICE_H
#define FAIR_MQ_DEVICE_H
#include <FairMQDevice.h>
namespace fair::mq
{
/// Alias that makes the global ::FairMQDevice class available as fair::mq::Device.
using Device = ::FairMQDevice;
} // namespace fair::mq
#endif /* FAIR_MQ_DEVICE_H */

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -9,10 +9,10 @@
#ifndef FAIR_MQ_DEVICERUNNER_H
#define FAIR_MQ_DEVICERUNNER_H
#include <fairmq/Device.h>
#include <fairmq/EventManager.h>
#include <fairmq/PluginManager.h>
#include <fairmq/ProgOptions.h>
#include <FairMQDevice.h>
#include <functional>
#include <memory>
@@ -73,7 +73,7 @@ class DeviceRunner
std::vector<std::string> fRawCmdLineArgs;
fair::mq::ProgOptions fConfig;
std::unique_ptr<FairMQDevice> fDevice;
std::unique_ptr<Device> fDevice;
PluginManager fPluginManager;
const bool fPrintLogo;

View File

@@ -9,17 +9,17 @@
#ifndef FAIRMQDEVICE_H_
#define FAIRMQDEVICE_H_
#include <StateMachine.h>
#include <FairMQTransportFactory.h>
#include <fairmq/Transports.h>
#include <fairmq/StateQueue.h>
#include <FairMQChannel.h>
#include <FairMQLogger.h>
#include <FairMQMessage.h>
#include <FairMQParts.h>
#include <FairMQTransportFactory.h>
#include <FairMQUnmanagedRegion.h>
#include <FairMQLogger.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/StateMachine.h>
#include <fairmq/StateQueue.h>
#include <fairmq/Transports.h>
#include <fairmq/tools/Version.h>
#include <vector>
#include <memory> // unique_ptr
@@ -34,8 +34,6 @@
#include <cstddef>
#include <utility> // pair
#include <fairmq/tools/Version.h>
using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
using InputMsgCallback = std::function<bool(FairMQMessagePtr&, int)>;

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -9,8 +9,9 @@
#ifndef FAIRMQPOLLER_H_
#define FAIRMQPOLLER_H_
#include <string>
#include <memory>
#include <stdexcept>
#include <string>
class FairMQPoller
{

View File

@@ -56,7 +56,7 @@ class FairMQSocket
/// If the backend supports it, fills the unsigned integer @a events with the ZMQ_EVENTS value
/// DISCLAIMER: this API is experimental and unsupported and might be dropped / refactored in
/// the future.
virtual void Events(uint32_t* events) = 0;
virtual int Events(uint32_t* events) = 0;
virtual void SetLinger(const int value) = 0;
virtual int GetLinger() const = 0;
virtual void SetSndBufSize(const int value) = 0;

View File

@@ -92,8 +92,8 @@ class FairMQTransportFactory
/// @param path optional parameter to pass to the underlying transport
/// @param flags optional parameter to pass to the underlying transport
/// @return pointer to UnmanagedRegion
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
/// @brief Create new UnmanagedRegion
/// @param size size of the region
/// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user
@@ -101,8 +101,8 @@ class FairMQTransportFactory
/// @param path optional parameter to pass to the underlying transport
/// @param flags optional parameter to pass to the underlying transport
/// @return pointer to UnmanagedRegion
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
/// @brief Subscribe to region events (creation, destruction, ...)
/// @param callback the callback that is called when a region event occurs

View File

@@ -79,6 +79,7 @@ class FairMQUnmanagedRegion
virtual void SetLinger(uint32_t linger) = 0;
virtual uint32_t GetLinger() const = 0;
virtual fair::mq::Transport GetType() const = 0;
FairMQTransportFactory* GetTransport() { return fTransport; }
void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; }
@@ -107,6 +108,19 @@ inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event
namespace fair::mq
{
/// Additional configuration passed when creating an unmanaged region.
/// Both options are disabled unless explicitly requested.
struct RegionConfig
{
    bool lock = false; ///< request mlock on the region memory
    bool zero = false; ///< request zeroing of the region memory

    /// Creates a configuration with both options switched off.
    RegionConfig() = default;

    /// @param l value for the lock option
    /// @param z value for the zero option
    RegionConfig(bool l, bool z)
        : lock(l)
        , zero(z)
    {}
};
using RegionCallback = FairMQRegionCallback;
using RegionBulkCallback = FairMQRegionBulkCallback;
using RegionEventCallback = FairMQRegionEventCallback;

View File

@@ -15,7 +15,7 @@
#ifndef FAIR_MQ_MEMORY_RESOURCES_H
#define FAIR_MQ_MEMORY_RESOURCES_H
#include <fairmq/FairMQMessage.h>
#include <FairMQMessage.h>
class FairMQTransportFactory;
#include <boost/container/container_fwd.hpp>

View File

@@ -200,7 +200,19 @@ auto fair::mq::PluginManager::LoadPluginStatic(const string& pluginName) -> void
// Load symbol
if (fPluginFactories.find(pluginName) == fPluginFactories.end()) {
try {
LoadSymbols(pluginName, dll::program_location());
if ("control" == pluginName) {
try {
fPluginProgOptions.insert({pluginName, plugins::ControlPluginProgramOptions().value()});
}
catch (const boost::bad_optional_access& e) { /* just ignore, if no prog options are declared */ }
} else if ("config" == pluginName) {
try {
fPluginProgOptions.insert({pluginName, plugins::ConfigPluginProgramOptions().value()});
}
catch (const boost::bad_optional_access& e) { /* just ignore, if no prog options are declared */ }
} else {
LoadSymbols(pluginName, dll::program_location());
}
fPluginOrder.push_back(pluginName);
} catch (boost::system::system_error& e) {
throw PluginLoadError(ToString("An error occurred while loading static plugin ", pluginName, ": ", e.what()));
@@ -211,7 +223,13 @@ auto fair::mq::PluginManager::LoadPluginStatic(const string& pluginName) -> void
auto fair::mq::PluginManager::InstantiatePlugin(const string& pluginName) -> void
{
if (fPlugins.find(pluginName) == fPlugins.end()) {
fPlugins[pluginName] = fPluginFactories[pluginName](*fPluginServices);
if ("control" == pluginName) {
fPlugins[pluginName] = plugins::Make_control_Plugin(fPluginServices.get());
} else if ("config" == pluginName) {
fPlugins[pluginName] = plugins::Make_config_Plugin(fPluginServices.get());
} else {
fPlugins[pluginName] = fPluginFactories[pluginName](*fPluginServices);
}
}
}

View File

@@ -12,6 +12,7 @@
#include <fairmq/tools/Strings.h>
#include <memory>
#include <ostream>
#include <stdexcept>
#include <string>
#include <unordered_map>
@@ -60,6 +61,11 @@ try {
throw TransportError(tools::ToString("Unknown transport provided: ", transport));
}
/// Streams a transport by writing the result of TransportName(transport).
/// @param os output stream to write to
/// @param transport transport value to print
/// @return the stream that was written to, to allow chaining
inline std::ostream& operator<<(std::ostream& os, const Transport& transport)
{
    os << TransportName(transport);
    return os;
}
} // namespace fair::mq
#endif /* FAIR_MQ_TRANSPORTS_H */

View File

@@ -1,32 +1,35 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQBENCHMARKSAMPLER_H_
#define FAIRMQBENCHMARKSAMPLER_H_
#ifndef FAIR_MQ_BENCHMARKSAMPLER_H
#define FAIR_MQ_BENCHMARKSAMPLER_H
#include "../FairMQLogger.h"
#include "FairMQDevice.h"
#include "tools/RateLimit.h"
#include <fairmq/Device.h>
#include <fairmq/tools/RateLimit.h>
#include <chrono>
#include <cstddef> // size_t
#include <cstdint> // uint64_t
#include <cstring> // memset
#include <fairlogger/Logger.h>
#include <string>
namespace fair::mq
{
/**
* Sampler to generate traffic for benchmarking.
*/
class FairMQBenchmarkSampler : public FairMQDevice
class BenchmarkSampler : public Device
{
public:
FairMQBenchmarkSampler()
BenchmarkSampler()
: fMultipart(false)
, fMemSet(false)
, fNumParts(1)
@@ -117,4 +120,6 @@ class FairMQBenchmarkSampler : public FairMQDevice
std::string fOutChannelName;
};
#endif /* FAIRMQBENCHMARKSAMPLER_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_BENCHMARKSAMPLER_H */

View File

@@ -1,36 +1,32 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQMerger.h
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQMERGER_H_
#define FAIRMQMERGER_H_
#ifndef FAIR_MQ_MERGER_H
#define FAIR_MQ_MERGER_H
#include "FairMQDevice.h"
#include "../FairMQPoller.h"
#include "../FairMQLogger.h"
#include <FairMQPoller.h>
#include <fairmq/Device.h>
#include <fairlogger/Logger.h>
#include <string>
#include <vector>
class FairMQMerger : public FairMQDevice
namespace fair::mq
{
class Merger : public Device
{
public:
FairMQMerger()
Merger()
: fMultipart(true)
, fInChannelName("data-in")
, fOutChannelName("data-out")
{}
~FairMQMerger() {}
protected:
bool fMultipart;
@@ -112,4 +108,6 @@ class FairMQMerger : public FairMQDevice
}
};
#endif /* FAIRMQMERGER_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_MERGER_H */

View File

@@ -1,29 +1,31 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIRMQMULTIPLIER_H_
#define FAIRMQMULTIPLIER_H_
#ifndef FAIR_MQ_MULTIPLIER_H
#define FAIR_MQ_MULTIPLIER_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <string>
#include <vector>
class FairMQMultiplier : public FairMQDevice
namespace fair::mq
{
class Multiplier : public Device
{
public:
FairMQMultiplier()
Multiplier()
: fMultipart(true)
, fNumOutputs(0)
, fInChannelName()
, fOutChannelNames()
{}
~FairMQMultiplier() {}
protected:
bool fMultipart;
@@ -39,9 +41,9 @@ class FairMQMultiplier : public FairMQDevice
fNumOutputs = fChannels.at(fOutChannelNames.at(0)).size();
if (fMultipart) {
OnData(fInChannelName, &FairMQMultiplier::HandleMultipartData);
OnData(fInChannelName, &Multiplier::HandleMultipartData);
} else {
OnData(fInChannelName, &FairMQMultiplier::HandleSingleData);
OnData(fInChannelName, &Multiplier::HandleSingleData);
}
}
@@ -107,4 +109,6 @@ class FairMQMultiplier : public FairMQDevice
}
};
#endif /* FAIRMQMULTIPLIER_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_MULTIPLIER_H */

View File

@@ -1,33 +1,29 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQProxy.h
*
* @since 2013-10-02
* @author A. Rybalchenko
*/
#ifndef FAIRMQPROXY_H_
#define FAIRMQPROXY_H_
#ifndef FAIR_MQ_PROXY_H
#define FAIR_MQ_PROXY_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <string>
class FairMQProxy : public FairMQDevice
namespace fair::mq
{
class Proxy : public Device
{
public:
FairMQProxy()
Proxy()
: fMultipart(true)
, fInChannelName()
, fOutChannelName()
{}
~FairMQProxy() {}
protected:
bool fMultipart;
@@ -73,4 +69,6 @@ class FairMQProxy : public FairMQDevice
}
};
#endif /* FAIRMQPROXY_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_PROXY_H */

View File

@@ -2,9 +2,9 @@
With FairMQ several generic devices are provided:
- **FairMQBenchmarkSampler**: generates random data of configurable size and at configurable rate and sends it out on an output channel.
- **FairMQSink**: receives messages on the input channel and simply discards them.
- **FairMQMerger**: receives data from multiple input channels and forwards it to a single output channel.
- **FairMQSplitter**: receives messages on a single input channels and round-robins them among multiple output channels (which can have different socket types).
- **FairMQMultiplier**: receives data from a single input channel and multiplies (copies) it to two or more output channels.
- **FairMQProxy**: connects input channel to output channel, where both can have different socket types and multiple peers.
- **BenchmarkSampler**: generates random data of configurable size and at configurable rate and sends it out on an output channel.
- **Sink**: receives messages on the input channel and simply discards them.
- **Merger**: receives data from multiple input channels and forwards it to a single output channel.
- **Splitter**: receives messages on a single input channel and round-robins them among multiple output channels (which can have different socket types).
- **Multiplier**: receives data from a single input channel and multiplies (copies) it to two or more output channels.
- **Proxy**: connects input channel to output channel, where both can have different socket types and multiple peers.

View File

@@ -1,33 +1,31 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQSink.h
*
* @since 2013-01-09
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQSINK_H_
#define FAIRMQSINK_H_
#ifndef FAIR_MQ_SINK_H
#define FAIR_MQ_SINK_H
#include "../FairMQDevice.h"
#include "../FairMQLogger.h"
#include <FairMQPoller.h>
#include <fairmq/Device.h>
#include <fairmq/tools/Strings.h>
#include <chrono>
#include <string>
#include <fairlogger/Logger.h>
#include <fstream>
#include <string>
#include <stdexcept>
class FairMQSink : public FairMQDevice
namespace fair::mq
{
class Sink : public Device
{
public:
FairMQSink()
Sink()
: fMultipart(false)
, fMaxIterations(0)
, fNumIterations(0)
@@ -37,8 +35,6 @@ class FairMQSink : public FairMQDevice
, fOutFilename()
{}
~FairMQSink() {}
protected:
bool fMultipart;
uint64_t fMaxIterations;
@@ -145,4 +141,6 @@ class FairMQSink : public FairMQDevice
}
};
#endif /* FAIRMQSINK_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_SINK_H */

View File

@@ -1,35 +1,31 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQSplitter.h
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQSPLITTER_H_
#define FAIRMQSPLITTER_H_
#ifndef FAIR_MQ_SPLITTER_H
#define FAIR_MQ_SPLITTER_H
#include "FairMQDevice.h"
#include <fairmq/Device.h>
#include <string>
class FairMQSplitter : public FairMQDevice
namespace fair::mq
{
class Splitter : public Device
{
public:
FairMQSplitter()
Splitter()
: fMultipart(true)
, fNumOutputs(0)
, fDirection(0)
, fInChannelName()
, fOutChannelName()
{}
~FairMQSplitter() {}
protected:
bool fMultipart;
@@ -47,9 +43,9 @@ class FairMQSplitter : public FairMQDevice
fDirection = 0;
if (fMultipart) {
OnData(fInChannelName, &FairMQSplitter::HandleData<FairMQParts>);
OnData(fInChannelName, &Splitter::HandleData<FairMQParts>);
} else {
OnData(fInChannelName, &FairMQSplitter::HandleData<FairMQMessagePtr>);
OnData(fInChannelName, &Splitter::HandleData<FairMQMessagePtr>);
}
}
@@ -66,4 +62,6 @@ class FairMQSplitter : public FairMQDevice
}
};
#endif /* FAIRMQSPLITTER_H_ */
} // namespace fair::mq
#endif /* FAIR_MQ_SPLITTER_H */

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQBenchmarkSampler.h>
#include <fairmq/devices/BenchmarkSampler.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -24,7 +24,7 @@ void addCustomOptions(bpo::options_description& options)
("msg-rate", bpo::value<float>()->default_value(0), "Msg rate limit in maximum number of messages per second");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /* config */)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /* config */)
{
return new FairMQBenchmarkSampler();
return std::make_unique<fair::mq::BenchmarkSampler>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQMerger.h>
#include <fairmq/devices/Merger.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQMerger();
return std::make_unique<fair::mq::Merger>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQMultiplier.h>
#include <fairmq/devices/Multiplier.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQMultiplier();
return std::make_unique<fair::mq::Multiplier>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQProxy.h>
#include <fairmq/devices/Proxy.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQProxy();
return std::make_unique<fair::mq::Proxy>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQSink.h>
#include <fairmq/devices/Sink.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -21,7 +21,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQSink();
return std::make_unique<fair::mq::Sink>();
}

View File

@@ -1,13 +1,13 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <runFairMQDevice.h>
#include <devices/FairMQSplitter.h>
#include <fairmq/devices/Splitter.h>
#include <fairmq/runDevice.h>
namespace bpo = boost::program_options;
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
{
return new FairMQSplitter();
return std::make_unique<fair::mq::Splitter>();
}

View File

@@ -41,7 +41,7 @@ class Socket final : public fair::mq::Socket
auto GetId() const -> std::string override { return fId; }
auto Events(uint32_t *events) -> void override { *events = 0; }
// Event querying is a stub for this transport: it always reports an empty event mask.
// The function is declared to return int, so it must return a value on every path
// (flowing off the end of a non-void function is undefined behaviour).
// NOTE(review): 0 is presumably the "success" value of the Events() contract; return -1
// instead if callers should treat event querying as unsupported here - confirm.
auto Events(uint32_t *events) -> int override { *events = 0; return 0; }
auto Bind(const std::string& address) -> bool override;
auto Connect(const std::string& address) -> bool override;

View File

@@ -92,22 +92,22 @@ auto TransportFactory::CreatePoller(const unordered_map<string, vector<FairMQCha
// return PollerPtr{new Poller(channelsMap, channelList)};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -9,4 +9,4 @@
// List of all builtin plugin headers (the ones which call REGISTER_FAIRMQ_PLUGIN macro)
#include <fairmq/plugins/config/Config.h>
#include <fairmq/plugins/Control.h>
#include <fairmq/plugins/control/Control.h>

59
fairmq/runDevice.h Normal file
View File

@@ -0,0 +1,59 @@
/********************************************************************************
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/DeviceRunner.h>
#include <boost/program_options.hpp>
#include <memory>
// to be implemented by the user to return an instance of a class derived from fair::mq::Device
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& config);
// to be implemented by the user to add custom command line options (or just with empty body)
void addCustomOptions(boost::program_options::options_description&);
// Generic entry point shared by device executables that include this header.
//
// Creates a DeviceRunner from the command line and registers two hooks:
//   - SetCustomCmdLineOptions: collects the options from the user-provided
//     addCustomOptions() and adds them to the runner's configuration;
//   - InstantiateDevice: stores the device returned by the user-provided
//     getDevice() in the runner.
// Then runs the device.
//
// Returns the result of DeviceRunner::Run(), or 1 if an exception reaches this level
// (the exception is logged, not rethrown).
int main(int argc, char* argv[])
{
    using namespace fair::mq;
    using namespace fair::mq::hooks;

    try {
        DeviceRunner runner(argc, argv);

        // runner.AddHook<LoadPlugins>([](DeviceRunner& r){
        //     // for example:
        //     r.fPluginManager->SetSearchPaths({"/lib", "/lib/plugins"});
        //     r.fPluginManager->LoadPlugin("asdf");
        // });

        runner.AddHook<SetCustomCmdLineOptions>([](DeviceRunner& r){
            boost::program_options::options_description customOptions("Custom options");
            addCustomOptions(customOptions);
            r.fConfig.AddToCmdLineOptions(customOptions);
        });

        // runner.AddHook<ModifyRawCmdLineArgs>([](DeviceRunner& r){
        //     // for example:
        //     r.fRawCmdLineArgs.push_back("--blubb");
        // });

        runner.AddHook<InstantiateDevice>([](DeviceRunner& r){
            r.fDevice = getDevice(r.fConfig);
        });

        return runner.Run();

        // Run with builtin catch all exception handler, just:
        // return runner.RunWithExceptionHandlers();
    } catch (const std::exception& e) {
        // caught by const reference: the handler only reads what()
        LOG(error) << "Uncaught exception reached the top of main: " << e.what();
        return 1;
    } catch (...) {
        LOG(error) << "Uncaught exception reached the top of main.";
        return 1;
    }
}

View File

@@ -65,7 +65,10 @@ struct DDSEnvironment::Impl
" mkdir -p \"$HOME/.DDS\"\n"
" dds-user-defaults --ignore-default-sid -d -c \"$HOME/.DDS/DDS.cfg\"\n"
"fi\n";
std::system(cmd.str().c_str());
auto rc(std::system(cmd.str().c_str()));
if (rc != 0) {
LOG(warn) << "DDSEnvironment::SetupConfigHome failed";
}
}
auto SetupPath() -> void

View File

@@ -91,6 +91,17 @@ struct SegmentInfo
AllocationAlgorithm fAllocationAlgorithm;
};
// Describes a session: its name and the id of whoever created it.
// The Manager constructor looks this record up in the management segment and,
// if it is missing, constructs it there with the session name and geteuid().
struct SessionInfo
{
// sessionName: session name; copied into fSessionName using the given allocator
// creatorId:   id of the creator (the Manager passes geteuid())
// alloc:       allocator used for the string storage
//              (presumably a shared-memory allocator - confirm against the VoidAlloc definition)
SessionInfo(const char* sessionName, int creatorId, const VoidAlloc& alloc)
: fSessionName(sessionName, alloc)
, fCreatorId(creatorId)
{}
Str fSessionName; // name of the session
int fCreatorId; // id passed in by the creator of this record
};
using Uint16SegmentInfoPairAlloc = boost::interprocess::allocator<std::pair<const uint16_t, SegmentInfo>, SegmentManager>;
using Uint16SegmentInfoHashMap = boost::unordered_map<uint16_t, SegmentInfo, boost::hash<uint16_t>, std::equal_to<uint16_t>, Uint16SegmentInfoPairAlloc>;
// using Uint16SegmentInfoMap = boost::interprocess::map<uint16_t, SegmentInfo, std::less<uint16_t>, Uint16SegmentInfoPairAlloc>;
@@ -195,9 +206,9 @@ struct RegionBlock
// find id for unique shmem name:
// a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)).
inline std::string makeShmIdStr(const std::string& sessionId)
inline std::string makeShmIdStr(const std::string& sessionId, const std::string& userId)
{
std::string seed((std::to_string(geteuid()) + sessionId));
std::string seed(userId + sessionId);
// generate a 8-digit hex value out of sha256 hash
std::vector<unsigned char> hash(4);
picosha2::hash256(seed.begin(), seed.end(), hash.begin(), hash.end());
@@ -205,6 +216,11 @@ inline std::string makeShmIdStr(const std::string& sessionId)
return picosha2::bytes_to_hex_string(hash.begin(), hash.end());
}
// Convenience overload: derives the shm id for the given session using the
// effective user id of the calling process as the user part of the seed.
inline std::string makeShmIdStr(const std::string& sessionId)
{
    const std::string effectiveUserId = std::to_string(geteuid());
    return makeShmIdStr(sessionId, effectiveUserId);
}
inline uint64_t makeShmIdUint64(const std::string& sessionId)
{
std::string shmId = makeShmIdStr(sessionId);

View File

@@ -44,8 +44,12 @@
#include <thread>
#include <unordered_map>
#include <utility> // pair
#include <tuple>
#include <vector>
#include <unistd.h> // getuid
#include <sys/types.h> // getuid
#include <sys/mman.h> // mlock
namespace fair::mq::shmem
@@ -54,15 +58,16 @@ namespace fair::mq::shmem
class Manager
{
public:
Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config)
: fShmId(std::move(shmId))
Manager(const std::string& sessionName, std::string deviceId, size_t size, const ProgOptions* config)
: fShmId64(makeShmIdUint64(sessionName))
, fShmId(makeShmIdStr(sessionName))
, fSegmentId(config ? config->GetProperty<uint16_t>("shm-segment-id", 0) : 0)
, fDeviceId(std::move(deviceId))
, fSegments()
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
, fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
, fRegionEventsShmCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
, fRegionEventsSubscriptionActive(false)
, fNumObservedEvents(0)
, fDeviceCounter(nullptr)
@@ -70,10 +75,11 @@ class Manager
, fShmSegments(nullptr)
, fShmRegions(nullptr)
, fInterrupted(false)
, fMsgCounter(0)
#ifdef FAIRMQ_DEBUG_MODE
, fMsgDebug(nullptr)
, fShmMsgCounters(nullptr)
, fMsgCounterNew(0)
, fMsgCounterDelete(0)
#endif
, fHeartbeatThread()
, fSendHeartbeats(true)
@@ -82,6 +88,8 @@ class Manager
{
using namespace boost::interprocess;
LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";
bool mlockSegment = false;
bool zeroSegment = false;
bool autolaunchMonitor = false;
@@ -96,17 +104,28 @@ class Manager
}
if (autolaunchMonitor) {
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
StartMonitor(fShmId);
}
fHeartbeatThread = std::thread(&Manager::SendHeartbeats, this);
{
std::stringstream ss;
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first;
SessionInfo* sessionInfo = fManagementSegment.find<SessionInfo>(unique_instance).first;
if (sessionInfo) {
LOG(debug) << "session info found, name: " << sessionInfo->fSessionName << ", creator id: " << sessionInfo->fCreatorId;
} else {
LOG(debug) << "no session info found, creating and initializing";
sessionInfo = fManagementSegment.construct<SessionInfo>(unique_instance)(sessionName.c_str(), geteuid(), fShmVoidAlloc);
LOG(debug) << "initialized session info, name: " << sessionInfo->fSessionName << ", creator id: " << sessionInfo->fCreatorId;
}
fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first;
if (fEventCounter) {
LOG(debug) << "event counter found: " << fEventCounter->fCount;
} else {
@@ -146,8 +165,8 @@ class Manager
ss << "Opened ";
}
ss << "shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "'."
<< " Size: " << boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId)) << " bytes."
<< " Available: " << boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(fSegmentId)) << " bytes."
<< " Size: " << boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)) << " bytes."
<< " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes."
<< " Allocation algorithm: " << allocationAlgorithm;
LOG(debug) << ss.str();
} catch(interprocess_exception& bie) {
@@ -157,21 +176,20 @@ class Manager
if (mlockSegment) {
LOG(debug) << "Locking the managed segment memory pages...";
if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId))) == -1) {
if (mlock(boost::apply_visitor(SegmentAddress(), fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId))) == -1) {
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked the managed segment memory pages.";
}
if (zeroSegment) {
LOG(debug) << "Zeroing the managed segment free memory...";
boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(fSegmentId));
boost::apply_visitor(SegmentMemoryZeroer(), fSegments.at(fSegmentId));
LOG(debug) << "Successfully zeroed the managed segment free memory.";
}
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;
if (fDeviceCounter) {
LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
(fDeviceCounter->fCount)++;
@@ -187,8 +205,6 @@ class Manager
fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc);
#endif
}
fHeartbeatThread = std::thread(&Manager::SendHeartbeats, this);
}
Manager() = delete;
@@ -214,8 +230,15 @@ class Manager
boost::filesystem::path p = boost::process::search_path("fairmq-shmmonitor", ownPath);
bool verbose = false;
if (const char* verboseEnv = getenv("FAIRMQ_SHMMONITOR_VERBOSE")) {
if (std::string(verboseEnv) == "true") {
verbose = true;
}
}
if (!p.empty()) {
boost::process::spawn(p, "-x", "--shmid", id, "-d", "-t", "2000", env);
boost::process::spawn(p, "-x", "-m", "--shmid", id, "-d", "-t", "2000", (verbose ? "--verbose" : ""), env);
int numTries = 0;
do {
try {
@@ -240,10 +263,13 @@ class Manager
void Resume() { fInterrupted.store(false); }
void Reset()
{
if (fMsgCounter.load() != 0) {
LOG(error) << "Message counter during Reset expected to be 0, found: " << fMsgCounter.load();
throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", fMsgCounter.load()));
#ifdef FAIRMQ_DEBUG_MODE
auto diff = fMsgCounterNew.load() - fMsgCounterDelete.load();
if (diff != 0) {
LOG(error) << "Message counter during Reset expected to be 0, found: " << diff;
throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", diff));
}
#endif
}
bool Interrupted() { return fInterrupted.load(); }
@@ -251,8 +277,9 @@ class Manager
const int64_t userFlags,
RegionCallback callback,
RegionBulkCallback bulkCallback,
const std::string& path = "",
int flags = 0)
const std::string& path,
int flags,
fair::mq::RegionConfig cfg)
{
using namespace boost::interprocess;
try {
@@ -282,22 +309,35 @@ class Manager
return {nullptr, id};
}
// create region info
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags));
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
if (cfg.lock) {
LOG(debug) << "Locking region " << id << "...";
if (mlock(r.first->second->fRegion.get_address(), r.first->second->fRegion.get_size()) == -1) {
LOG(error) << "Could not lock region " << id << ". Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked region " << id << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << id << "...";
memset(r.first->second->fRegion.get_address(), 0x00, r.first->second->fRegion.get_size());
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
}
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
r.first->second->InitializeQueues();
r.first->second->StartReceivingAcks();
result.first = &(r.first->second->fRegion);
result.second = id;
(fEventCounter->fCount)++;
}
fRegionEventsCV.notify_all();
fRegionsGen += 1; // signal TL cache invalidation
fRegionEventsShmCV.notify_all();
return result;
} catch (interprocess_exception& e) {
LOG(error) << "cannot create region. Already created/not cleaned up?";
LOG(error) << e.what();
@@ -307,8 +347,28 @@ class Manager
Region* GetRegion(const uint16_t id)
{
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
const auto &lTlCache = fTlRegionCache;
const auto &lTlCacheVec = lTlCache.fRegionsTLCache;
const auto lTlCacheGen = lTlCache.fRegionsTLCacheGen;
// fast path
for (const auto &lRegion : lTlCacheVec) {
if ((std::get<1>(lRegion) == id) && (lTlCacheGen == fRegionsGen) && (std::get<2>(lRegion) == fShmId64)) {
return std::get<0>(lRegion);
}
}
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
return GetRegionUnsafe(id);
// slow path: check invalidation
if (lTlCacheGen != fRegionsGen) {
fTlRegionCache.fRegionsTLCache.clear();
}
auto *lRegion = GetRegionUnsafe(id);
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
return lRegion;
}
Region* GetRegionUnsafe(const uint16_t id)
@@ -332,7 +392,7 @@ class Manager
LOG(error) << oor.what();
return nullptr;
} catch (boost::interprocess::interprocess_exception& e) {
LOG(warn) << "Could not get remote region for id '" << id << "'";
LOG(error) << "Could not get remote region for id '" << id << "': " << e.what();
return nullptr;
}
}
@@ -340,13 +400,19 @@ class Manager
void RemoveRegion(const uint16_t id)
{
fRegions.erase(id);
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fShmRegions->at(id).fDestroyed = true;
(fEventCounter->fCount)++;
try {
fRegions.at(id)->StopAcks();
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fShmRegions->at(id).fDestroyed = true;
fRegions.erase(id);
(fEventCounter->fCount)++;
}
fRegionEventsShmCV.notify_all();
} catch(std::out_of_range& oor) {
LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'";
}
fRegionEventsCV.notify_all();
fRegionsGen += 1; // signal TL cache invalidation
}
std::vector<fair::mq::RegionInfo> GetRegionInfo()
@@ -367,8 +433,12 @@ class Manager
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (!e.second.fDestroyed) {
auto region = GetRegionUnsafe(info.id);
info.ptr = region->fRegion.get_address();
info.size = region->fRegion.get_size();
if (region) {
info.ptr = region->fRegion.get_address();
info.size = region->fRegion.get_size();
} else {
throw std::runtime_error(tools::ToString("GetRegionInfoUnsafe() could not get region with id '", info.id, "'"));
}
} else {
info.ptr = nullptr;
info.size = 0;
@@ -384,8 +454,8 @@ class Manager
info.managed = true;
info.id = e.first;
info.event = RegionEvent::created;
info.ptr = boost::apply_visitor(SegmentAddress{}, fSegments.at(e.first));
info.size = boost::apply_visitor(SegmentSize{}, fSegments.at(e.first));
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(e.first));
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(e.first));
result.push_back(info);
} catch (const std::out_of_range& oor) {
LOG(error) << "could not find segment with id " << e.first;
@@ -403,7 +473,7 @@ class Manager
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsCV.notify_all();
fRegionEventsShmCV.notify_all();
fRegionEventThread.join();
}
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
@@ -420,7 +490,7 @@ class Manager
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsCV.notify_all();
fRegionEventsShmCV.notify_all();
fRegionEventThread.join();
lock.lock();
fRegionEventCallback = nullptr;
@@ -434,36 +504,49 @@ class Manager
auto infos = GetRegionInfoUnsafe();
for (const auto& i : infos) {
auto el = fObservedRegionEvents.find({i.id, i.managed});
if (el == fObservedRegionEvents.end()) {
fRegionEventCallback(i);
if (el == fObservedRegionEvents.end()) { // if event id has not been observed
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
++fNumObservedEvents;
} else {
// if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created'
// TODO: do we care to show 'created' events if we know region is already destroyed?
if (i.event == RegionEvent::created) {
fRegionEventCallback(i);
++fNumObservedEvents;
} else {
fNumObservedEvents += 2;
}
} else { // if event id has been observed (expected - there are two events per id - created & destroyed)
// fire a callback if we have observed 'created' event and incoming is 'destroyed'
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
fRegionEventCallback(i);
el->second = i.event;
++fNumObservedEvents;
} else {
// LOG(debug) << "ignoring event for id" << i.id << ":";
// LOG(debug) << "incoming event: " << i.event;
// LOG(debug) << "stored event: " << el->second;
// LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second;
}
}
}
fRegionEventsCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
fRegionEventsShmCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
}
}
void IncrementMsgCounter() { fMsgCounter.fetch_add(1, std::memory_order_relaxed); }
void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); }
// Records the creation of one message.
// Only has an effect when FAIRMQ_DEBUG_MODE is defined: it then atomically
// (relaxed ordering) adds 1 to fMsgCounterNew. Otherwise the body is empty.
void IncrementMsgCounter()
{
#ifdef FAIRMQ_DEBUG_MODE
fMsgCounterNew.fetch_add(1, std::memory_order_relaxed);
#endif
}
// Records the destruction of one message.
// Only has an effect when FAIRMQ_DEBUG_MODE is defined. Note that nothing is
// actually decremented: the call atomically (relaxed ordering) adds 1 to the
// separate fMsgCounterDelete counter, so creations and deletions are tracked
// in two distinct counters. Otherwise the body is empty.
void DecrementMsgCounter()
{
#ifdef FAIRMQ_DEBUG_MODE
fMsgCounterDelete.fetch_add(1, std::memory_order_relaxed);
#endif
}
// Per-segment message counters, available only in FAIRMQ_DEBUG_MODE builds.
// Both helpers index fShmMsgCounters by segment id with operator[] and then
// pre-increment / pre-decrement the entry's fCount.
// NOTE(review): no locking happens here; the visible caller in Deallocate()
// takes fShmMtx before calling DecrementShmMsgCounter - confirm other callers do the same.
#ifdef FAIRMQ_DEBUG_MODE
void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); }
void DecrementShmMsgCounter(uint16_t segmentId) { --((*fShmMsgCounters)[segmentId].fCount); }
#endif
boost::interprocess::named_mutex& GetMtx() { return fShmMtx; }
void SendHeartbeats()
{
std::string controlQueueName("fmq_" + fShmId + "_cq");
@@ -491,7 +574,7 @@ class Manager
auto it = fSegments.find(id);
if (it == fSegments.end()) {
try {
// get region info
// get segment info
SegmentInfo segmentInfo = fShmSegments->at(id);
LOG(debug) << "Located segment with id '" << id << "'";
@@ -512,11 +595,11 @@ class Manager
boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr, uint16_t segmentId) const
{
return boost::apply_visitor(SegmentHandleFromAddress{ptr}, fSegments.at(segmentId));
return boost::apply_visitor(SegmentHandleFromAddress(ptr), fSegments.at(segmentId));
}
void* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId) const
{
return boost::apply_visitor(SegmentAddressFromHandle{handle}, fSegments.at(segmentId));
return boost::apply_visitor(SegmentAddressFromHandle(handle), fSegments.at(segmentId));
}
char* Allocate(const size_t size, size_t alignment = 0)
@@ -529,7 +612,7 @@ class Manager
// boost::interprocess::managed_shared_memory::size_type actualSize = size;
// char* hint = 0; // unused for boost::interprocess::allocate_new
// ptr = fSegments.at(fSegmentId).allocation_command<char>(boost::interprocess::allocate_new, size, actualSize, hint);
size_t segmentSize = boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId));
size_t segmentSize = boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId));
if (size > segmentSize) {
throw MessageBadAlloc(tools::ToString("Requested message size (", size, ") exceeds segment size (", segmentSize, ")"));
}
@@ -541,7 +624,7 @@ class Manager
} catch (boost::interprocess::bad_alloc& ba) {
// LOG(warn) << "Shared memory full...";
if (ThrowingOnBadAlloc()) {
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(fSegmentId))));
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId))));
}
// rateLimiter.maybe_sleep();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
@@ -569,7 +652,7 @@ class Manager
void Deallocate(boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId)
{
boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle, segmentId)}, fSegments.at(segmentId));
boost::apply_visitor(SegmentDeallocate(GetAddressFromHandle(handle, segmentId)), fSegments.at(segmentId));
#ifdef FAIRMQ_DEBUG_MODE
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
DecrementShmMsgCounter(segmentId);
@@ -583,7 +666,7 @@ class Manager
char* ShrinkInPlace(size_t newSize, char* localPtr, uint16_t segmentId)
{
return boost::apply_visitor(SegmentBufferShrink{newSize, localPtr}, fSegments.at(segmentId));
return boost::apply_visitor(SegmentBufferShrink(newSize, localPtr), fSegments.at(segmentId));
}
uint16_t GetSegmentId() const { return fSegmentId; }
@@ -593,6 +676,7 @@ class Manager
using namespace boost::interprocess;
bool lastRemoved = false;
fRegionsGen += 1; // signal TL cache invalidation
UnsubscribeFromRegionEvents();
{
@@ -625,6 +709,7 @@ class Manager
}
private:
uint64_t fShmId64;
std::string fShmId;
uint16_t fSegmentId;
std::string fDeviceId;
@@ -633,11 +718,11 @@ class Manager
VoidAlloc fShmVoidAlloc;
boost::interprocess::named_mutex fShmMtx;
boost::interprocess::named_condition fRegionEventsCV;
boost::interprocess::named_condition fRegionEventsShmCV;
std::thread fRegionEventThread;
bool fRegionEventsSubscriptionActive;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents;
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; // pair: <region id, managed>
uint64_t fNumObservedEvents;
DeviceCounter* fDeviceCounter;
@@ -645,12 +730,20 @@ class Manager
Uint16SegmentInfoHashMap* fShmSegments;
Uint16RegionInfoHashMap* fShmRegions;
std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions;
// make sure this is alone in the cache line: mostly read
alignas(128) inline static std::atomic<unsigned long> fRegionsGen = 0ul;
inline static thread_local struct ManagerTLCache {
unsigned long fRegionsTLCacheGen;
std::vector<std::tuple<Region*, uint16_t, uint64_t>> fRegionsTLCache;
} fTlRegionCache;
std::atomic<bool> fInterrupted;
std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter
#ifdef FAIRMQ_DEBUG_MODE
// make sure the counters are not thrashing the cache line between threads doing creation and deallocation
Uint16MsgDebugMapHashMap* fMsgDebug;
Uint16MsgCounterHashMap* fShmMsgCounters;
alignas(128) std::atomic_uint64_t fMsgCounterNew;
alignas(128) std::atomic_uint64_t fMsgCounterDelete;
#endif
std::thread fHeartbeatThread;
@@ -660,6 +753,8 @@ class Manager
bool fThrowOnBadAlloc;
bool fNoCleanup;
};
} // namespace fair::mq::shmem

View File

@@ -109,12 +109,17 @@ class Message final : public fair::mq::Message
, fRegionPtr(nullptr)
, fLocalPtr(static_cast<char*>(data))
{
if (region->GetType() != GetType()) {
LOG(error) << "region type (" << region->GetType() << ") does not match message type (" << GetType() << ")";
throw TransportError(tools::ToString("region type (", region->GetType(), ") does not match message type (", GetType(), ")"));
}
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) &&
reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) {
fMeta.fHandle = (boost::interprocess::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
} else {
LOG(error) << "trying to create region message with data from outside the region";
throw std::runtime_error("trying to create region message with data from outside the region");
throw TransportError("trying to create region message with data from outside the region");
}
fManager.IncrementMsgCounter();
}
@@ -304,6 +309,8 @@ class Message final : public fair::mq::Message
}
if (fRegionPtr) {
fRegionPtr->InitializeQueues();
fRegionPtr->StartSendingAcks();
fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint});
} else {
LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack";
@@ -319,7 +326,7 @@ class Message final : public fair::mq::Message
Deallocate();
fAlignment = 0;
fManager.DecrementMsgCounter(); // TODO: put this to debug mode
fManager.DecrementMsgCounter();
}
};

View File

@@ -10,7 +10,6 @@
#include "Common.h"
#include <fairmq/tools/Strings.h>
#include <fairlogger/Logger.h>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/file_mapping.hpp>
@@ -32,6 +31,14 @@
#include <termios.h>
#include <poll.h>
#if FAIRMQ_HAS_STD_FILESYSTEM
#include <filesystem>
namespace fs = std::filesystem;
#else
#include <boost/filesystem.hpp>
namespace fs = ::boost::filesystem;
#endif
using namespace std;
using bie = ::boost::interprocess::interprocess_exception;
namespace bipc = ::boost::interprocess;
@@ -71,18 +78,16 @@ void signalHandler(int signal)
gSignalStatus = signal;
}
Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, unsigned int intervalInMS, bool runAsDaemon, bool cleanOnExit)
Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, unsigned int intervalInMS, bool monitor, bool cleanOnExit)
: fSelfDestruct(selfDestruct)
, fInteractive(interactive)
, fViewOnly(viewOnly)
, fIsDaemon(runAsDaemon)
, fMonitor(monitor)
, fSeenOnce(false)
, fCleanOnExit(cleanOnExit)
, fTimeoutInMS(timeoutInMS)
, fIntervalInMS(intervalInMS)
, fShmId(shmId)
, fSegmentName("fmq_" + fShmId + "_m_0")
, fManagementSegmentName("fmq_" + fShmId + "_mng")
, fControlQueueName("fmq_" + fShmId + "_cq")
, fTerminating(false)
, fHeartbeatTriggered(false)
@@ -94,14 +99,14 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
try {
bipc::named_mutex monitorStatus(bipc::create_only, string("fmq_" + fShmId + "_ms").c_str());
} catch (bie&) {
cout << "fairmq-shmmonitor for shared memory id " << fShmId << " already started or not properly exited. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`" << endl;
throw DaemonPresent(tools::ToString("fairmq-shmmonitor for shared memory id ", fShmId, " already started or not properly exited."));
if (fInteractive) {
LOG(error) << "fairmq-shmmonitor for shm id " << fShmId << " is already running. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`, or run in view-only mode (-v)";
} else {
LOG(error) << "fairmq-shmmonitor for shm id " << fShmId << " is already running. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`";
}
throw DaemonPresent(tools::ToString("fairmq-shmmonitor (in monitoring mode) for shm id ", fShmId, " is already running."));
}
}
Logger::SetConsoleColor(false);
Logger::DefineVerbosity(Verbosity::user1, VerbositySpec::Make(VerbositySpec::Info::timestamp_us));
Logger::SetVerbosity(Verbosity::verylow);
}
void Monitor::CatchSignals()
@@ -116,7 +121,7 @@ void Monitor::SignalMonitor()
while (true) {
if (gSignalStatus != 0) {
fTerminating = true;
cout << "signal: " << gSignalStatus << endl;
LOG(info) << "signal: " << gSignalStatus;
break;
} else if (fTerminating) {
break;
@@ -131,18 +136,13 @@ void Monitor::Run()
thread heartbeatThread;
if (!fViewOnly) {
RemoveQueue(fControlQueueName);
heartbeatThread = thread(&Monitor::MonitorHeartbeats, this);
heartbeatThread = thread(&Monitor::ReceiveHeartbeats, this);
}
if (fInteractive) {
Interactive();
} else if (fViewOnly) {
CheckSegment();
} else {
while (!fTerminating) {
this_thread::sleep_for(chrono::milliseconds(fIntervalInMS));
CheckSegment();
}
Watch();
}
if (!fViewOnly) {
@@ -150,7 +150,165 @@ void Monitor::Run()
}
}
void Monitor::MonitorHeartbeats()
void Monitor::Watch()
{
while (!fTerminating) {
using namespace boost::interprocess;
try {
managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + fShmId + "_mng").c_str());
fSeenOnce = true;
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (fHeartbeatTriggered && duration > fTimeoutInMS) {
// memory is present, but no heartbeats since timeout duration
LOG(info) << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning...";
Cleanup(ShmId{fShmId});
fHeartbeatTriggered = false;
if (fSelfDestruct) {
LOG(info) << "self destructing (segment has been observed and cleaned up by the monitor)";
fTerminating = true;
}
}
} catch (bie&) {
fHeartbeatTriggered = false;
if (fSelfDestruct) {
if (fSeenOnce) {
// segment has been observed at least once, can self-destruct
LOG(info) << "self destructing (segment has been observed and cleaned up orderly)";
fTerminating = true;
} else {
// if self-destruct is requested, and no segment has ever been observed, quit after double timeout duration
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (duration > fTimeoutInMS * 2) {
Cleanup(ShmId{fShmId});
LOG(info) << "self destructing (no segments observed within (timeout * 2) since start)";
fTerminating = true;
}
}
}
}
this_thread::sleep_for(chrono::milliseconds(100));
}
}
// Prints a summary of the shared memory session identified by shmId.
//
// Opens the management segment ("fmq_<shmId>_mng") and every managed segment listed
// in it ("fmq_<shmId>_m_<segmentId>") read-only and logs: shm id, session name,
// creator id, number of devices, and total/free/used sizes per segment and for the
// management segment itself. In FAIRMQ_DEBUG_MODE builds the per-segment message
// count is included as well; otherwise "NODEBUG" is printed in its place.
//
// Returns true if the summary was printed, false if the segment info could not be
// located or an interprocess exception occurred while opening/reading the segments.
bool Monitor::PrintShm(const ShmId& shmId)
{
    using namespace boost::interprocess;

    try {
        managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());

        Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
        if (!segmentInfos) {
            LOG(error) << "Found management segment, but cannot locate segment info, something went wrong...";
            return false;
        }

        // Open each registered segment with the allocation algorithm it was created with.
        std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> segments;
        for (const auto& s : *segmentInfos) {
            if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
                segments.emplace(s.first, RBTreeBestFitSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()));
            } else {
                segments.emplace(s.first, SimpleSeqFitSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()));
            }
        }

        unsigned int numDevices = 0;
        int creatorId = -1;
        std::string sessionName;

        DeviceCounter* deviceCounter = managementSegment.find<DeviceCounter>(unique_instance).first;
        if (deviceCounter) {
            numDevices = deviceCounter->fCount;
        }

        SessionInfo* sessionInfo = managementSegment.find<SessionInfo>(unique_instance).first;
        if (sessionInfo) {
            creatorId = sessionInfo->fCreatorId;
            sessionName = sessionInfo->fSessionName;
        }

#ifdef FAIRMQ_DEBUG_MODE
        Uint16MsgCounterHashMap* msgCounters = managementSegment.find<Uint16MsgCounterHashMap>(unique_instance).first;
#endif

        stringstream ss;
        size_t mfree = managementSegment.get_free_memory();
        size_t mtotal = managementSegment.get_size();
        size_t mused = mtotal - mfree;

        ss << "shm id: " << shmId.shmId
           << ", session: " << sessionName
           << ", creator id: " << creatorId
           << ", devices: " << numDevices
           << ", segments:\n";

        for (const auto& s : segments) {
            size_t free = boost::apply_visitor(SegmentFreeMemory(), s.second);
            size_t total = boost::apply_visitor(SegmentSize(), s.second);
            size_t used = total - free;

#ifdef FAIRMQ_DEBUG_MODE
            // The management segment is mapped read-only, so the counter must be looked up
            // with find(): operator[] would insert a default entry for a missing key and
            // thereby try to write into the read-only mapping.
            string msgs("unknown");
            if (msgCounters != nullptr) {
                auto counter = msgCounters->find(s.first);
                if (counter != msgCounters->end()) {
                    msgs = to_string(counter->second.fCount);
                }
            }
#else
            const string msgs("NODEBUG");
#endif

            ss << "   [" << s.first
               << "]: total: " << total
               << ", msgs: " << msgs
               << ", free: " << free
               << ", used: " << used
               << "\n";
        }

        ss << "   [m]: "
           << "total: " << mtotal
           << ", free: " << mfree
           << ", used: " << mused;

        LOGV(info, user1) << ss.str();
    } catch (bie&) {
        // Management or data segment could not be opened/read.
        return false;
    }

    return true;
}
void Monitor::ListAll(const std::string& path)
{
try {
if (fs::is_empty(path)) {
LOG(info) << "directory " << fs::path(path) << " is empty.";
return;
}
for (const auto& entry : fs::directory_iterator(path)) {
string filename = entry.path().filename().string();
// LOG(info) << filename << ", size: " << entry.file_size() << " bytes";
if (tools::StrStartsWith(filename, "fmq_") || tools::StrStartsWith(filename, "sem.fmq_")) {
// LOG(info) << "The file '" << filename << "' belongs to FairMQ.";
if (tools::StrEndsWith(filename, "_mng")) {
string shmId = filename.substr(4, 8);
LOG(info) << "\nFound shmid '" << shmId << "':\n";
if (!PrintShm(ShmId{shmId})) {
LOG(info) << "could not open file for shmid '" << shmId << "'";
}
}
} else {
LOG(info) << "The file '" << filename << "' does not belong to FairMQ, skipping...";
}
}
} catch (fs::filesystem_error& fse) {
LOG(error) << "error: " << fse.what();
}
}
void Monitor::ReceiveHeartbeats()
{
try {
bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, 256);
@@ -167,11 +325,11 @@ void Monitor::MonitorHeartbeats()
string deviceId(msg, recvdSize);
fDeviceHeartbeats[deviceId] = fLastHeartbeat;
} else {
// cout << "control queue timeout" << endl;
// LOG(info) << "control queue timeout";
}
}
} catch (bie& ie) {
cout << ie.what() << endl;
LOG(info) << ie.what();
}
RemoveQueue(fControlQueueName);
@@ -186,7 +344,7 @@ void Monitor::Interactive()
TerminalConfig tcfg;
cout << endl;
LOG(info) << "\n";
PrintHelp();
while (!fTerminating) {
@@ -199,30 +357,30 @@ void Monitor::Interactive()
switch (c) {
case 'q':
cout << "\n[q] --> quitting." << endl;
LOG(info) << "\n[q] --> quitting.";
fTerminating = true;
break;
case 'x':
cout << "\n[x] --> closing shared memory:" << endl;
LOG(info) << "\n[x] --> closing shared memory:";
if (!fViewOnly) {
Cleanup(ShmId{fShmId});
} else {
cout << "cannot close because in view only mode" << endl;
LOG(info) << "cannot close because in view only mode";
}
break;
case 'h':
cout << "\n[h] --> help:" << endl << endl;
LOG(info) << "\n[h] --> help:\n";
PrintHelp();
cout << endl;
LOG(info);
break;
case '\n':
cout << "\n[\\n] --> invalid input." << endl;
LOG(info) << "\n[\\n] --> invalid input.";
break;
case 'b':
PrintDebugInfo(ShmId{fShmId});
break;
default:
cout << "\n[" << c << "] --> invalid input." << endl;
LOG(info) << "\n[" << c << "] --> invalid input.";
break;
}
@@ -235,118 +393,7 @@ void Monitor::Interactive()
break;
}
CheckSegment();
if (!fTerminating) {
cout << "\r";
}
}
}
void Monitor::CheckSegment()
{
using namespace boost::interprocess;
try {
managed_shared_memory managementSegment(open_only, fManagementSegmentName.c_str());
VoidAlloc allocInstance(managementSegment.get_segment_manager());
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> segments;
if (!segmentInfos) {
cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl;
return;
}
for (const auto& s : *segmentInfos) {
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
segments.emplace(s.first, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str()));
} else {
segments.emplace(s.first, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str()));
}
}
fSeenOnce = true;
unsigned int numDevices = 0;
#ifdef FAIRMQ_DEBUG_MODE
Uint16MsgCounterHashMap* msgCounters = nullptr;
#endif
if (fInteractive || fViewOnly) {
DeviceCounter* dc = managementSegment.find<DeviceCounter>(unique_instance).first;
if (dc) {
numDevices = dc->fCount;
}
#ifdef FAIRMQ_DEBUG_MODE
msgCounters = managementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(allocInstance);
#endif
}
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (fHeartbeatTriggered && duration > fTimeoutInMS) {
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
Cleanup(ShmId{fShmId});
fHeartbeatTriggered = false;
if (fSelfDestruct) {
cout << "\nself destructing" << endl;
fTerminating = true;
}
}
if (fInteractive || fViewOnly) {
stringstream ss;
size_t mfree = managementSegment.get_free_memory();
size_t mtotal = managementSegment.get_size();
size_t mused = mtotal - mfree;
ss << "shm id: " << fShmId
<< ", devices: " << numDevices << ", segments:\n";
for (const auto& s : segments) {
size_t free = boost::apply_visitor(SegmentFreeMemory{}, s.second);
size_t total = boost::apply_visitor(SegmentSize{}, s.second);
size_t used = total - free;
ss << " [" << s.first
<< "]: total: " << total
#ifdef FAIRMQ_DEBUG_MODE
<< ", msgs: " << (*msgCounters)[s.first].fCount
#else
<< ", msgs: NODEBUG"
#endif
<< ", free: " << free
<< ", used: " << used
<< "\n";
}
ss << " [m]: "
<< "total: " << mtotal
<< ", free: " << mfree
<< ", used: " << mused;
LOGV(info, user1) << ss.str();
}
} catch (bie&) {
fHeartbeatTriggered = false;
auto now = chrono::high_resolution_clock::now();
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (fIsDaemon && duration > fTimeoutInMS * 2) {
Cleanup(ShmId{fShmId});
fHeartbeatTriggered = false;
if (fSelfDestruct) {
cout << "\nself destructing" << endl;
fTerminating = true;
}
}
if (fSelfDestruct) {
if (fSeenOnce) {
cout << "self destructing" << endl;
fTerminating = true;
}
}
PrintShm(ShmId{fShmId});
}
}
@@ -366,7 +413,7 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused)))
for (const auto& e : *debug) {
numMessages += e.second.size();
}
cout << endl << "found " << numMessages << " messages." << endl;
LOG(info) << endl << "found " << numMessages << " messages.";
for (const auto& s : *debug) {
for (const auto& e : s.second) {
@@ -375,19 +422,18 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused)))
time_t t = chrono::system_clock::to_time_t(tmpt);
uint64_t ms = e.second.fCreationTime % 1000000;
auto tm = localtime(&t);
cout << "segment: " << setw(3) << setfill(' ') << s.first
LOG(info) << "segment: " << setw(3) << setfill(' ') << s.first
<< ", offset: " << setw(12) << setfill(' ') << e.first
<< ", size: " << setw(10) << setfill(' ') << e.second.fSize
<< ", creator PID: " << e.second.fPid << setfill('0')
<< ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms << endl;
<< ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms;
}
}
cout << setfill(' ');
} catch (bie&) {
cout << "no segments found" << endl;
LOG(info) << "no segments found";
}
#else
cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl;
LOG(info) << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)";
#endif
}
@@ -419,10 +465,10 @@ unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(cons
}
}
} catch (bie&) {
cout << "no segments found" << endl;
LOG(info) << "no segments found";
}
#else
cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl;
LOG(info) << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)";
#endif
return result;
@@ -435,10 +481,10 @@ unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(cons
void Monitor::PrintHelp()
{
cout << "controls: [x] close memory, "
<< "[b] print a list of allocated messages (only available when compiled with FAIMQ_DEBUG_MODE=ON), "
<< "[h] help, "
<< "[q] quit." << endl;
LOG(info) << "controls: [x] close memory, "
<< "[b] print a list of allocated messages (only available when compiled with FAIMQ_DEBUG_MODE=ON), "
<< "[h] help, "
<< "[q] quit.";
}
@@ -446,12 +492,12 @@ std::pair<std::string, bool> RunRemoval(std::function<bool(const std::string&)>
{
if (f(name)) {
if (verbose) {
cout << "Successfully removed '" << name << "'." << endl;
LOG(info) << "Successfully removed '" << name << "'.";
}
return {name, true};
} else {
if (verbose) {
cout << "Did not remove '" << name << "'. Already removed?" << endl;
LOG(debug) << "Did not remove '" << name << "'. Already removed?";
}
return {name, false};
}
@@ -468,7 +514,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
std::vector<std::pair<std::string, bool>> result;
if (verbose) {
cout << "Cleaning up for shared memory id '" << shmId.shmId << "'..." << endl;
LOG(info) << "Cleaning up for shared memory id '" << shmId.shmId << "'...";
}
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
@@ -479,7 +525,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
if (rc) {
if (verbose) {
cout << "Region counter found: " << rc->fCount << endl;
LOG(debug) << "Region counter found: " << rc->fCount;
}
uint16_t regionCount = rc->fCount;
@@ -491,7 +537,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
string path = ri.fPath.c_str();
int flags = ri.fFlags;
if (verbose) {
cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << "." << endl;
LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << ".";
}
if (path != "") {
result.emplace_back(RunRemoval(Monitor::RemoveFileMapping, path + "fmq_" + shmId.shmId + "_rg_" + to_string(i), verbose));
@@ -506,12 +552,12 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
}
} else {
if (verbose) {
cout << "No region counter found. No regions to cleanup." << endl;
LOG(info) << "No region counter found. No regions to cleanup.";
}
}
} catch(out_of_range& oor) {
if (verbose) {
cout << "Could not locate element in the region map, out of range: " << oor.what() << endl;
LOG(info) << "Could not locate element in the region map, out of range: " << oor.what();
}
}
@@ -524,7 +570,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
result.emplace_back(RunRemoval(Monitor::RemoveObject, managementSegmentName.c_str(), verbose));
} catch (bie&) {
if (verbose) {
cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl;
LOG(info) << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup.";
}
}
@@ -538,7 +584,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const SessionId& sess
{
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
if (verbose) {
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
LOG(info) << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'";
}
return Cleanup(shmId, verbose);
}
@@ -555,7 +601,7 @@ std::vector<std::pair<std::string, bool>> Monitor::CleanupFull(const SessionId&
{
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
if (verbose) {
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
LOG(info) << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'";
}
return CleanupFull(shmId, verbose);
}

View File

@@ -8,6 +8,8 @@
#ifndef FAIR_MQ_SHMEM_MONITOR_H_
#define FAIR_MQ_SHMEM_MONITOR_H_
#include <fairlogger/Logger.h>
#include <thread>
#include <chrono>
#include <atomic>
@@ -82,6 +84,9 @@ class Monitor
static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const ShmId& shmId);
static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const SessionId& shmId);
static bool PrintShm(const ShmId& shmId);
static void ListAll(const std::string& path);
static bool RemoveObject(const std::string& name);
static bool RemoveFileMapping(const std::string& name);
static bool RemoveQueue(const std::string& name);
@@ -92,7 +97,8 @@ class Monitor
private:
void PrintHelp();
void MonitorHeartbeats();
void Watch();
void ReceiveHeartbeats();
void CheckSegment();
void Interactive();
void SignalMonitor();
@@ -100,14 +106,12 @@ class Monitor
bool fSelfDestruct; // will self-destruct after the memory has been closed
bool fInteractive; // running in interactive mode
bool fViewOnly; // view only mode
bool fIsDaemon;
bool fMonitor;
bool fSeenOnce; // true is segment has been opened successfully at least once
bool fCleanOnExit;
unsigned int fTimeoutInMS;
unsigned int fIntervalInMS;
std::string fShmId;
std::string fSegmentName;
std::string fManagementSegmentName;
std::string fControlQueueName;
std::atomic<bool> fTerminating;
std::atomic<bool> fHeartbeatTriggered;

View File

@@ -1,4 +1,4 @@
# Shared Memory transport
## Shared Memory transport
Shared memory transport for FairMQ. To try with existing devices, run the devices with `--transport shmem` option or configure channel transport in JSON (see examples/MQ/multiple-transports).
@@ -6,7 +6,7 @@ The transport manages shared memory via boost::interprocess library. The transfe
Devices track and cleanup shared memory on shutdown. For more information on the current shared memory segment and additional cleanup options, see following section.
# Shared Memory objects / files
## Shared Memory objects / files
FairMQ Shared Memory currently uses the following names to register shared memory on the system:
@@ -15,7 +15,7 @@ FairMQ Shared Memory currently uses the following names to register shared memor
| `fmq_<shmId>_m_<segmentId>` | managed segment(s) (user data) | one of the devices | devices |
| `fmq_<shmId>_mng` | management segment (management data) | one of the devices | devices |
| `fmq_<shmId>_mtx` | mutex | one of the devices | devices |
| `fmq_<shmId>_cv` | condition variable | one of the devices | devices with unmanaged regions |
| `fmq_<shmId>_cv` | condition variable | one of the devices | devices |
| `fmq_<shmId>_rg_<index>` | unmanaged region(s) | one of the devices | devices with unmanaged regions |
| `fmq_<shmId>_rgq_<index>` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions |
| `fmq_<shmId>_ms` | shmmonitor status | shmmonitor | devices, shmmonitor |
@@ -25,20 +25,35 @@ The shmId is generated out of session id and user id.
## Shared memory monitor
The shared memory monitor tool, supplied with the shared memory transport can be used to monitor shared memory use and automatically cleanup shared memory in case of device crashes.
The shared memory monitor tool (`fairmq-shmmonitor`) can be used to monitor and cleanup the created shared memory.
With default arguments the monitor will run indefinitely with no output, and clean up shared memory segment if it is open and no heartbeats from devices arrive within a timeout period. It can be further customized with following parameters:
Most commands act for the specified session, identified either via session id (`--session`,`-s`) or shmid (`--shmid`).
`--session <arg>`: for which session to run the monitor (default is "default"). The actual resource names will be built out of session id, user id (hashed and truncated).
`--cleanup`: start monitor, perform cleanup of the memory and quit.
`--shmid <arg>`: if provided, this shmem id will be used instead of the one generated from session id. Use this if you know the name of the shared memory resource, but do not have the used session id.
`--self-destruct`: run until the memory segment is closed (either naturally via cleanup performed by devices or in case of a crash (no heartbeats within timeout)).
`--interactive`: run interactively, with detailed segment details and user input for various shmem operations.
`--timeout <arg>`: specify the timeout for the heartbeats from shmem transports in milliseconds (default 5000).
The monitor runs in one of the following modes:
The options can be combined, with the exception of `--cleanup` option, which will invoke the described behaviour independent of other options.
Without the `--self-destruct` option, the monitor will run continuously, monitoring (and cleaning up if needed) consecutive topologies.
| command | action |
| --------------------------- | ---------------------------------------------- |
| no args | Print segment info of the specified session/shm ID and exit. |
| `--view`,`-v` | Print segment info of the specified session/shm ID and exit. |
| `--interactive`,`-i` | Print segment info of the specified session/shm ID at a given interval (`--interval`), with some keyboard controls. Can be combined with `--view` for read-only access (and avoid receiving heartbeats). |
| `--monitor`,`-m` | Monitor the session shm usage by receiving heartbeats from shmem users, cleaning it up if no heartbeats arrived within configured timeout (`--timeout`/`-t`). Only one heartbeat receiver per session is currently possible. If `--self-destruct`/`-x` is added, monitor will exit either when (a) no shm has been observed for interval * 2, (b) a cleanup due to reached timeout has been performed, or (c) shm has been observed, but is now cleaned up. |
| `--cleanup`,`-c` | Cleanup the shm for the specified session and exit. |
| `--debug`,`-b` | Print the list of messages in the current session and exit. Only available when FairMQ is compiled with `FAIRMQ_DEBUG_MODE=ON` (high performance impact). |
| `--get-shmid` | Translate given session id and user id (`--user-id`) to a shmem id (uses current user id if none provided) and exit. |
| `--list-all` | Print segment info for all sessions present on the system and exit. |
Possible further implementation would be to run the monitor with `--self-destruct` with each topology.
Additional cmd options:
The Monitor class can also be used independently from the supplied executable (built from `runMonitor.cxx`), allowing integration on any level. For example invoking the monitor could be a functionality that a device offers.
| command | action |
| --------------------------- | ---------------------------------------------- |
| `--clean-on-exit`,`-e` | Perform a cleanup on exit, when running in monitoring or interactive mode. |
| `--daemonize`,`-d` | Can be combined with the monitoring mode to detach the process from the parent. |
| `--verbose` | When running as a daemon, store monitor output in `fairmq-shmmonitor_<timestamp>.log`. |
For full option details, run with `-h`.
The Monitor class can also be used independently from the supplied executable, allowing integration on any level.
## Troubleshooting
Bus Error (SIGBUS) can occur if the transport tries to access shared memory that is not accessible. One reason could be because the used memory in the segment exceeds the capacity or available memory of the shmem filesystem (capacity is by default set to half of RAM on Linux).

View File

@@ -47,7 +47,7 @@ struct Region
Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
: fRemote(remote)
, fLinger(100)
, fStop(false)
, fStopAcks(false)
, fName("fmq_" + shmId + "_rg_" + std::to_string(id))
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id))
, fShmemObject()
@@ -85,18 +85,26 @@ struct Region
LOG(debug) << "shmem: initialized file: " << fName;
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, flags);
} else {
if (fRemote) {
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
} else {
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
try {
if (fRemote) {
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
} else {
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
}
} catch(interprocess_exception& e) {
LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << id << "': " << e.what();
throw;
}
try {
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
} catch(interprocess_exception& e) {
LOG(error) << "Failed mapping shared_memory_object for region id '" << id << "': " << e.what();
throw;
}
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
}
InitializeQueues();
StartSendingAcks();
LOG(debug) << "shmem: initialized region: " << fName;
LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")";
}
Region() = delete;
@@ -108,15 +116,22 @@ struct Region
{
using namespace boost::interprocess;
if (fRemote) {
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
} else {
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
if (fQueue == nullptr) {
if (fRemote) {
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
} else {
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
}
LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")";
}
LOG(debug) << "shmem: initialized region queue: " << fQueueName;
}
void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); }
// Launches the thread running SendAcks(), unless fAcksSender already holds a
// joinable thread, in which case the call is a no-op.
void StartSendingAcks()
{
    if (fAcksSender.joinable()) {
        return; // sender thread already started
    }
    fAcksSender = std::thread(&Region::SendAcks, this);
}
void SendAcks()
{
std::unique_ptr<RegionBlock[]> blocks = std::make_unique<RegionBlock[]>(fAckBunchSize);
@@ -140,13 +155,13 @@ struct Region
}
if (blocksToSend > 0) {
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) {
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStopAcks) {
// receiver slow? yield and try again...
std::this_thread::yield();
}
// LOG(debug) << "Sent " << blocksToSend << " blocks.";
} else { // blocksToSend == 0
if (fStop) {
if (fStopAcks) {
break;
}
}
@@ -156,7 +171,12 @@ struct Region
<< " blocks left to send: " << blocksToSend << ").";
}
void StartReceivingAcks() { fAcksReceiver = std::thread(&Region::ReceiveAcks, this); }
// Launches the thread running ReceiveAcks(), unless fAcksReceiver already holds
// a joinable thread, in which case the call is a no-op.
void StartReceivingAcks()
{
    if (fAcksReceiver.joinable()) {
        return; // receiver thread already started
    }
    fAcksReceiver = std::thread(&Region::ReceiveAcks, this);
}
void ReceiveAcks()
{
unsigned int priority;
@@ -168,7 +188,7 @@ struct Region
while (true) {
uint32_t timeout = 100;
bool leave = false;
if (fStop) {
if (fStopAcks) {
timeout = fLinger;
leave = true;
}
@@ -213,9 +233,25 @@ struct Region
void SetLinger(uint32_t linger) { fLinger = linger; }
uint32_t GetLinger() const { return fLinger; }
// Raises the fStopAcks flag and joins the acknowledgement threads.
// The sender thread is joined whenever it is joinable; the receiver thread is
// only joined for a non-remote region.
void StopAcks()
{
    fStopAcks = true;

    if (fAcksSender.joinable()) {
        // notify the block-send condition variable before joining the sender
        fBlockSendCV.notify_one();
        fAcksSender.join();
    }

    if (!fRemote && fAcksReceiver.joinable()) {
        fAcksReceiver.join();
    }
}
~Region()
{
fStop = true;
fStopAcks = true;
if (fAcksSender.joinable()) {
fBlockSendCV.notify_one();
@@ -228,11 +264,11 @@ struct Region
}
if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
LOG(debug) << "Region '" << fName << "' destroyed.";
LOG(trace) << "Region '" << fName << "' destroyed.";
}
if (boost::interprocess::file_mapping::remove(fName.c_str())) {
LOG(debug) << "File mapping '" << fName << "' destroyed.";
LOG(trace) << "File mapping '" << fName << "' destroyed.";
}
if (fFile) {
@@ -240,19 +276,18 @@ struct Region
}
if (boost::interprocess::message_queue::remove(fQueueName.c_str())) {
LOG(debug) << "Region queue '" << fQueueName << "' destroyed.";
LOG(trace) << "Region queue '" << fQueueName << "' destroyed.";
}
} else {
// LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary.";
LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
// LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
}
LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
// LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
}
bool fRemote;
uint32_t fLinger;
std::atomic<bool> fStop;
std::atomic<bool> fStopAcks;
std::string fName;
std::string fQueueName;
boost::interprocess::shared_memory_object fShmemObject;

View File

@@ -378,12 +378,10 @@ class Socket final : public fair::mq::Socket
}
}
void Events(uint32_t* events) override
int Events(uint32_t* events) override
{
size_t eventsSize = sizeof(uint32_t);
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
throw SocketError(tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
}
return zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize);
}
int GetLinger() const override

View File

@@ -68,9 +68,6 @@ class TransportFactory final : public fair::mq::TransportFactory
throw SharedMemoryError(tools::ToString("Provided shared memory allocation algorithm '", allocationAlgorithm, "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'"));
}
std::string shmId = makeShmIdStr(sessionName);
LOG(debug) << "Generated shmid '" << shmId << "' out of session id '" << sessionName << "'.";
try {
if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) {
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
@@ -81,7 +78,7 @@ class TransportFactory final : public fair::mq::TransportFactory
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
}
fManager = std::make_unique<Manager>(shmId, deviceId, segmentSize, config);
fManager = std::make_unique<Manager>(sessionName, deviceId, segmentSize, config);
} catch (boost::interprocess::interprocess_exception& e) {
LOG(error) << "Could not initialize shared memory transport: " << e.what();
throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));
@@ -144,29 +141,29 @@ class TransportFactory final : public fair::mq::TransportFactory
return std::make_unique<Poller>(channelsMap, channelList);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags, fair::mq::RegionConfig cfg)
{
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this);
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this, cfg);
}
void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); }

View File

@@ -37,15 +37,16 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
const int64_t userFlags,
RegionCallback callback,
RegionBulkCallback bulkCallback,
const std::string& path = "",
int flags = 0,
FairMQTransportFactory* factory = nullptr)
const std::string& path,
int flags,
FairMQTransportFactory* factory,
fair::mq::RegionConfig cfg)
: FairMQUnmanagedRegion(factory)
, fManager(manager)
, fRegion(nullptr)
, fRegionId(0)
{
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags);
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags, cfg);
fRegion = result.first;
fRegionId = result.second;
}
@@ -56,6 +57,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
void SetLinger(uint32_t linger) override { fManager.GetRegion(fRegionId)->SetLinger(linger); }
uint32_t GetLinger() const override { return fManager.GetRegion(fRegionId)->GetLinger(); }
Transport GetType() const override { return fair::mq::Transport::SHM; }
~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); }
private:

View File

@@ -69,6 +69,10 @@ static void daemonize()
int main(int argc, char** argv)
{
try {
fair::Logger::SetConsoleColor(false);
fair::Logger::DefineVerbosity(fair::Verbosity::user1, fair::VerbositySpec::Make(fair::VerbositySpec::Info::timestamp_us));
fair::Logger::SetVerbosity(fair::Verbosity::verylow);
string sessionName;
string shmId;
bool cleanup = false;
@@ -78,8 +82,15 @@ int main(int argc, char** argv)
unsigned int timeoutInMS = 5000;
unsigned int intervalInMS = 100;
bool runAsDaemon = false;
bool monitor = false;
bool debug = false;
bool cleanOnExit = false;
bool getShmId = false;
bool listAll = false;
string listAllPath;
bool verbose = false;
string severity;
int userId = -1;
options_description desc("Options");
desc.add_options()
@@ -90,27 +101,43 @@ int main(int argc, char** argv)
("interactive,i" , value<bool>(&interactive)->implicit_value(true), "Interactive run")
("view,v" , value<bool>(&viewOnly)->implicit_value(true), "Run in view only mode")
("timeout,t" , value<unsigned int>(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds")
("daemonize,d" , value<bool>(&runAsDaemon)->implicit_value(true), "Daemonize the monitor")
("debug,b" , value<bool>(&debug)->implicit_value(true), "Debug - Print a list of messages)")
("daemonize,d" , value<bool>(&runAsDaemon)->implicit_value(true), "Daemonize the monitor process (only in monitoring mode)")
("monitor,m" , value<bool>(&monitor)->implicit_value(true), "Run in monitoring mode")
("debug,b" , value<bool>(&debug)->implicit_value(true), "Debug - Print a list of messages)")
("clean-on-exit,e", value<bool>(&cleanOnExit)->implicit_value(true), "Perform cleanup on exit")
("interval" , value<unsigned int>(&intervalInMS)->default_value(100), "Output interval for interactive/view-only mode")
("help,h", "Print help");
("interval" , value<unsigned int>(&intervalInMS)->default_value(1000),"Output interval for interactive mode")
("get-shmid" , value<bool>(&getShmId)->implicit_value(true), "Translate given session id and user id to a shmem id (uses current user id if none provided)")
("list-all" , value<bool>(&listAll)->implicit_value(true), "List all sessions & segments")
("list-all-path" , value<string>(&listAllPath)->default_value("/dev/shm/"),"Path for the --list-all command to search segments in")
("verbose" , value<bool>(&verbose)->implicit_value(true), "Verbose mode (daemon will output to a file 'fairmq-shmmonitor_<timestamp>')")
("severity" , value<string>(&severity)->default_value("info"), "Log severity")
("user-id" , value<int>(&userId)->default_value(-1), "User id (used with --get-shmid)")
("help,h", "Print help");
variables_map vm;
store(parse_command_line(argc, argv, desc), vm);
if (vm.count("help")) {
cout << "FairMQ Shared Memory Monitor" << endl << desc << endl;
LOG(info) << "FairMQ Shared Memory Monitor" << "\n" << desc;
return 0;
}
notify(vm);
if (runAsDaemon) {
daemonize();
fair::Logger::SetConsoleSeverity(severity);
if (getShmId) {
if (userId == -1) {
LOG(info) << "shmem id for session '" << sessionName << "' and current user id " << geteuid()
<< " is: " << makeShmIdStr(sessionName);
} else {
LOG(info) << "shmem id for session '" << sessionName << "' and user id " << userId
<< " is: " << makeShmIdStr(sessionName, to_string(userId));
}
return 0;
}
if (shmId == "") {
if (shmId.empty()) {
shmId = makeShmIdStr(sessionName);
}
@@ -124,18 +151,39 @@ int main(int argc, char** argv)
return 0;
}
cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl;
Monitor monitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, runAsDaemon, cleanOnExit);
if (interactive || !viewOnly) {
monitor.CatchSignals();
if (listAll) {
Monitor::ListAll(listAllPath);
return 0;
}
monitor.Run();
if (!viewOnly && !interactive && !monitor) {
// if neither of the run modes are selected, use view only mode.
viewOnly = true;
}
if (viewOnly && !interactive) {
if (!Monitor::PrintShm(ShmId{shmId})) {
LOG(info) << "No segments found.";
}
return 0;
}
if (runAsDaemon && monitor) {
if (verbose) {
fair::Logger::InitFileSink("trace", "fairmq-shmmonitor");
}
daemonize();
}
LOG(info) << "Starting shared memory monitor for session: \"" << sessionName << "\" (shm id: " << shmId << ")...";
Monitor shmmonitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, monitor, cleanOnExit);
shmmonitor.CatchSignals();
shmmonitor.Run();
} catch (Monitor::DaemonPresent& dp) {
return 0;
} catch (exception& e) {
cerr << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit" << endl;
LOG(error) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit";
return 2;
}

View File

@@ -39,6 +39,24 @@ inline auto ToStrVector(const int argc, char*const* argv, const bool dropProgram
}
}
/// @brief Checks whether a string begins with a given prefix.
/// @param str   string to inspect
/// @param start prefix to look for (an empty prefix always matches)
/// @return true if the first start.length() characters of str equal start
inline bool StrStartsWith(std::string const& str, std::string const& start)
{
    // a prefix longer than the string itself can never match
    if (start.length() > str.length()) {
        return false;
    }
    return str.compare(0, start.length(), start) == 0;
}
/// @brief Checks whether a string finishes with a given suffix.
/// @param str string to inspect
/// @param end suffix to look for (an empty suffix always matches)
/// @return true if the last end.length() characters of str equal end
inline bool StrEndsWith(std::string const& str, std::string const& end)
{
    // a suffix longer than the string itself can never match
    if (end.length() > str.length()) {
        return false;
    }
    // position in str where the candidate suffix begins
    const auto offset = str.length() - end.length();
    return str.compare(offset, end.length(), end) == 0;
}
} // namespace fair::mq::tools
#endif /* FAIR_MQ_TOOLS_STRINGS_H */

View File

@@ -1,10 +1,11 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/tools/Unique.h>
#include <boost/program_options.hpp>

View File

@@ -109,6 +109,11 @@ class Message final : public fair::mq::Message
, fAlignment(0)
, fMsg(std::make_unique<zmq_msg_t>())
{
if (region->GetType() != GetType()) {
LOG(error) << "region type (" << region->GetType() << ") does not match message type (" << GetType() << ")";
throw TransportError(tools::ToString("region type (", region->GetType(), ") does not match message type (", GetType(), ")"));
}
// FIXME: make this zero-copy:
// simply taking over the provided buffer can cause premature delete, since region could be
// destroyed before the message is sent out. Needs lifetime extension for the ZMQ region.

View File

@@ -323,12 +323,10 @@ class Socket final : public fair::mq::Socket
}
}
void Events(uint32_t* events) override
int Events(uint32_t* events) override
{
size_t eventsSize = sizeof(uint32_t);
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
throw SocketError(tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
}
return zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize);
}
void SetLinger(const int value) override

View File

@@ -96,29 +96,29 @@ class TransportFactory final : public FairMQTransportFactory
return std::make_unique<Poller>(channelsMap, channelList);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int)
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int /* flags */, fair::mq::RegionConfig cfg)
{
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this);
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this, cfg);
auto zPtr = static_cast<UnmanagedRegion*>(ptr.get());
fCtx->AddRegion(false, zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created);
return ptr;

View File

@@ -16,6 +16,8 @@
#include <cstddef> // size_t
#include <string>
#include <sys/mman.h> // mlock
namespace fair::mq::zmq
{
@@ -30,7 +32,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
int64_t userFlags,
RegionCallback callback,
RegionBulkCallback bulkCallback,
FairMQTransportFactory* factory = nullptr)
FairMQTransportFactory* factory,
fair::mq::RegionConfig cfg)
: fair::mq::UnmanagedRegion(factory)
, fCtx(ctx)
, fId(fCtx.RegionCount())
@@ -39,7 +42,20 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
, fUserFlags(userFlags)
, fCallback(callback)
, fBulkCallback(bulkCallback)
{}
{
    // Constructor body: applies the optional RegionConfig settings to the buffer.
    // NOTE(review): assumes fBuffer/fSize were set up by the initializer list
    // (not fully visible in this hunk) — confirm.

    // cfg.lock: ask the OS to keep the buffer resident via mlock(). A failure is
    // reported but does not abort construction.
    if (cfg.lock) {
        LOG(debug) << "Locking region " << fId << "...";
        if (mlock(fBuffer, fSize) == -1) {
            // take a copy of errno first: the stream insertions below may overwrite it
            const int lockErrno = errno;
            LOG(error) << "Could not lock region " << fId << ". Code: " << lockErrno << ", reason: " << strerror(lockErrno);
        } else {
            // only claim success when mlock() actually succeeded
            LOG(debug) << "Successfully locked region " << fId << ".";
        }
    }

    // cfg.zero: overwrite the whole buffer with zero bytes.
    if (cfg.zero) {
        LOG(debug) << "Zeroing free memory of region " << fId << "...";
        memset(fBuffer, 0x00, fSize);
        LOG(debug) << "Successfully zeroed free memory of region " << fId << ".";
    }
}
UnmanagedRegion(const UnmanagedRegion&) = delete;
UnmanagedRegion operator=(const UnmanagedRegion&) = delete;
@@ -51,6 +67,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
void SetLinger(uint32_t /* linger */) override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; }
uint32_t GetLinger() const override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; return 0; }
Transport GetType() const override { return Transport::ZMQ; }
virtual ~UnmanagedRegion()
{
LOG(debug) << "destroying region " << fId;

View File

@@ -153,9 +153,11 @@ TEST(ErrorState, interactive_InReset)
EXPECT_EXIT(RunErrorStateIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), "");
}
#ifdef FAIRMQ_DEBUG_MODE
// Compiled only when FAIRMQ_DEBUG_MODE is defined.
// The body just constructs a BadDevice and lets it go out of scope at the end
// of the test; there are no explicit assertions.
// NOTE(review): presumably this exercises the debug-mode handling of messages
// that outlive their transport — BadDevice is defined elsewhere, confirm.
TEST(ErrorState, OrphanMessages)
{
BadDevice badDevice;
}
#endif
} // namespace

View File

@@ -23,6 +23,67 @@ namespace
using namespace std;
void RegionsCache(const string& transport, const string& address)
{
size_t session1 = fair::mq::tools::UuidHash();
size_t session2 = fair::mq::tools::UuidHash();
fair::mq::ProgOptions config1;
fair::mq::ProgOptions config2;
config1.SetProperty<string>("session", to_string(session1));
config2.SetProperty<string>("session", to_string(session2));
auto factory1 = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config1);
auto factory2 = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config2);
auto region1 = factory1->CreateUnmanagedRegion(1000000, [](void*, size_t, void*) {});
auto region2 = factory2->CreateUnmanagedRegion(1000000, [](void*, size_t, void*) {});
void* r1ptr = region1->GetData();
void* r2ptr = region2->GetData();
FairMQChannel push1("Push1", "push", factory1);
FairMQChannel pull1("Pull1", "pull", factory1);
push1.Bind(address + to_string(1));
pull1.Connect(address + to_string(1));
FairMQChannel push2("Push2", "push", factory2);
FairMQChannel pull2("Pull2", "pull", factory2);
push2.Bind(address + to_string(2));
pull2.Connect(address + to_string(2));
{
static_cast<char*>(r1ptr)[0] = 97; // a
static_cast<char*>(static_cast<char*>(r1ptr) + 100)[0] = 98; // b
static_cast<char*>(r2ptr)[0] = 99; // c
static_cast<char*>(static_cast<char*>(r2ptr) + 100)[0] = 100; // d
FairMQMessagePtr m1(push1.NewMessage(region1, r1ptr, 100, nullptr));
FairMQMessagePtr m2(push1.NewMessage(region1, static_cast<char*>(r1ptr) + 100, 100, nullptr));
push1.Send(m1);
push1.Send(m2);
FairMQMessagePtr m3(push2.NewMessage(region2, r2ptr, 100, nullptr));
FairMQMessagePtr m4(push2.NewMessage(region2, static_cast<char*>(r2ptr) + 100, 100, nullptr));
push2.Send(m3);
push2.Send(m4);
}
{
FairMQMessagePtr m1(pull1.NewMessage());
FairMQMessagePtr m2(pull1.NewMessage());
ASSERT_EQ(pull1.Receive(m1), 100);
ASSERT_EQ(pull1.Receive(m2), 100);
ASSERT_EQ(static_cast<char*>(m1->GetData())[0], 'a');
ASSERT_EQ(static_cast<char*>(m2->GetData())[0], 'b');
FairMQMessagePtr m3(pull2.NewMessage());
FairMQMessagePtr m4(pull2.NewMessage());
ASSERT_EQ(pull2.Receive(m3), 100);
ASSERT_EQ(pull2.Receive(m4), 100);
ASSERT_EQ(static_cast<char*>(m3->GetData())[0], 'c');
ASSERT_EQ(static_cast<char*>(m4->GetData())[0], 'd');
}
}
void RegionEventSubscriptions(const string& transport)
{
size_t session{fair::mq::tools::UuidHash()};
@@ -160,6 +221,16 @@ void RegionCallbacks(const string& transport, const string& _address)
LOG(info) << "2 done.";
}
// Runs the RegionsCache scenario with the "zeromq" transport, using
// "ipc://test_region_cache" as the address prefix.
TEST(Cache, zeromq)
{
RegionsCache("zeromq", "ipc://test_region_cache");
}
// Runs the RegionsCache scenario with the "shmem" transport, using
// "ipc://test_region_cache" as the address prefix.
TEST(Cache, shmem)
{
RegionsCache("shmem", "ipc://test_region_cache");
}
TEST(EventSubscriptions, zeromq)
{
RegionEventSubscriptions("zeromq");