mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
Compare commits
24 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
f3bc9e05a8 | ||
|
5facc441b8 | ||
|
2602f53585 | ||
|
0976465338 | ||
|
9144258b89 | ||
|
be55565617 | ||
|
d7e2fbecea | ||
|
72175e5757 | ||
|
effba534f0 | ||
|
efd42075a9 | ||
|
5228407932 | ||
|
30e81d58f8 | ||
|
2c7c46f2fd | ||
|
0a5122bb24 | ||
|
fc49687879 | ||
|
66a4df0667 | ||
|
978191fa6c | ||
|
cef6d0afcd | ||
|
47ec550792 | ||
|
b4aeb320e5 | ||
|
107248be0a | ||
|
68ceaba501 | ||
|
4b6cf8b181 | ||
|
21d6cf9830 |
@@ -1,3 +1,3 @@
|
||||
---
|
||||
Checks: '*,-google-*,-fuchsia-*,-cert-*,-llvm-header-guard,-readability-named-parameter,-misc-non-private-member-variables-in-classes,-*-magic-numbers,-llvm-include-order,-hicpp-no-array-decay,-performance-unnecessary-value-param,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-modernize-use-trailing-return-type,-readability-redundant-member-init'
|
||||
Checks: 'cppcoreguidelines-*,misc-unused-alias-decls,misc-unused-parameters,modernize-deprecated-headers,modernize-raw-string-literal,modernize-redundant-void-arg,modernize-use-bool-literals,modernize-use-default-member-init,modernize-use-emplace,modernize-use-equals-default,modernize-use-equals-delete,modernize-use-noexcept,modernize-use-nullptr,modernize-use-override,modernize-use-using,performance-faster-string-find,performance-for-range-copy,performance-unnecessary-copy-initialization,readability-avoid-const-params-in-decls,readability-braces-around-statements,readability-container-size-empty,readability-delete-null-pointer,readability-redundant-member-init,readability-redundant-string-init,readability-static-accessed-through-instance,readability-string-compare'
|
||||
HeaderFilterRegex: '/(fairmq/)'
|
||||
|
@@ -44,8 +44,10 @@ list(APPEND options
|
||||
"-DDISABLE_COLOR=ON"
|
||||
"-DBUILD_SDK_COMMANDS=ON"
|
||||
"-DBUILD_SDK=ON"
|
||||
"-DBUILD_DDS_PLUGIN=ON"
|
||||
)
|
||||
"-DBUILD_DDS_PLUGIN=ON")
|
||||
if(RUN_STATIC_ANALYSIS)
|
||||
list(APPEND options "-DRUN_STATIC_ANALYSIS=ON")
|
||||
endif()
|
||||
list(JOIN options ";" optionsstr)
|
||||
ctest_configure(OPTIONS "${optionsstr}")
|
||||
|
||||
|
25
Jenkinsfile
vendored
25
Jenkinsfile
vendored
@@ -1,6 +1,6 @@
|
||||
#!groovy
|
||||
|
||||
def jobMatrix(String prefix, String type, List specs) {
|
||||
def jobMatrix(String type, List specs) {
|
||||
def nodes = [:]
|
||||
for (spec in specs) {
|
||||
job = "${spec.os}-${spec.ver}-${spec.arch}-${spec.compiler}"
|
||||
@@ -12,7 +12,7 @@ def jobMatrix(String prefix, String type, List specs) {
|
||||
|
||||
nodes[label] = {
|
||||
node(selector) {
|
||||
githubNotify(context: "${prefix}/${label}", description: 'Building ...', status: 'PENDING')
|
||||
githubNotify(context: "${label}", description: 'Building ...', status: 'PENDING')
|
||||
try {
|
||||
deleteDir()
|
||||
checkout scm
|
||||
@@ -29,7 +29,7 @@ def jobMatrix(String prefix, String type, List specs) {
|
||||
sh "cat ${jobscript}"
|
||||
sh "bash ${jobscript}"
|
||||
} else {
|
||||
def containercmd = "singularity exec -B/shared ${env.SINGULARITY_CONTAINER_ROOT}/fairmq/${os}.${ver}.sif bash -l -c \\\"${ctestcmd}\\\""
|
||||
def containercmd = "singularity exec -B/shared ${env.SINGULARITY_CONTAINER_ROOT}/fairmq/${os}.${ver}.sif bash -l -c \\\"${ctestcmd} -DRUN_STATIC_ANALYSIS=ON\\\""
|
||||
sh """\
|
||||
echo \"echo \\\"*** Job started at .......: \\\$(date -R)\\\"\" >> ${jobscript}
|
||||
echo \"echo \\\"*** Job ID ...............: \\\${SLURM_JOB_ID}\\\"\" >> ${jobscript}
|
||||
@@ -40,13 +40,24 @@ def jobMatrix(String prefix, String type, List specs) {
|
||||
"""
|
||||
sh "cat ${jobscript}"
|
||||
sh "test/ci/slurm-submit.sh \"FairMQ \${JOB_BASE_NAME} ${label}\" ${jobscript}"
|
||||
|
||||
withChecks('Static Analysis') {
|
||||
recordIssues(enabledForFailure: true,
|
||||
tools: [gcc(pattern: 'build/Testing/Temporary/*.log')],
|
||||
filters: [excludeFile('extern/*'), excludeFile('usr/*')],
|
||||
skipBlames: true)
|
||||
}
|
||||
}
|
||||
|
||||
deleteDir()
|
||||
githubNotify(context: "${prefix}/${label}", description: 'Success', status: 'SUCCESS')
|
||||
githubNotify(context: "${label}", description: 'Success', status: 'SUCCESS')
|
||||
} catch (e) {
|
||||
def tarball = "${prefix}_${label}_dds_logs.tar.gz"
|
||||
sh "tar czvf ${tarball} -C \${WORKSPACE}/build/test .DDS/"
|
||||
archiveArtifacts tarball
|
||||
|
||||
deleteDir()
|
||||
githubNotify(context: "${prefix}/${label}", description: 'Error', status: 'ERROR')
|
||||
githubNotify(context: "${label}", description: 'Error', status: 'ERROR')
|
||||
throw e
|
||||
}
|
||||
}
|
||||
@@ -58,10 +69,10 @@ def jobMatrix(String prefix, String type, List specs) {
|
||||
pipeline{
|
||||
agent none
|
||||
stages {
|
||||
stage("Run CI Matrix") {
|
||||
stage("CI") {
|
||||
steps{
|
||||
script {
|
||||
def builds = jobMatrix('alfa-ci', 'build', [
|
||||
def builds = jobMatrix('build', [
|
||||
[os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10'],
|
||||
[os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12'],
|
||||
])
|
||||
|
@@ -1,5 +1,5 @@
|
||||
<!-- {#mainpage} -->
|
||||
# FairMQ [](COPYRIGHT) [](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [](https://codecov.io/gh/FairRootGroup/FairMQ/branch/master) [](https://scan.coverity.com/projects/fairrootgroup-fairmq) [](https://www.codacy.com/app/dennisklein/FairMQ?utm_source=github.com&utm_medium=referral&utm_content=FairRootGroup/FairMQ&utm_campaign=Badge_Grade)
|
||||
# FairMQ [](COPYRIGHT) [](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [](https://scan.coverity.com/projects/fairrootgroup-fairmq)
|
||||
|
||||
C++ Message Queuing Library and Framework
|
||||
|
||||
|
@@ -1,5 +1,5 @@
|
||||
################################################################################
|
||||
# Copyright (C) 2012-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# Copyright (C) 2012-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence (LGPL) version 3, #
|
||||
@@ -62,6 +62,7 @@ if(BUILD_FAIRMQ OR BUILD_SDK)
|
||||
target_link_libraries(${target}
|
||||
PRIVATE
|
||||
FairLogger::FairLogger
|
||||
Threads::Threads
|
||||
PUBLIC
|
||||
Boost::boost
|
||||
)
|
||||
@@ -145,6 +146,7 @@ if(BUILD_FAIRMQ)
|
||||
# libFairMQ header files #
|
||||
##########################
|
||||
set(FAIRMQ_PUBLIC_HEADER_FILES
|
||||
Device.h
|
||||
DeviceRunner.h
|
||||
EventManager.h
|
||||
FairMQChannel.h
|
||||
@@ -169,14 +171,21 @@ if(BUILD_FAIRMQ)
|
||||
Plugin.h
|
||||
PluginManager.h
|
||||
PluginServices.h
|
||||
runDevice.h
|
||||
runFairMQDevice.h
|
||||
shmem/Monitor.h
|
||||
)
|
||||
|
||||
set(FAIRMQ_PRIVATE_HEADER_FILES
|
||||
devices/BenchmarkSampler.h
|
||||
devices/Merger.h
|
||||
devices/Multiplier.h
|
||||
devices/Proxy.h
|
||||
devices/Sink.h
|
||||
devices/Splitter.h
|
||||
plugins/Builtin.h
|
||||
plugins/config/Config.h
|
||||
plugins/Control.h
|
||||
plugins/control/Control.h
|
||||
shmem/Message.h
|
||||
shmem/Poller.h
|
||||
shmem/UnmanagedRegion.h
|
||||
@@ -223,7 +232,7 @@ if(BUILD_FAIRMQ)
|
||||
Properties.cxx
|
||||
SuboptParser.cxx
|
||||
plugins/config/Config.cxx
|
||||
plugins/Control.cxx
|
||||
plugins/control/Control.cxx
|
||||
MemoryResources.cxx
|
||||
shmem/Monitor.cxx
|
||||
)
|
||||
@@ -243,7 +252,7 @@ if(BUILD_FAIRMQ)
|
||||
# configure files #
|
||||
###################
|
||||
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/run/startMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startMQBenchmark.sh)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/devices/startMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startMQBenchmark.sh)
|
||||
|
||||
#################################
|
||||
# define libFairMQ build target #
|
||||
@@ -347,22 +356,22 @@ if(BUILD_FAIRMQ)
|
||||
###############
|
||||
# executables #
|
||||
###############
|
||||
add_executable(fairmq-bsampler run/runBenchmarkSampler.cxx)
|
||||
add_executable(fairmq-bsampler devices/runBenchmarkSampler.cxx)
|
||||
target_link_libraries(fairmq-bsampler FairMQ)
|
||||
|
||||
add_executable(fairmq-merger run/runMerger.cxx)
|
||||
add_executable(fairmq-merger devices/runMerger.cxx)
|
||||
target_link_libraries(fairmq-merger FairMQ)
|
||||
|
||||
add_executable(fairmq-multiplier run/runMultiplier.cxx)
|
||||
add_executable(fairmq-multiplier devices/runMultiplier.cxx)
|
||||
target_link_libraries(fairmq-multiplier FairMQ)
|
||||
|
||||
add_executable(fairmq-proxy run/runProxy.cxx)
|
||||
add_executable(fairmq-proxy devices/runProxy.cxx)
|
||||
target_link_libraries(fairmq-proxy FairMQ)
|
||||
|
||||
add_executable(fairmq-sink run/runSink.cxx)
|
||||
add_executable(fairmq-sink devices/runSink.cxx)
|
||||
target_link_libraries(fairmq-sink FairMQ)
|
||||
|
||||
add_executable(fairmq-splitter run/runSplitter.cxx)
|
||||
add_executable(fairmq-splitter devices/runSplitter.cxx)
|
||||
target_link_libraries(fairmq-splitter FairMQ)
|
||||
|
||||
add_executable(fairmq-shmmonitor shmem/Monitor.cxx shmem/Monitor.h shmem/runMonitor.cxx)
|
||||
@@ -383,8 +392,11 @@ if(BUILD_FAIRMQ)
|
||||
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}>
|
||||
)
|
||||
|
||||
add_executable(fairmq-uuid-gen run/runUuidGenerator.cxx)
|
||||
target_link_libraries(fairmq-uuid-gen FairMQ)
|
||||
add_executable(fairmq-uuid-gen tools/runUuidGenerator.cxx)
|
||||
target_link_libraries(fairmq-uuid-gen PUBLIC
|
||||
Boost::program_options
|
||||
Tools
|
||||
)
|
||||
|
||||
|
||||
###########
|
||||
|
21
fairmq/Device.h
Normal file
21
fairmq/Device.h
Normal file
@@ -0,0 +1,21 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIR_MQ_DEVICE_H
|
||||
#define FAIR_MQ_DEVICE_H
|
||||
|
||||
#include <FairMQDevice.h>
|
||||
|
||||
namespace fair::mq
|
||||
{
|
||||
|
||||
using Device = ::FairMQDevice;
|
||||
|
||||
} // namespace fair::mq
|
||||
|
||||
#endif /* FAIR_MQ_DEVICE_H */
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -9,10 +9,10 @@
|
||||
#ifndef FAIR_MQ_DEVICERUNNER_H
|
||||
#define FAIR_MQ_DEVICERUNNER_H
|
||||
|
||||
#include <fairmq/Device.h>
|
||||
#include <fairmq/EventManager.h>
|
||||
#include <fairmq/PluginManager.h>
|
||||
#include <fairmq/ProgOptions.h>
|
||||
#include <FairMQDevice.h>
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
@@ -73,7 +73,7 @@ class DeviceRunner
|
||||
|
||||
std::vector<std::string> fRawCmdLineArgs;
|
||||
fair::mq::ProgOptions fConfig;
|
||||
std::unique_ptr<FairMQDevice> fDevice;
|
||||
std::unique_ptr<Device> fDevice;
|
||||
PluginManager fPluginManager;
|
||||
const bool fPrintLogo;
|
||||
|
||||
|
@@ -9,17 +9,17 @@
|
||||
#ifndef FAIRMQDEVICE_H_
|
||||
#define FAIRMQDEVICE_H_
|
||||
|
||||
#include <StateMachine.h>
|
||||
#include <FairMQTransportFactory.h>
|
||||
#include <fairmq/Transports.h>
|
||||
#include <fairmq/StateQueue.h>
|
||||
|
||||
#include <FairMQChannel.h>
|
||||
#include <FairMQLogger.h>
|
||||
#include <FairMQMessage.h>
|
||||
#include <FairMQParts.h>
|
||||
#include <FairMQTransportFactory.h>
|
||||
#include <FairMQUnmanagedRegion.h>
|
||||
#include <FairMQLogger.h>
|
||||
#include <fairmq/ProgOptions.h>
|
||||
#include <fairmq/StateMachine.h>
|
||||
#include <fairmq/StateQueue.h>
|
||||
#include <fairmq/Transports.h>
|
||||
#include <fairmq/tools/Version.h>
|
||||
|
||||
#include <vector>
|
||||
#include <memory> // unique_ptr
|
||||
@@ -34,8 +34,6 @@
|
||||
#include <cstddef>
|
||||
#include <utility> // pair
|
||||
|
||||
#include <fairmq/tools/Version.h>
|
||||
|
||||
using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
|
||||
|
||||
using InputMsgCallback = std::function<bool(FairMQMessagePtr&, int)>;
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -9,8 +9,9 @@
|
||||
#ifndef FAIRMQPOLLER_H_
|
||||
#define FAIRMQPOLLER_H_
|
||||
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
|
||||
class FairMQPoller
|
||||
{
|
||||
|
@@ -15,7 +15,7 @@
|
||||
#ifndef FAIR_MQ_MEMORY_RESOURCES_H
|
||||
#define FAIR_MQ_MEMORY_RESOURCES_H
|
||||
|
||||
#include <fairmq/FairMQMessage.h>
|
||||
#include <FairMQMessage.h>
|
||||
class FairMQTransportFactory;
|
||||
|
||||
#include <boost/container/container_fwd.hpp>
|
||||
|
@@ -1,32 +1,35 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIRMQBENCHMARKSAMPLER_H_
|
||||
#define FAIRMQBENCHMARKSAMPLER_H_
|
||||
#ifndef FAIR_MQ_BENCHMARKSAMPLER_H
|
||||
#define FAIR_MQ_BENCHMARKSAMPLER_H
|
||||
|
||||
#include "../FairMQLogger.h"
|
||||
#include "FairMQDevice.h"
|
||||
#include "tools/RateLimit.h"
|
||||
#include <fairmq/Device.h>
|
||||
#include <fairmq/tools/RateLimit.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <cstddef> // size_t
|
||||
#include <cstdint> // uint64_t
|
||||
#include <cstring> // memset
|
||||
#include <fairlogger/Logger.h>
|
||||
#include <string>
|
||||
|
||||
namespace fair::mq
|
||||
{
|
||||
|
||||
/**
|
||||
* Sampler to generate traffic for benchmarking.
|
||||
*/
|
||||
|
||||
class FairMQBenchmarkSampler : public FairMQDevice
|
||||
class BenchmarkSampler : public Device
|
||||
{
|
||||
public:
|
||||
FairMQBenchmarkSampler()
|
||||
BenchmarkSampler()
|
||||
: fMultipart(false)
|
||||
, fMemSet(false)
|
||||
, fNumParts(1)
|
||||
@@ -117,4 +120,6 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
||||
std::string fOutChannelName;
|
||||
};
|
||||
|
||||
#endif /* FAIRMQBENCHMARKSAMPLER_H_ */
|
||||
} // namespace fair::mq
|
||||
|
||||
#endif /* FAIR_MQ_BENCHMARKSAMPLER_H */
|
@@ -1,36 +1,32 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQMerger.h
|
||||
*
|
||||
* @since 2012-12-06
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQMERGER_H_
|
||||
#define FAIRMQMERGER_H_
|
||||
#ifndef FAIR_MQ_MERGER_H
|
||||
#define FAIR_MQ_MERGER_H
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
#include "../FairMQPoller.h"
|
||||
#include "../FairMQLogger.h"
|
||||
#include <FairMQPoller.h>
|
||||
#include <fairmq/Device.h>
|
||||
|
||||
#include <fairlogger/Logger.h>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
class FairMQMerger : public FairMQDevice
|
||||
namespace fair::mq
|
||||
{
|
||||
|
||||
class Merger : public Device
|
||||
{
|
||||
public:
|
||||
FairMQMerger()
|
||||
Merger()
|
||||
: fMultipart(true)
|
||||
, fInChannelName("data-in")
|
||||
, fOutChannelName("data-out")
|
||||
{}
|
||||
~FairMQMerger() {}
|
||||
|
||||
protected:
|
||||
bool fMultipart;
|
||||
@@ -112,4 +108,6 @@ class FairMQMerger : public FairMQDevice
|
||||
}
|
||||
};
|
||||
|
||||
#endif /* FAIRMQMERGER_H_ */
|
||||
} // namespace fair::mq
|
||||
|
||||
#endif /* FAIR_MQ_MERGER_H */
|
@@ -1,29 +1,31 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIRMQMULTIPLIER_H_
|
||||
#define FAIRMQMULTIPLIER_H_
|
||||
#ifndef FAIR_MQ_MULTIPLIER_H
|
||||
#define FAIR_MQ_MULTIPLIER_H
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
#include <fairmq/Device.h>
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
class FairMQMultiplier : public FairMQDevice
|
||||
namespace fair::mq
|
||||
{
|
||||
|
||||
class Multiplier : public Device
|
||||
{
|
||||
public:
|
||||
FairMQMultiplier()
|
||||
Multiplier()
|
||||
: fMultipart(true)
|
||||
, fNumOutputs(0)
|
||||
, fInChannelName()
|
||||
, fOutChannelNames()
|
||||
{}
|
||||
~FairMQMultiplier() {}
|
||||
|
||||
protected:
|
||||
bool fMultipart;
|
||||
@@ -39,9 +41,9 @@ class FairMQMultiplier : public FairMQDevice
|
||||
fNumOutputs = fChannels.at(fOutChannelNames.at(0)).size();
|
||||
|
||||
if (fMultipart) {
|
||||
OnData(fInChannelName, &FairMQMultiplier::HandleMultipartData);
|
||||
OnData(fInChannelName, &Multiplier::HandleMultipartData);
|
||||
} else {
|
||||
OnData(fInChannelName, &FairMQMultiplier::HandleSingleData);
|
||||
OnData(fInChannelName, &Multiplier::HandleSingleData);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,4 +109,6 @@ class FairMQMultiplier : public FairMQDevice
|
||||
}
|
||||
};
|
||||
|
||||
#endif /* FAIRMQMULTIPLIER_H_ */
|
||||
} // namespace fair::mq
|
||||
|
||||
#endif /* FAIR_MQ_MULTIPLIER_H */
|
@@ -1,33 +1,29 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQProxy.h
|
||||
*
|
||||
* @since 2013-10-02
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQPROXY_H_
|
||||
#define FAIRMQPROXY_H_
|
||||
#ifndef FAIR_MQ_PROXY_H
|
||||
#define FAIR_MQ_PROXY_H
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
#include <fairmq/Device.h>
|
||||
|
||||
#include <string>
|
||||
|
||||
class FairMQProxy : public FairMQDevice
|
||||
namespace fair::mq
|
||||
{
|
||||
|
||||
class Proxy : public Device
|
||||
{
|
||||
public:
|
||||
FairMQProxy()
|
||||
Proxy()
|
||||
: fMultipart(true)
|
||||
, fInChannelName()
|
||||
, fOutChannelName()
|
||||
{}
|
||||
~FairMQProxy() {}
|
||||
|
||||
protected:
|
||||
bool fMultipart;
|
||||
@@ -73,4 +69,6 @@ class FairMQProxy : public FairMQDevice
|
||||
}
|
||||
};
|
||||
|
||||
#endif /* FAIRMQPROXY_H_ */
|
||||
} // namespace fair::mq
|
||||
|
||||
#endif /* FAIR_MQ_PROXY_H */
|
@@ -2,9 +2,9 @@
|
||||
|
||||
With FairMQ several generic devices are provided:
|
||||
|
||||
- **FairMQBenchmarkSampler**: generates random data of configurable size and at configurable rate and sends it out on an output channel.
|
||||
- **FairMQSink**: receives messages on the input channel and simply discards them.
|
||||
- **FairMQMerger**: receives data from multiple input channels and forwards it to a single output channel.
|
||||
- **FairMQSplitter**: receives messages on a single input channels and round-robins them among multiple output channels (which can have different socket types).
|
||||
- **FairMQMultiplier**: receives data from a single input channel and multiplies (copies) it to two or more output channels.
|
||||
- **FairMQProxy**: connects input channel to output channel, where both can have different socket types and multiple peers.
|
||||
- **BenchmarkSampler**: generates random data of configurable size and at configurable rate and sends it out on an output channel.
|
||||
- **Sink**: receives messages on the input channel and simply discards them.
|
||||
- **Merger**: receives data from multiple input channels and forwards it to a single output channel.
|
||||
- **Splitter**: receives messages on a single input channels and round-robins them among multiple output channels (which can have different socket types).
|
||||
- **Multiplier**: receives data from a single input channel and multiplies (copies) it to two or more output channels.
|
||||
- **Proxy**: connects input channel to output channel, where both can have different socket types and multiple peers.
|
||||
|
@@ -1,33 +1,31 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQSink.h
|
||||
*
|
||||
* @since 2013-01-09
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQSINK_H_
|
||||
#define FAIRMQSINK_H_
|
||||
#ifndef FAIR_MQ_SINK_H
|
||||
#define FAIR_MQ_SINK_H
|
||||
|
||||
#include "../FairMQDevice.h"
|
||||
#include "../FairMQLogger.h"
|
||||
#include <FairMQPoller.h>
|
||||
#include <fairmq/Device.h>
|
||||
#include <fairmq/tools/Strings.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <string>
|
||||
#include <fairlogger/Logger.h>
|
||||
#include <fstream>
|
||||
#include <string>
|
||||
#include <stdexcept>
|
||||
|
||||
class FairMQSink : public FairMQDevice
|
||||
namespace fair::mq
|
||||
{
|
||||
|
||||
class Sink : public Device
|
||||
{
|
||||
public:
|
||||
FairMQSink()
|
||||
Sink()
|
||||
: fMultipart(false)
|
||||
, fMaxIterations(0)
|
||||
, fNumIterations(0)
|
||||
@@ -37,8 +35,6 @@ class FairMQSink : public FairMQDevice
|
||||
, fOutFilename()
|
||||
{}
|
||||
|
||||
~FairMQSink() {}
|
||||
|
||||
protected:
|
||||
bool fMultipart;
|
||||
uint64_t fMaxIterations;
|
||||
@@ -145,4 +141,6 @@ class FairMQSink : public FairMQDevice
|
||||
}
|
||||
};
|
||||
|
||||
#endif /* FAIRMQSINK_H_ */
|
||||
} // namespace fair::mq
|
||||
|
||||
#endif /* FAIR_MQ_SINK_H */
|
@@ -1,35 +1,31 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQSplitter.h
|
||||
*
|
||||
* @since 2012-12-06
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQSPLITTER_H_
|
||||
#define FAIRMQSPLITTER_H_
|
||||
#ifndef FAIR_MQ_SPLITTER_H
|
||||
#define FAIR_MQ_SPLITTER_H
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
#include <fairmq/Device.h>
|
||||
|
||||
#include <string>
|
||||
|
||||
class FairMQSplitter : public FairMQDevice
|
||||
namespace fair::mq
|
||||
{
|
||||
|
||||
class Splitter : public Device
|
||||
{
|
||||
public:
|
||||
FairMQSplitter()
|
||||
Splitter()
|
||||
: fMultipart(true)
|
||||
, fNumOutputs(0)
|
||||
, fDirection(0)
|
||||
, fInChannelName()
|
||||
, fOutChannelName()
|
||||
{}
|
||||
~FairMQSplitter() {}
|
||||
|
||||
protected:
|
||||
bool fMultipart;
|
||||
@@ -47,9 +43,9 @@ class FairMQSplitter : public FairMQDevice
|
||||
fDirection = 0;
|
||||
|
||||
if (fMultipart) {
|
||||
OnData(fInChannelName, &FairMQSplitter::HandleData<FairMQParts>);
|
||||
OnData(fInChannelName, &Splitter::HandleData<FairMQParts>);
|
||||
} else {
|
||||
OnData(fInChannelName, &FairMQSplitter::HandleData<FairMQMessagePtr>);
|
||||
OnData(fInChannelName, &Splitter::HandleData<FairMQMessagePtr>);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,4 +62,6 @@ class FairMQSplitter : public FairMQDevice
|
||||
}
|
||||
};
|
||||
|
||||
#endif /* FAIRMQSPLITTER_H_ */
|
||||
} // namespace fair::mq
|
||||
|
||||
#endif /* FAIR_MQ_SPLITTER_H */
|
@@ -1,13 +1,13 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <runFairMQDevice.h>
|
||||
#include <devices/FairMQBenchmarkSampler.h>
|
||||
#include <fairmq/devices/BenchmarkSampler.h>
|
||||
#include <fairmq/runDevice.h>
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
@@ -24,7 +24,7 @@ void addCustomOptions(bpo::options_description& options)
|
||||
("msg-rate", bpo::value<float>()->default_value(0), "Msg rate limit in maximum number of messages per second");
|
||||
}
|
||||
|
||||
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /* config */)
|
||||
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /* config */)
|
||||
{
|
||||
return new FairMQBenchmarkSampler();
|
||||
return std::make_unique<fair::mq::BenchmarkSampler>();
|
||||
}
|
@@ -1,13 +1,13 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <runFairMQDevice.h>
|
||||
#include <devices/FairMQMerger.h>
|
||||
#include <fairmq/devices/Merger.h>
|
||||
#include <fairmq/runDevice.h>
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
|
||||
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
|
||||
}
|
||||
|
||||
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
|
||||
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
|
||||
{
|
||||
return new FairMQMerger();
|
||||
return std::make_unique<fair::mq::Merger>();
|
||||
}
|
@@ -1,13 +1,13 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <runFairMQDevice.h>
|
||||
#include <devices/FairMQMultiplier.h>
|
||||
#include <fairmq/devices/Multiplier.h>
|
||||
#include <fairmq/runDevice.h>
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
|
||||
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
|
||||
}
|
||||
|
||||
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
|
||||
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
|
||||
{
|
||||
return new FairMQMultiplier();
|
||||
return std::make_unique<fair::mq::Multiplier>();
|
||||
}
|
@@ -1,13 +1,13 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <runFairMQDevice.h>
|
||||
#include <devices/FairMQProxy.h>
|
||||
#include <fairmq/devices/Proxy.h>
|
||||
#include <fairmq/runDevice.h>
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
|
||||
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
|
||||
}
|
||||
|
||||
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
|
||||
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
|
||||
{
|
||||
return new FairMQProxy();
|
||||
return std::make_unique<fair::mq::Proxy>();
|
||||
}
|
@@ -1,13 +1,13 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <runFairMQDevice.h>
|
||||
#include <devices/FairMQSink.h>
|
||||
#include <fairmq/devices/Sink.h>
|
||||
#include <fairmq/runDevice.h>
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
@@ -21,7 +21,7 @@ void addCustomOptions(bpo::options_description& options)
|
||||
("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads");
|
||||
}
|
||||
|
||||
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
|
||||
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
|
||||
{
|
||||
return new FairMQSink();
|
||||
return std::make_unique<fair::mq::Sink>();
|
||||
}
|
@@ -1,13 +1,13 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <runFairMQDevice.h>
|
||||
#include <devices/FairMQSplitter.h>
|
||||
#include <fairmq/devices/Splitter.h>
|
||||
#include <fairmq/runDevice.h>
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
@@ -19,7 +19,7 @@ void addCustomOptions(bpo::options_description& options)
|
||||
("multipart", bpo::value<bool>()->default_value(true), "Handle multipart payloads");
|
||||
}
|
||||
|
||||
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/)
|
||||
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& /*config*/)
|
||||
{
|
||||
return new FairMQSplitter();
|
||||
return std::make_unique<fair::mq::Splitter>();
|
||||
}
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -9,4 +9,4 @@
|
||||
// List of all builtin plugin headers (the ones which call REGISTER_FAIRMQ_PLUGIN macro)
|
||||
|
||||
#include <fairmq/plugins/config/Config.h>
|
||||
#include <fairmq/plugins/Control.h>
|
||||
#include <fairmq/plugins/control/Control.h>
|
||||
|
59
fairmq/runDevice.h
Normal file
59
fairmq/runDevice.h
Normal file
@@ -0,0 +1,59 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <fairmq/DeviceRunner.h>
|
||||
#include <boost/program_options.hpp>
|
||||
#include <memory>
|
||||
|
||||
// to be implemented by the user to return a child class of FairMQDevice
|
||||
std::unique_ptr<fair::mq::Device> getDevice(const fair::mq::ProgOptions& config);
|
||||
|
||||
// to be implemented by the user to add custom command line options (or just with empty body)
|
||||
void addCustomOptions(boost::program_options::options_description&);
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
using namespace fair::mq;
|
||||
using namespace fair::mq::hooks;
|
||||
|
||||
try {
|
||||
DeviceRunner runner(argc, argv);
|
||||
|
||||
// runner.AddHook<LoadPlugins>([](DeviceRunner& r){
|
||||
// // for example:
|
||||
// r.fPluginManager->SetSearchPaths({"/lib", "/lib/plugins"});
|
||||
// r.fPluginManager->LoadPlugin("asdf");
|
||||
// });
|
||||
|
||||
runner.AddHook<SetCustomCmdLineOptions>([](DeviceRunner& r){
|
||||
boost::program_options::options_description customOptions("Custom options");
|
||||
addCustomOptions(customOptions);
|
||||
r.fConfig.AddToCmdLineOptions(customOptions);
|
||||
});
|
||||
|
||||
// runner.AddHook<ModifyRawCmdLineArgs>([](DeviceRunner& r){
|
||||
// // for example:
|
||||
// r.fRawCmdLineArgs.push_back("--blubb");
|
||||
// });
|
||||
|
||||
runner.AddHook<InstantiateDevice>([](DeviceRunner& r){
|
||||
r.fDevice = getDevice(r.fConfig);
|
||||
});
|
||||
|
||||
return runner.Run();
|
||||
|
||||
// Run with builtin catch all exception handler, just:
|
||||
// return runner.RunWithExceptionHandlers();
|
||||
} catch (std::exception& e) {
|
||||
LOG(error) << "Uncaught exception reached the top of main: " << e.what();
|
||||
return 1;
|
||||
} catch (...) {
|
||||
LOG(error) << "Uncaught exception reached the top of main.";
|
||||
return 1;
|
||||
}
|
||||
}
|
@@ -91,6 +91,17 @@ struct SegmentInfo
|
||||
AllocationAlgorithm fAllocationAlgorithm;
|
||||
};
|
||||
|
||||
struct SessionInfo
|
||||
{
|
||||
SessionInfo(const char* sessionName, int creatorId, const VoidAlloc& alloc)
|
||||
: fSessionName(sessionName, alloc)
|
||||
, fCreatorId(creatorId)
|
||||
{}
|
||||
|
||||
Str fSessionName;
|
||||
int fCreatorId;
|
||||
};
|
||||
|
||||
using Uint16SegmentInfoPairAlloc = boost::interprocess::allocator<std::pair<const uint16_t, SegmentInfo>, SegmentManager>;
|
||||
using Uint16SegmentInfoHashMap = boost::unordered_map<uint16_t, SegmentInfo, boost::hash<uint16_t>, std::equal_to<uint16_t>, Uint16SegmentInfoPairAlloc>;
|
||||
// using Uint16SegmentInfoMap = boost::interprocess::map<uint16_t, SegmentInfo, std::less<uint16_t>, Uint16SegmentInfoPairAlloc>;
|
||||
@@ -195,9 +206,9 @@ struct RegionBlock
|
||||
|
||||
// find id for unique shmem name:
|
||||
// a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)).
|
||||
inline std::string makeShmIdStr(const std::string& sessionId)
|
||||
inline std::string makeShmIdStr(const std::string& sessionId, const std::string& userId)
|
||||
{
|
||||
std::string seed((std::to_string(geteuid()) + sessionId));
|
||||
std::string seed(userId + sessionId);
|
||||
// generate a 8-digit hex value out of sha256 hash
|
||||
std::vector<unsigned char> hash(4);
|
||||
picosha2::hash256(seed.begin(), seed.end(), hash.begin(), hash.end());
|
||||
@@ -205,6 +216,11 @@ inline std::string makeShmIdStr(const std::string& sessionId)
|
||||
return picosha2::bytes_to_hex_string(hash.begin(), hash.end());
|
||||
}
|
||||
|
||||
inline std::string makeShmIdStr(const std::string& sessionId)
|
||||
{
|
||||
return makeShmIdStr(sessionId, std::to_string(geteuid()));
|
||||
}
|
||||
|
||||
inline uint64_t makeShmIdUint64(const std::string& sessionId)
|
||||
{
|
||||
std::string shmId = makeShmIdStr(sessionId);
|
||||
|
@@ -46,6 +46,9 @@
|
||||
#include <utility> // pair
|
||||
#include <vector>
|
||||
|
||||
#include <unistd.h> // getuid
|
||||
#include <sys/types.h> // getuid
|
||||
|
||||
#include <sys/mman.h> // mlock
|
||||
|
||||
namespace fair::mq::shmem
|
||||
@@ -54,8 +57,8 @@ namespace fair::mq::shmem
|
||||
class Manager
|
||||
{
|
||||
public:
|
||||
Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config)
|
||||
: fShmId(std::move(shmId))
|
||||
Manager(const std::string& sessionName, std::string deviceId, size_t size, const ProgOptions* config)
|
||||
: fShmId(makeShmIdStr(sessionName))
|
||||
, fSegmentId(config ? config->GetProperty<uint16_t>("shm-segment-id", 0) : 0)
|
||||
, fDeviceId(std::move(deviceId))
|
||||
, fSegments()
|
||||
@@ -82,6 +85,8 @@ class Manager
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
|
||||
LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";
|
||||
|
||||
bool mlockSegment = false;
|
||||
bool zeroSegment = false;
|
||||
bool autolaunchMonitor = false;
|
||||
@@ -96,17 +101,28 @@ class Manager
|
||||
}
|
||||
|
||||
if (autolaunchMonitor) {
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
StartMonitor(fShmId);
|
||||
}
|
||||
|
||||
fHeartbeatThread = std::thread(&Manager::SendHeartbeats, this);
|
||||
|
||||
{
|
||||
std::stringstream ss;
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
|
||||
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||
|
||||
fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first;
|
||||
SessionInfo* sessionInfo = fManagementSegment.find<SessionInfo>(unique_instance).first;
|
||||
if (sessionInfo) {
|
||||
LOG(debug) << "session info found, name: " << sessionInfo->fSessionName << ", creator id: " << sessionInfo->fCreatorId;
|
||||
} else {
|
||||
LOG(debug) << "no session info found, creating and initializing";
|
||||
sessionInfo = fManagementSegment.construct<SessionInfo>(unique_instance)(sessionName.c_str(), geteuid(), fShmVoidAlloc);
|
||||
LOG(debug) << "initialized session info, name: " << sessionInfo->fSessionName << ", creator id: " << sessionInfo->fCreatorId;
|
||||
}
|
||||
|
||||
fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first;
|
||||
if (fEventCounter) {
|
||||
LOG(debug) << "event counter found: " << fEventCounter->fCount;
|
||||
} else {
|
||||
@@ -146,8 +162,8 @@ class Manager
|
||||
ss << "Opened ";
|
||||
}
|
||||
ss << "shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "'."
|
||||
<< " Size: " << boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId)) << " bytes."
|
||||
<< " Available: " << boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(fSegmentId)) << " bytes."
|
||||
<< " Size: " << boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)) << " bytes."
|
||||
<< " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes."
|
||||
<< " Allocation algorithm: " << allocationAlgorithm;
|
||||
LOG(debug) << ss.str();
|
||||
} catch(interprocess_exception& bie) {
|
||||
@@ -157,21 +173,20 @@ class Manager
|
||||
|
||||
if (mlockSegment) {
|
||||
LOG(debug) << "Locking the managed segment memory pages...";
|
||||
if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId))) == -1) {
|
||||
if (mlock(boost::apply_visitor(SegmentAddress(), fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId))) == -1) {
|
||||
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
|
||||
}
|
||||
LOG(debug) << "Successfully locked the managed segment memory pages.";
|
||||
}
|
||||
if (zeroSegment) {
|
||||
LOG(debug) << "Zeroing the managed segment free memory...";
|
||||
boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(fSegmentId));
|
||||
boost::apply_visitor(SegmentMemoryZeroer(), fSegments.at(fSegmentId));
|
||||
LOG(debug) << "Successfully zeroed the managed segment free memory.";
|
||||
}
|
||||
|
||||
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||
|
||||
fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;
|
||||
|
||||
if (fDeviceCounter) {
|
||||
LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
|
||||
(fDeviceCounter->fCount)++;
|
||||
@@ -187,8 +202,6 @@ class Manager
|
||||
fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc);
|
||||
#endif
|
||||
}
|
||||
|
||||
fHeartbeatThread = std::thread(&Manager::SendHeartbeats, this);
|
||||
}
|
||||
|
||||
Manager() = delete;
|
||||
@@ -214,8 +227,15 @@ class Manager
|
||||
|
||||
boost::filesystem::path p = boost::process::search_path("fairmq-shmmonitor", ownPath);
|
||||
|
||||
bool verbose = false;
|
||||
if (const char* verboseEnv = getenv("FAIRMQ_SHMMONITOR_VERBOSE")) {
|
||||
if (std::string(verboseEnv) == "true") {
|
||||
verbose = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!p.empty()) {
|
||||
boost::process::spawn(p, "-x", "--shmid", id, "-d", "-t", "2000", env);
|
||||
boost::process::spawn(p, "-x", "-m", "--shmid", id, "-d", "-t", "2000", verbose ? "--verbose" : "", env);
|
||||
int numTries = 0;
|
||||
do {
|
||||
try {
|
||||
@@ -384,8 +404,8 @@ class Manager
|
||||
info.managed = true;
|
||||
info.id = e.first;
|
||||
info.event = RegionEvent::created;
|
||||
info.ptr = boost::apply_visitor(SegmentAddress{}, fSegments.at(e.first));
|
||||
info.size = boost::apply_visitor(SegmentSize{}, fSegments.at(e.first));
|
||||
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(e.first));
|
||||
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(e.first));
|
||||
result.push_back(info);
|
||||
} catch (const std::out_of_range& oor) {
|
||||
LOG(error) << "could not find segment with id " << e.first;
|
||||
@@ -512,11 +532,11 @@ class Manager
|
||||
|
||||
boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr, uint16_t segmentId) const
|
||||
{
|
||||
return boost::apply_visitor(SegmentHandleFromAddress{ptr}, fSegments.at(segmentId));
|
||||
return boost::apply_visitor(SegmentHandleFromAddress(ptr), fSegments.at(segmentId));
|
||||
}
|
||||
void* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId) const
|
||||
{
|
||||
return boost::apply_visitor(SegmentAddressFromHandle{handle}, fSegments.at(segmentId));
|
||||
return boost::apply_visitor(SegmentAddressFromHandle(handle), fSegments.at(segmentId));
|
||||
}
|
||||
|
||||
char* Allocate(const size_t size, size_t alignment = 0)
|
||||
@@ -529,7 +549,7 @@ class Manager
|
||||
// boost::interprocess::managed_shared_memory::size_type actualSize = size;
|
||||
// char* hint = 0; // unused for boost::interprocess::allocate_new
|
||||
// ptr = fSegments.at(fSegmentId).allocation_command<char>(boost::interprocess::allocate_new, size, actualSize, hint);
|
||||
size_t segmentSize = boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId));
|
||||
size_t segmentSize = boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId));
|
||||
if (size > segmentSize) {
|
||||
throw MessageBadAlloc(tools::ToString("Requested message size (", size, ") exceeds segment size (", segmentSize, ")"));
|
||||
}
|
||||
@@ -541,7 +561,7 @@ class Manager
|
||||
} catch (boost::interprocess::bad_alloc& ba) {
|
||||
// LOG(warn) << "Shared memory full...";
|
||||
if (ThrowingOnBadAlloc()) {
|
||||
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(fSegmentId))));
|
||||
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId))));
|
||||
}
|
||||
// rateLimiter.maybe_sleep();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
@@ -569,7 +589,7 @@ class Manager
|
||||
|
||||
void Deallocate(boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId)
|
||||
{
|
||||
boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle, segmentId)}, fSegments.at(segmentId));
|
||||
boost::apply_visitor(SegmentDeallocate(GetAddressFromHandle(handle, segmentId)), fSegments.at(segmentId));
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
DecrementShmMsgCounter(segmentId);
|
||||
@@ -583,7 +603,7 @@ class Manager
|
||||
|
||||
char* ShrinkInPlace(size_t newSize, char* localPtr, uint16_t segmentId)
|
||||
{
|
||||
return boost::apply_visitor(SegmentBufferShrink{newSize, localPtr}, fSegments.at(segmentId));
|
||||
return boost::apply_visitor(SegmentBufferShrink(newSize, localPtr), fSegments.at(segmentId));
|
||||
}
|
||||
|
||||
uint16_t GetSegmentId() const { return fSegmentId; }
|
||||
|
@@ -10,7 +10,6 @@
|
||||
#include "Common.h"
|
||||
|
||||
#include <fairmq/tools/Strings.h>
|
||||
#include <fairlogger/Logger.h>
|
||||
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
#include <boost/interprocess/file_mapping.hpp>
|
||||
@@ -21,6 +20,7 @@
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
|
||||
#include <csignal>
|
||||
#include <filesystem>
|
||||
#include <iostream>
|
||||
#include <iomanip>
|
||||
#include <chrono>
|
||||
@@ -71,18 +71,16 @@ void signalHandler(int signal)
|
||||
gSignalStatus = signal;
|
||||
}
|
||||
|
||||
Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, unsigned int intervalInMS, bool runAsDaemon, bool cleanOnExit)
|
||||
Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, unsigned int intervalInMS, bool monitor, bool cleanOnExit)
|
||||
: fSelfDestruct(selfDestruct)
|
||||
, fInteractive(interactive)
|
||||
, fViewOnly(viewOnly)
|
||||
, fIsDaemon(runAsDaemon)
|
||||
, fMonitor(monitor)
|
||||
, fSeenOnce(false)
|
||||
, fCleanOnExit(cleanOnExit)
|
||||
, fTimeoutInMS(timeoutInMS)
|
||||
, fIntervalInMS(intervalInMS)
|
||||
, fShmId(shmId)
|
||||
, fSegmentName("fmq_" + fShmId + "_m_0")
|
||||
, fManagementSegmentName("fmq_" + fShmId + "_mng")
|
||||
, fControlQueueName("fmq_" + fShmId + "_cq")
|
||||
, fTerminating(false)
|
||||
, fHeartbeatTriggered(false)
|
||||
@@ -94,14 +92,14 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
|
||||
try {
|
||||
bipc::named_mutex monitorStatus(bipc::create_only, string("fmq_" + fShmId + "_ms").c_str());
|
||||
} catch (bie&) {
|
||||
cout << "fairmq-shmmonitor for shared memory id " << fShmId << " already started or not properly exited. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`" << endl;
|
||||
throw DaemonPresent(tools::ToString("fairmq-shmmonitor for shared memory id ", fShmId, " already started or not properly exited."));
|
||||
if (fInteractive) {
|
||||
LOG(error) << "fairmq-shmmonitor for shm id " << fShmId << " is already running. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`, or run in view-only mode (-v)";
|
||||
} else {
|
||||
LOG(error) << "fairmq-shmmonitor for shm id " << fShmId << " is already running. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`";
|
||||
}
|
||||
throw DaemonPresent(tools::ToString("fairmq-shmmonitor (in monitoring mode) for shm id ", fShmId, " is already running."));
|
||||
}
|
||||
}
|
||||
|
||||
Logger::SetConsoleColor(false);
|
||||
Logger::DefineVerbosity(Verbosity::user1, VerbositySpec::Make(VerbositySpec::Info::timestamp_us));
|
||||
Logger::SetVerbosity(Verbosity::verylow);
|
||||
}
|
||||
|
||||
void Monitor::CatchSignals()
|
||||
@@ -116,7 +114,7 @@ void Monitor::SignalMonitor()
|
||||
while (true) {
|
||||
if (gSignalStatus != 0) {
|
||||
fTerminating = true;
|
||||
cout << "signal: " << gSignalStatus << endl;
|
||||
LOG(info) << "signal: " << gSignalStatus;
|
||||
break;
|
||||
} else if (fTerminating) {
|
||||
break;
|
||||
@@ -131,18 +129,13 @@ void Monitor::Run()
|
||||
thread heartbeatThread;
|
||||
if (!fViewOnly) {
|
||||
RemoveQueue(fControlQueueName);
|
||||
heartbeatThread = thread(&Monitor::MonitorHeartbeats, this);
|
||||
heartbeatThread = thread(&Monitor::ReceiveHeartbeats, this);
|
||||
}
|
||||
|
||||
if (fInteractive) {
|
||||
Interactive();
|
||||
} else if (fViewOnly) {
|
||||
CheckSegment();
|
||||
} else {
|
||||
while (!fTerminating) {
|
||||
this_thread::sleep_for(chrono::milliseconds(fIntervalInMS));
|
||||
CheckSegment();
|
||||
}
|
||||
Watch();
|
||||
}
|
||||
|
||||
if (!fViewOnly) {
|
||||
@@ -150,7 +143,165 @@ void Monitor::Run()
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::MonitorHeartbeats()
|
||||
void Monitor::Watch()
|
||||
{
|
||||
while (!fTerminating) {
|
||||
using namespace boost::interprocess;
|
||||
|
||||
try {
|
||||
managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + fShmId + "_mng").c_str());
|
||||
|
||||
fSeenOnce = true;
|
||||
|
||||
auto now = chrono::high_resolution_clock::now();
|
||||
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
|
||||
|
||||
if (fHeartbeatTriggered && duration > fTimeoutInMS) {
|
||||
// memory is present, but no heartbeats since timeout duration
|
||||
LOG(info) << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning...";
|
||||
Cleanup(ShmId{fShmId});
|
||||
fHeartbeatTriggered = false;
|
||||
if (fSelfDestruct) {
|
||||
LOG(info) << "self destructing (segment has been observed and cleaned up by the monitor)";
|
||||
fTerminating = true;
|
||||
}
|
||||
}
|
||||
} catch (bie&) {
|
||||
fHeartbeatTriggered = false;
|
||||
|
||||
if (fSelfDestruct) {
|
||||
if (fSeenOnce) {
|
||||
// segment has been observed at least once, can self-destruct
|
||||
LOG(info) << "self destructing (segment has been observed and cleaned up orderly)";
|
||||
fTerminating = true;
|
||||
} else {
|
||||
// if self-destruct is requested, and no segment has ever been observed, quit after double timeout duration
|
||||
auto now = chrono::high_resolution_clock::now();
|
||||
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
|
||||
|
||||
if (duration > fTimeoutInMS * 2) {
|
||||
Cleanup(ShmId{fShmId});
|
||||
LOG(info) << "self destructing (no segments observed within (timeout * 2) since start)";
|
||||
fTerminating = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this_thread::sleep_for(chrono::milliseconds(100));
|
||||
}
|
||||
}
|
||||
|
||||
bool Monitor::PrintShm(const ShmId& shmId)
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
|
||||
try {
|
||||
managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());
|
||||
VoidAlloc allocInstance(managementSegment.get_segment_manager());
|
||||
|
||||
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
|
||||
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> segments;
|
||||
|
||||
if (!segmentInfos) {
|
||||
LOG(error) << "Found management segment, but cannot locate segment info, something went wrong...";
|
||||
return false;
|
||||
}
|
||||
|
||||
for (const auto& s : *segmentInfos) {
|
||||
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
|
||||
segments.emplace(s.first, RBTreeBestFitSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()));
|
||||
} else {
|
||||
segments.emplace(s.first, SimpleSeqFitSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str()));
|
||||
}
|
||||
}
|
||||
|
||||
unsigned int numDevices = 0;
|
||||
int creatorId = -1;
|
||||
std::string sessionName;
|
||||
|
||||
DeviceCounter* deviceCounter = managementSegment.find<DeviceCounter>(unique_instance).first;
|
||||
if (deviceCounter) {
|
||||
numDevices = deviceCounter->fCount;
|
||||
}
|
||||
SessionInfo* sessionInfo = managementSegment.find<SessionInfo>(unique_instance).first;
|
||||
if (sessionInfo) {
|
||||
creatorId = sessionInfo->fCreatorId;
|
||||
sessionName = sessionInfo->fSessionName;
|
||||
}
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
Uint16MsgCounterHashMap* msgCounters = managementSegment.find<Uint16MsgCounterHashMap>(unique_instance).first;
|
||||
#endif
|
||||
|
||||
stringstream ss;
|
||||
size_t mfree = managementSegment.get_free_memory();
|
||||
size_t mtotal = managementSegment.get_size();
|
||||
size_t mused = mtotal - mfree;
|
||||
|
||||
ss << "shm id: " << shmId.shmId
|
||||
<< ", session: " << sessionName
|
||||
<< ", creator id: " << creatorId
|
||||
<< ", devices: " << numDevices
|
||||
<< ", segments:\n";
|
||||
|
||||
for (const auto& s : segments) {
|
||||
size_t free = boost::apply_visitor(SegmentFreeMemory(), s.second);
|
||||
size_t total = boost::apply_visitor(SegmentSize(), s.second);
|
||||
size_t used = total - free;
|
||||
ss << " [" << s.first
|
||||
<< "]: total: " << total
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
<< ", msgs: " << ( (msgCounters != nullptr) ? to_string((*msgCounters)[s.first].fCount) : "unknown")
|
||||
#else
|
||||
<< ", msgs: NODEBUG"
|
||||
#endif
|
||||
<< ", free: " << free
|
||||
<< ", used: " << used
|
||||
<< "\n";
|
||||
}
|
||||
|
||||
ss << " [m]: "
|
||||
<< "total: " << mtotal
|
||||
<< ", free: " << mfree
|
||||
<< ", used: " << mused;
|
||||
LOGV(info, user1) << ss.str();
|
||||
} catch (bie&) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void Monitor::ListAll(const std::string& path)
|
||||
{
|
||||
try {
|
||||
if (std::filesystem::is_empty(path)) {
|
||||
LOG(info) << "directory " << filesystem::path(path) << " is empty.";
|
||||
return;
|
||||
}
|
||||
|
||||
for (const auto& entry : filesystem::directory_iterator(path)) {
|
||||
string filename = entry.path().filename().string();
|
||||
// LOG(info) << filename << ", size: " << entry.file_size() << " bytes";
|
||||
if (tools::StrStartsWith(filename, "fmq_") || tools::StrStartsWith(filename, "sem.fmq_")) {
|
||||
// LOG(info) << "The file '" << filename << "' belongs to FairMQ.";
|
||||
if (tools::StrEndsWith(filename, "_mng")) {
|
||||
string shmId = filename.substr(4, 8);
|
||||
LOG(info) << "\nFound shmid '" << shmId << "':\n";
|
||||
if (!PrintShm(ShmId{shmId})) {
|
||||
LOG(info) << "could not open file for shmid '" << shmId << "'";
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG(info) << "The file '" << filename << "' does not belong to FairMQ, skipping...";
|
||||
}
|
||||
}
|
||||
} catch (filesystem::filesystem_error& fse) {
|
||||
LOG(error) << "error: " << fse.what();
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::ReceiveHeartbeats()
|
||||
{
|
||||
try {
|
||||
bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, 256);
|
||||
@@ -167,11 +318,11 @@ void Monitor::MonitorHeartbeats()
|
||||
string deviceId(msg, recvdSize);
|
||||
fDeviceHeartbeats[deviceId] = fLastHeartbeat;
|
||||
} else {
|
||||
// cout << "control queue timeout" << endl;
|
||||
// LOG(info) << "control queue timeout";
|
||||
}
|
||||
}
|
||||
} catch (bie& ie) {
|
||||
cout << ie.what() << endl;
|
||||
LOG(info) << ie.what();
|
||||
}
|
||||
|
||||
RemoveQueue(fControlQueueName);
|
||||
@@ -186,7 +337,7 @@ void Monitor::Interactive()
|
||||
|
||||
TerminalConfig tcfg;
|
||||
|
||||
cout << endl;
|
||||
LOG(info) << "\n";
|
||||
PrintHelp();
|
||||
|
||||
while (!fTerminating) {
|
||||
@@ -199,30 +350,30 @@ void Monitor::Interactive()
|
||||
|
||||
switch (c) {
|
||||
case 'q':
|
||||
cout << "\n[q] --> quitting." << endl;
|
||||
LOG(info) << "\n[q] --> quitting.";
|
||||
fTerminating = true;
|
||||
break;
|
||||
case 'x':
|
||||
cout << "\n[x] --> closing shared memory:" << endl;
|
||||
LOG(info) << "\n[x] --> closing shared memory:";
|
||||
if (!fViewOnly) {
|
||||
Cleanup(ShmId{fShmId});
|
||||
} else {
|
||||
cout << "cannot close because in view only mode" << endl;
|
||||
LOG(info) << "cannot close because in view only mode";
|
||||
}
|
||||
break;
|
||||
case 'h':
|
||||
cout << "\n[h] --> help:" << endl << endl;
|
||||
LOG(info) << "\n[h] --> help:\n";
|
||||
PrintHelp();
|
||||
cout << endl;
|
||||
LOG(info);
|
||||
break;
|
||||
case '\n':
|
||||
cout << "\n[\\n] --> invalid input." << endl;
|
||||
LOG(info) << "\n[\\n] --> invalid input.";
|
||||
break;
|
||||
case 'b':
|
||||
PrintDebugInfo(ShmId{fShmId});
|
||||
break;
|
||||
default:
|
||||
cout << "\n[" << c << "] --> invalid input." << endl;
|
||||
LOG(info) << "\n[" << c << "] --> invalid input.";
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -235,118 +386,7 @@ void Monitor::Interactive()
|
||||
break;
|
||||
}
|
||||
|
||||
CheckSegment();
|
||||
|
||||
if (!fTerminating) {
|
||||
cout << "\r";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::CheckSegment()
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
|
||||
try {
|
||||
managed_shared_memory managementSegment(open_only, fManagementSegmentName.c_str());
|
||||
VoidAlloc allocInstance(managementSegment.get_segment_manager());
|
||||
|
||||
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
|
||||
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> segments;
|
||||
|
||||
if (!segmentInfos) {
|
||||
cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl;
|
||||
return;
|
||||
}
|
||||
|
||||
for (const auto& s : *segmentInfos) {
|
||||
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
|
||||
segments.emplace(s.first, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str()));
|
||||
} else {
|
||||
segments.emplace(s.first, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str()));
|
||||
}
|
||||
}
|
||||
|
||||
fSeenOnce = true;
|
||||
|
||||
unsigned int numDevices = 0;
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
Uint16MsgCounterHashMap* msgCounters = nullptr;
|
||||
#endif
|
||||
|
||||
if (fInteractive || fViewOnly) {
|
||||
DeviceCounter* dc = managementSegment.find<DeviceCounter>(unique_instance).first;
|
||||
if (dc) {
|
||||
numDevices = dc->fCount;
|
||||
}
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
msgCounters = managementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(allocInstance);
|
||||
#endif
|
||||
}
|
||||
|
||||
auto now = chrono::high_resolution_clock::now();
|
||||
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
|
||||
|
||||
if (fHeartbeatTriggered && duration > fTimeoutInMS) {
|
||||
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
|
||||
Cleanup(ShmId{fShmId});
|
||||
fHeartbeatTriggered = false;
|
||||
if (fSelfDestruct) {
|
||||
cout << "\nself destructing" << endl;
|
||||
fTerminating = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (fInteractive || fViewOnly) {
|
||||
stringstream ss;
|
||||
size_t mfree = managementSegment.get_free_memory();
|
||||
size_t mtotal = managementSegment.get_size();
|
||||
size_t mused = mtotal - mfree;
|
||||
|
||||
ss << "shm id: " << fShmId
|
||||
<< ", devices: " << numDevices << ", segments:\n";
|
||||
for (const auto& s : segments) {
|
||||
size_t free = boost::apply_visitor(SegmentFreeMemory{}, s.second);
|
||||
size_t total = boost::apply_visitor(SegmentSize{}, s.second);
|
||||
size_t used = total - free;
|
||||
ss << " [" << s.first
|
||||
<< "]: total: " << total
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
<< ", msgs: " << (*msgCounters)[s.first].fCount
|
||||
#else
|
||||
<< ", msgs: NODEBUG"
|
||||
#endif
|
||||
<< ", free: " << free
|
||||
<< ", used: " << used
|
||||
<< "\n";
|
||||
}
|
||||
ss << " [m]: "
|
||||
<< "total: " << mtotal
|
||||
<< ", free: " << mfree
|
||||
<< ", used: " << mused;
|
||||
LOGV(info, user1) << ss.str();
|
||||
}
|
||||
} catch (bie&) {
|
||||
fHeartbeatTriggered = false;
|
||||
|
||||
auto now = chrono::high_resolution_clock::now();
|
||||
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
|
||||
|
||||
if (fIsDaemon && duration > fTimeoutInMS * 2) {
|
||||
Cleanup(ShmId{fShmId});
|
||||
fHeartbeatTriggered = false;
|
||||
if (fSelfDestruct) {
|
||||
cout << "\nself destructing" << endl;
|
||||
fTerminating = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (fSelfDestruct) {
|
||||
if (fSeenOnce) {
|
||||
cout << "self destructing" << endl;
|
||||
fTerminating = true;
|
||||
}
|
||||
}
|
||||
PrintShm(ShmId{fShmId});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -366,7 +406,7 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused)))
|
||||
for (const auto& e : *debug) {
|
||||
numMessages += e.second.size();
|
||||
}
|
||||
cout << endl << "found " << numMessages << " messages." << endl;
|
||||
LOG(info) << endl << "found " << numMessages << " messages.";
|
||||
|
||||
for (const auto& s : *debug) {
|
||||
for (const auto& e : s.second) {
|
||||
@@ -375,19 +415,18 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused)))
|
||||
time_t t = chrono::system_clock::to_time_t(tmpt);
|
||||
uint64_t ms = e.second.fCreationTime % 1000000;
|
||||
auto tm = localtime(&t);
|
||||
cout << "segment: " << setw(3) << setfill(' ') << s.first
|
||||
LOG(info) << "segment: " << setw(3) << setfill(' ') << s.first
|
||||
<< ", offset: " << setw(12) << setfill(' ') << e.first
|
||||
<< ", size: " << setw(10) << setfill(' ') << e.second.fSize
|
||||
<< ", creator PID: " << e.second.fPid << setfill('0')
|
||||
<< ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms << endl;
|
||||
<< ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms;
|
||||
}
|
||||
}
|
||||
cout << setfill(' ');
|
||||
} catch (bie&) {
|
||||
cout << "no segments found" << endl;
|
||||
LOG(info) << "no segments found";
|
||||
}
|
||||
#else
|
||||
cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl;
|
||||
LOG(info) << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)";
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -419,10 +458,10 @@ unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(cons
|
||||
}
|
||||
}
|
||||
} catch (bie&) {
|
||||
cout << "no segments found" << endl;
|
||||
LOG(info) << "no segments found";
|
||||
}
|
||||
#else
|
||||
cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl;
|
||||
LOG(info) << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)";
|
||||
#endif
|
||||
|
||||
return result;
|
||||
@@ -435,10 +474,10 @@ unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(cons
|
||||
|
||||
void Monitor::PrintHelp()
|
||||
{
|
||||
cout << "controls: [x] close memory, "
|
||||
<< "[b] print a list of allocated messages (only available when compiled with FAIMQ_DEBUG_MODE=ON), "
|
||||
<< "[h] help, "
|
||||
<< "[q] quit." << endl;
|
||||
LOG(info) << "controls: [x] close memory, "
|
||||
<< "[b] print a list of allocated messages (only available when compiled with FAIMQ_DEBUG_MODE=ON), "
|
||||
<< "[h] help, "
|
||||
<< "[q] quit.";
|
||||
}
|
||||
|
||||
|
||||
@@ -446,12 +485,12 @@ std::pair<std::string, bool> RunRemoval(std::function<bool(const std::string&)>
|
||||
{
|
||||
if (f(name)) {
|
||||
if (verbose) {
|
||||
cout << "Successfully removed '" << name << "'." << endl;
|
||||
LOG(info) << "Successfully removed '" << name << "'.";
|
||||
}
|
||||
return {name, true};
|
||||
} else {
|
||||
if (verbose) {
|
||||
cout << "Did not remove '" << name << "'. Already removed?" << endl;
|
||||
LOG(info) << "Did not remove '" << name << "'. Already removed?";
|
||||
}
|
||||
return {name, false};
|
||||
}
|
||||
@@ -468,7 +507,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
|
||||
std::vector<std::pair<std::string, bool>> result;
|
||||
|
||||
if (verbose) {
|
||||
cout << "Cleaning up for shared memory id '" << shmId.shmId << "'..." << endl;
|
||||
LOG(info) << "Cleaning up for shared memory id '" << shmId.shmId << "'...";
|
||||
}
|
||||
|
||||
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
|
||||
@@ -479,7 +518,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
|
||||
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
|
||||
if (rc) {
|
||||
if (verbose) {
|
||||
cout << "Region counter found: " << rc->fCount << endl;
|
||||
LOG(info) << "Region counter found: " << rc->fCount;
|
||||
}
|
||||
uint16_t regionCount = rc->fCount;
|
||||
|
||||
@@ -491,7 +530,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
|
||||
string path = ri.fPath.c_str();
|
||||
int flags = ri.fFlags;
|
||||
if (verbose) {
|
||||
cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << "." << endl;
|
||||
LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << ".";
|
||||
}
|
||||
if (path != "") {
|
||||
result.emplace_back(RunRemoval(Monitor::RemoveFileMapping, path + "fmq_" + shmId.shmId + "_rg_" + to_string(i), verbose));
|
||||
@@ -506,12 +545,12 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
|
||||
}
|
||||
} else {
|
||||
if (verbose) {
|
||||
cout << "No region counter found. No regions to cleanup." << endl;
|
||||
LOG(info) << "No region counter found. No regions to cleanup.";
|
||||
}
|
||||
}
|
||||
} catch(out_of_range& oor) {
|
||||
if (verbose) {
|
||||
cout << "Could not locate element in the region map, out of range: " << oor.what() << endl;
|
||||
LOG(info) << "Could not locate element in the region map, out of range: " << oor.what();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -524,7 +563,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
|
||||
result.emplace_back(RunRemoval(Monitor::RemoveObject, managementSegmentName.c_str(), verbose));
|
||||
} catch (bie&) {
|
||||
if (verbose) {
|
||||
cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl;
|
||||
LOG(info) << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup.";
|
||||
}
|
||||
}
|
||||
|
||||
@@ -538,7 +577,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const SessionId& sess
|
||||
{
|
||||
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
|
||||
if (verbose) {
|
||||
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
|
||||
LOG(info) << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'";
|
||||
}
|
||||
return Cleanup(shmId, verbose);
|
||||
}
|
||||
@@ -555,7 +594,7 @@ std::vector<std::pair<std::string, bool>> Monitor::CleanupFull(const SessionId&
|
||||
{
|
||||
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
|
||||
if (verbose) {
|
||||
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
|
||||
LOG(info) << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'";
|
||||
}
|
||||
return CleanupFull(shmId, verbose);
|
||||
}
|
||||
|
@@ -8,6 +8,8 @@
|
||||
#ifndef FAIR_MQ_SHMEM_MONITOR_H_
|
||||
#define FAIR_MQ_SHMEM_MONITOR_H_
|
||||
|
||||
#include <fairlogger/Logger.h>
|
||||
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <atomic>
|
||||
@@ -82,6 +84,9 @@ class Monitor
|
||||
static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const ShmId& shmId);
|
||||
static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const SessionId& shmId);
|
||||
|
||||
static bool PrintShm(const ShmId& shmId);
|
||||
static void ListAll(const std::string& path);
|
||||
|
||||
static bool RemoveObject(const std::string& name);
|
||||
static bool RemoveFileMapping(const std::string& name);
|
||||
static bool RemoveQueue(const std::string& name);
|
||||
@@ -92,7 +97,8 @@ class Monitor
|
||||
|
||||
private:
|
||||
void PrintHelp();
|
||||
void MonitorHeartbeats();
|
||||
void Watch();
|
||||
void ReceiveHeartbeats();
|
||||
void CheckSegment();
|
||||
void Interactive();
|
||||
void SignalMonitor();
|
||||
@@ -100,14 +106,12 @@ class Monitor
|
||||
bool fSelfDestruct; // will self-destruct after the memory has been closed
|
||||
bool fInteractive; // running in interactive mode
|
||||
bool fViewOnly; // view only mode
|
||||
bool fIsDaemon;
|
||||
bool fMonitor;
|
||||
bool fSeenOnce; // true is segment has been opened successfully at least once
|
||||
bool fCleanOnExit;
|
||||
unsigned int fTimeoutInMS;
|
||||
unsigned int fIntervalInMS;
|
||||
std::string fShmId;
|
||||
std::string fSegmentName;
|
||||
std::string fManagementSegmentName;
|
||||
std::string fControlQueueName;
|
||||
std::atomic<bool> fTerminating;
|
||||
std::atomic<bool> fHeartbeatTriggered;
|
||||
|
@@ -25,20 +25,32 @@ The shmId is generated out of session id and user id.
|
||||
|
||||
## Shared memory monitor
|
||||
|
||||
The shared memory monitor tool, supplied with the shared memory transport can be used to monitor shared memory use and automatically cleanup shared memory in case of device crashes.
|
||||
The shared memory monitor tool (`fairmq-shmmonitor`) can be used to monitor and cleanup the created shared memory.
|
||||
|
||||
With default arguments the monitor will run indefinitely with no output, and clean up shared memory segment if it is open and no heartbeats from devices arrive within a timeout period. It can be further customized with following parameters:
|
||||
Most commands act for the specified session, identified either via session id (`--session`,`-s`) or shmid (`--shmid`).
|
||||
|
||||
`--session <arg>`: for which session to run the monitor (default is "default"). The actual ressource names will be built out of session id, user id (hashed and truncated).
|
||||
`--cleanup`: start monitor, perform cleanup of the memory and quit.
|
||||
`--shmid <arg>`: if provided, this shmem id will be used instead of the one generated from session id. Use this if you know the name of the shared memory ressource, but do not have the used session id.
|
||||
`--self-destruct`: run until the memory segment is closed (either naturally via cleanup performed by devices or in case of a crash (no heartbeats within timeout)).
|
||||
`--interactive`: run interactively, with detailed segment details and user input for various shmem operations.
|
||||
`--timeout <arg>`: specifiy the timeout for the heartbeats from shmem transports in milliseconds (default 5000).
|
||||
The monitor runs in one of the following modes:
|
||||
|
||||
| command | action |
|
||||
| --------------------------- | ---------------------------------------------- |
|
||||
| no args | Print segment info of the specified session/shm ID and exit. |
|
||||
| `--view`,`-v` | Print segment info of the specified session/shm ID and exit. |
|
||||
| `--interactive`,`-i` | Print segment info of the specified session/shm ID and exit at a given interval (`--interval`), with some keyboard controls. Can be combined with `--view` for read-only access (and avoid receiving heartbeats). |
|
||||
| `--monitor`,`-m` | Monitor the session shm usage by receiving heartbeats from shmem users, cleaning it up if no heartbeats arrived within configured timeout (`--timeout`/`-t`). Only one heartbeat receiver per session is currently possible. If `--self-destruct`/`-x` is added, monitor will exit either when (a) no shm has been observed for interval * 2, (b) a cleanup due to reached timeout has been performed, or (c) shm has been observed, but is now cleaned up. |
|
||||
| `--cleanup`,`-c` | Cleanup the shm for the specified session and exit. |
|
||||
| `--debug`,`-b` | Print the list of messages in the current session and exit. Only availabe when FairMQ is compiled with `FAIRMQ_DEBUG_MODE=ON` (high performance impact). |
|
||||
| `--get-shmid` | Translate given session id and user id (`--user-id`) to a shmem id (uses current user id if none provided) and exit. |
|
||||
| `--list-all` | Print segment info for all sessions present on the system and exit. |
|
||||
|
||||
Additional cmd options:
|
||||
|
||||
| command | action |
|
||||
| --------------------------- | ---------------------------------------------- |
|
||||
| `--cleanup-on-exit` | Perform a cleanup on exit, when running in monitoring or interactive mode. |
|
||||
| `--daemonize`,`-d` | Can be combined with the monitoring mode to detach the process from the parent. |
|
||||
| `--verbose`,`-d` | When running as a daemon, store monitor output in `fairmq-shmmonitor_<timestamp>.log` |
|
||||
|
||||
The options can be combined, with the exception of `--cleanup` option, which will invoke the described behaviour independent of other options.
|
||||
Without the `--self-destruct` option, the monitor will run continuously, moitoring (and cleaning up if needed) consecutive topologies.
|
||||
|
||||
Possible further implementation would be to run the monitor with `--self-destruct` with each topology.
|
||||
|
||||
The Monitor class can also be used independently from the supplied executable (built from `runMonitor.cxx`), allowing integration on any level. For example invoking the monitor could be a functionality that a device offers.
|
||||
The Monitor class can also be used independently from the supplied executable, allowing integration on any level.
|
||||
|
@@ -68,9 +68,6 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
throw SharedMemoryError(tools::ToString("Provided shared memory allocation algorithm '", allocationAlgorithm, "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'"));
|
||||
}
|
||||
|
||||
std::string shmId = makeShmIdStr(sessionName);
|
||||
LOG(debug) << "Generated shmid '" << shmId << "' out of session id '" << sessionName << "'.";
|
||||
|
||||
try {
|
||||
if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) {
|
||||
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||
@@ -81,7 +78,7 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
||||
fManager = std::make_unique<Manager>(shmId, deviceId, segmentSize, config);
|
||||
fManager = std::make_unique<Manager>(sessionName, deviceId, segmentSize, config);
|
||||
} catch (boost::interprocess::interprocess_exception& e) {
|
||||
LOG(error) << "Could not initialize shared memory transport: " << e.what();
|
||||
throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));
|
||||
|
@@ -69,6 +69,10 @@ static void daemonize()
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
try {
|
||||
fair::Logger::SetConsoleColor(false);
|
||||
fair::Logger::DefineVerbosity(fair::Verbosity::user1, fair::VerbositySpec::Make(fair::VerbositySpec::Info::timestamp_us));
|
||||
fair::Logger::SetVerbosity(fair::Verbosity::verylow);
|
||||
|
||||
string sessionName;
|
||||
string shmId;
|
||||
bool cleanup = false;
|
||||
@@ -78,8 +82,14 @@ int main(int argc, char** argv)
|
||||
unsigned int timeoutInMS = 5000;
|
||||
unsigned int intervalInMS = 100;
|
||||
bool runAsDaemon = false;
|
||||
bool monitor = false;
|
||||
bool debug = false;
|
||||
bool cleanOnExit = false;
|
||||
bool getShmId = false;
|
||||
bool listAll = false;
|
||||
string listAllPath;
|
||||
bool verbose = false;
|
||||
int userId = -1;
|
||||
|
||||
options_description desc("Options");
|
||||
desc.add_options()
|
||||
@@ -90,27 +100,40 @@ int main(int argc, char** argv)
|
||||
("interactive,i" , value<bool>(&interactive)->implicit_value(true), "Interactive run")
|
||||
("view,v" , value<bool>(&viewOnly)->implicit_value(true), "Run in view only mode")
|
||||
("timeout,t" , value<unsigned int>(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds")
|
||||
("daemonize,d" , value<bool>(&runAsDaemon)->implicit_value(true), "Daemonize the monitor")
|
||||
("debug,b" , value<bool>(&debug)->implicit_value(true), "Debug - Print a list of messages)")
|
||||
("daemonize,d" , value<bool>(&runAsDaemon)->implicit_value(true), "Daemonize the monitor process (only in monitoring mode)")
|
||||
("monitor,m" , value<bool>(&monitor)->implicit_value(true), "Run in monitoring mode")
|
||||
("debug,b" , value<bool>(&debug)->implicit_value(true), "Debug - Print a list of messages)")
|
||||
("clean-on-exit,e", value<bool>(&cleanOnExit)->implicit_value(true), "Perform cleanup on exit")
|
||||
("interval" , value<unsigned int>(&intervalInMS)->default_value(100), "Output interval for interactive/view-only mode")
|
||||
("help,h", "Print help");
|
||||
("interval" , value<unsigned int>(&intervalInMS)->default_value(1000),"Output interval for interactive mode")
|
||||
("get-shmid" , value<bool>(&getShmId)->implicit_value(true), "Translate given session id and user id to a shmem id (uses current user id if none provided)")
|
||||
("list-all" , value<bool>(&listAll)->implicit_value(true), "List all sessions & segments")
|
||||
("list-all-path" , value<string>(&listAllPath)->default_value("/dev/shm/"),"Path for the --list-all command to search segments in")
|
||||
("verbose" , value<bool>(&verbose)->implicit_value(true), "Verbose mode (daemon will output to a file 'fairmq-shmmonitor_<timestamp>')")
|
||||
("user-id" , value<int>(&userId)->default_value(-1), "User id (used with --get-shmid)")
|
||||
("help,h", "Print help");
|
||||
|
||||
variables_map vm;
|
||||
store(parse_command_line(argc, argv, desc), vm);
|
||||
|
||||
if (vm.count("help")) {
|
||||
cout << "FairMQ Shared Memory Monitor" << endl << desc << endl;
|
||||
LOG(info) << "FairMQ Shared Memory Monitor" << "\n" << desc;
|
||||
return 0;
|
||||
}
|
||||
|
||||
notify(vm);
|
||||
|
||||
if (runAsDaemon) {
|
||||
daemonize();
|
||||
if (getShmId) {
|
||||
if (userId == -1) {
|
||||
LOG(info) << "shmem id for session '" << sessionName << "' and current user id " << geteuid()
|
||||
<< " is: " << makeShmIdStr(sessionName);
|
||||
} else {
|
||||
LOG(info) << "shmem id for session '" << sessionName << "' and user id " << userId
|
||||
<< " is: " << makeShmIdStr(sessionName, to_string(userId));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (shmId == "") {
|
||||
if (shmId.empty()) {
|
||||
shmId = makeShmIdStr(sessionName);
|
||||
}
|
||||
|
||||
@@ -124,18 +147,39 @@ int main(int argc, char** argv)
|
||||
return 0;
|
||||
}
|
||||
|
||||
cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl;
|
||||
|
||||
Monitor monitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, runAsDaemon, cleanOnExit);
|
||||
|
||||
if (interactive || !viewOnly) {
|
||||
monitor.CatchSignals();
|
||||
if (listAll) {
|
||||
Monitor::ListAll(listAllPath);
|
||||
return 0;
|
||||
}
|
||||
monitor.Run();
|
||||
|
||||
if (!viewOnly && !interactive && !monitor) {
|
||||
// if neither of the run modes are selected, use view only mode.
|
||||
viewOnly = true;
|
||||
}
|
||||
|
||||
if (viewOnly && !interactive) {
|
||||
if (!Monitor::PrintShm(ShmId{shmId})) {
|
||||
LOG(info) << "No segments found.";
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (runAsDaemon && monitor) {
|
||||
if (verbose) {
|
||||
fair::Logger::InitFileSink("trace", "fairmq-shmmonitor");
|
||||
}
|
||||
daemonize();
|
||||
}
|
||||
|
||||
LOG(info) << "Starting shared memory monitor for session: \"" << sessionName << "\" (shm id: " << shmId << ")...";
|
||||
|
||||
Monitor shmmonitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, monitor, cleanOnExit);
|
||||
shmmonitor.CatchSignals();
|
||||
shmmonitor.Run();
|
||||
} catch (Monitor::DaemonPresent& dp) {
|
||||
return 0;
|
||||
} catch (exception& e) {
|
||||
cerr << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit" << endl;
|
||||
LOG(error) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit";
|
||||
return 2;
|
||||
}
|
||||
|
||||
|
@@ -39,6 +39,24 @@ inline auto ToStrVector(const int argc, char*const* argv, const bool dropProgram
|
||||
}
|
||||
}
|
||||
|
||||
inline bool StrStartsWith(std::string const& str, std::string const& start)
|
||||
{
|
||||
if (str.length() >= start.length()) {
|
||||
return (0 == str.compare(0, start.length(), start));
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
inline bool StrEndsWith(std::string const& str, std::string const& end)
|
||||
{
|
||||
if (str.length() >= end.length()) {
|
||||
return (0 == str.compare(str.length() - end.length(), end.length(), end));
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace fair::mq::tools
|
||||
|
||||
#endif /* FAIR_MQ_TOOLS_STRINGS_H */
|
||||
|
@@ -1,10 +1,11 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <fairmq/tools/Unique.h>
|
||||
|
||||
#include <boost/program_options.hpp>
|
Reference in New Issue
Block a user