Compare commits

..

No commits in common. "master" and "v1.8.1" have entirely different histories.

52 changed files with 255 additions and 578 deletions

View File

@ -1,5 +0,0 @@
{
"image": "ghcr.io/fairrootgroup/fairmq-dev/fedora-38:latest",
"features": {
}
}

View File

@ -1,12 +0,0 @@
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
target-branch: "dev"
schedule:
interval: "monthly"
- package-ecosystem: "gitsubmodule"
directory: "/"
target-branch: "dev"
schedule:
interval: "monthly"

View File

@ -1,29 +0,0 @@
# SPDX-FileCopyrightText: 2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH, Darmstadt, Germany
#
# SPDX-License-Identifier: CC0-1.0
name: Check AUTHORS and CONTRIBUTORS in metadata
on:
push:
paths:
- AUTHORS
- CONTRIBUTORS
- codemeta.json
- .zenodo.json
pull_request:
paths:
- AUTHORS
- CONTRIBUTORS
- codemeta.json
- .zenodo.json
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Try updating metadata
run: python meta_update.py
- name: Check for Updates
run: git diff --exit-code

View File

@ -16,6 +16,6 @@ jobs:
container:
image: gitlab-registry.in2p3.fr/escape2020/wp3/eossr:v1.0
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v3
- name: validate codemeta
run: eossr-metadata-validator codemeta.json

View File

@ -1,5 +1,5 @@
################################################################################
# Copyright (C) 2018-2024 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# Copyright (C) 2018-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
@ -8,7 +8,8 @@
# Project ######################################################################
cmake_minimum_required(VERSION 3.15...3.30 FATAL_ERROR)
cmake_minimum_required(VERSION 3.15 FATAL_ERROR)
cmake_policy(VERSION 3.15...3.26)
list(PREPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)
include(GitHelper)

10
Jenkinsfile vendored
View File

@ -43,7 +43,7 @@ def jobMatrix(String type, List specs) {
sh "bash ${jobscript}"
} else { // selector == "slurm"
def imageurl = "oras://ghcr.io/fairrootgroup/fairmq-dev/${os}-${ver}-sif:latest"
def execopts = "--ipc --uts --pid -B/shared"
def execopts = "--net --ipc --uts --pid -B/shared"
def containercmd = "singularity exec ${execopts} ${imageurl} bash -l -c \\\"${ctestcmd} ${extra}\\\""
sh """\
echo \"echo \\\"*** Job started at .......: \\\$(date -R)\\\"\" >> ${jobscript}
@ -87,18 +87,14 @@ pipeline{
def builds = jobMatrix('build', [
[os: 'ubuntu', ver: '20.04', arch: 'x86_64', compiler: 'gcc-9'],
[os: 'ubuntu', ver: '22.04', arch: 'x86_64', compiler: 'gcc-11'],
[os: 'ubuntu', ver: '24.04', arch: 'x86_64', compiler: 'gcc-13'],
[os: 'fedora', ver: '33', arch: 'x86_64', compiler: 'gcc-10'],
[os: 'fedora', ver: '34', arch: 'x86_64', compiler: 'gcc-11'],
[os: 'fedora', ver: '35', arch: 'x86_64', compiler: 'gcc-11'],
[os: 'fedora', ver: '36', arch: 'x86_64', compiler: 'gcc-12'],
[os: 'fedora', ver: '37', arch: 'x86_64', compiler: 'gcc-12'],
[os: 'fedora', ver: '38', arch: 'x86_64', compiler: 'gcc-13'],
[os: 'fedora', ver: '39', arch: 'x86_64', compiler: 'gcc-13'],
[os: 'fedora', ver: '40', arch: 'x86_64', compiler: 'gcc-14'],
[os: 'macos', ver: '14', arch: 'x86_64', compiler: 'apple-clang-16'],
[os: 'macos', ver: '15', arch: 'x86_64', compiler: 'apple-clang-16'],
[os: 'macos', ver: '15', arch: 'arm64', compiler: 'apple-clang-16'],
[os: 'macos', ver: '12', arch: 'x86_64', compiler: 'apple-clang-14'],
[os: 'macos', ver: '13', arch: 'arm64', compiler: 'apple-clang-14'],
])
def all_debug = "-DCMAKE_BUILD_TYPE=Debug"

View File

@ -45,9 +45,9 @@ Recommended:
```bash
git clone https://github.com/FairRootGroup/FairMQ fairmq_source
cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Release [-DBUILD_TESTING=ON]
cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Release
cmake --build fairmq_build
[ctest --test-dir fairmq_build --output-on-failure --schedule-random -j<ncpus>] # needs -DBUILD_TESTING=ON
ctest --test-dir fairmq_build --output-on-failure --schedule-random -j<ncpus>
cmake --install fairmq_build --prefix $(pwd)/fairmq_install
```
@ -56,24 +56,6 @@ Please consult the [manpages of your CMake version](https://cmake.org/cmake/help
If dependencies are not installed in standard system directories, you can hint the installation location via
`-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...` (`*_ROOT` variables can also be environment variables).
## Installation via Spack
Prerequisite: [Spack](https://spack.readthedocs.io/en/latest/getting_started.html)
```bash
spack info fairmq # inspect build options
spack install fairmq # build latest packaged version with default options
```
Build FairMQ's dependencies via Spack for development:
```bash
git clone -b dev https://github.com/FairRootGroup/FairMQ fairmq_source
spack --env fairmq_source install # installs deps declared in fairmq_source/spack.yaml
spack env activate fairmq_source # sets $CMAKE_PREFIX_PATH which is used by CMake to find FairMQ's deps
cmake -S fairmq_source -B fairmq_build -GNinja -DCMAKE_BUILD_TYPE=Debug -DBUILD_TESTING=ON
# develop, compile, test
spack env deactivate # at end of dev session, or simply close the shell
```
## Usage

View File

@ -1,5 +1,5 @@
################################################################################
# Copyright (C) 2018-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# Copyright (C) 2018-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
@ -41,7 +41,7 @@ if(BUILD_TESTING)
endif()
find_package2(BUNDLED GTest REQUIRED)
if(GTest_BUNDLED)
set(GTest_VERSION "Dec 26 2024 @7d76a23")
set(GTest_VERSION "Apr 8 2022 @a1cc8c55")
set(GTest_PREFIX "<bundled>")
endif()
endif()

View File

@ -61,7 +61,7 @@ function(add_example)
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
foreach(script IN LISTS scripts)
set(script_file "${script_prefix}-${script}.sh")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}" @ONLY)
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}")
endforeach()
if(ARG_CONFIG)
@ -119,7 +119,7 @@ function(add_example)
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
foreach(script IN LISTS scripts)
set(script_file "${script_prefix}-${script}.sh")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install" @ONLY)
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install")
install(
PROGRAMS "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install"
DESTINATION ${PROJECT_INSTALL_BINDIR}

View File

@ -8,5 +8,5 @@
add_example(NAME region
DEVICE sampler processor sink keep-alive
SCRIPT region region-advanced region-advanced-external
SCRIPT region region-advanced
)

View File

@ -1,95 +0,0 @@
#!/bin/bash
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport=${1:-shmem}
msgSize=${2:-1000000}
SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1"
# SAMPLER+=" --sampling-rate 10"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --transport $transport"
SAMPLER+=" --shmid 1"
SAMPLER+=" --shm-monitor false"
SAMPLER+=" --rc-segment-size 200000000"
SAMPLER+=" --external-region true"
SAMPLER+=" --shm-no-cleanup true"
SAMPLER+=" --chan-name data1"
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"
xterm -geometry 90x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
PROCESSOR1="fairmq-ex-region-processor"
PROCESSOR1+=" --id processor1"
PROCESSOR1+=" --severity debug"
PROCESSOR1+=" --transport $transport"
PROCESSOR1+=" --shmid 1"
PROCESSOR1+=" --shm-segment-id 1"
PROCESSOR1+=" --shm-monitor false"
PROCESSOR1+=" --shm-no-cleanup true"
PROCESSOR1+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR1+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
PROCESSOR1+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
xterm -geometry 90x40+550+40 -hold -e @EX_BIN_DIR@/$PROCESSOR1 &
PROCESSOR2="fairmq-ex-region-processor"
PROCESSOR2+=" --id processor2"
PROCESSOR2+=" --severity debug"
PROCESSOR2+=" --transport $transport"
PROCESSOR2+=" --shmid 1"
PROCESSOR2+=" --shm-segment-id 2"
PROCESSOR2+=" --shm-monitor false"
PROCESSOR2+=" --shm-no-cleanup true"
PROCESSOR2+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR2+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7788"
PROCESSOR2+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7789"
xterm -geometry 90x40+550+600 -hold -e @EX_BIN_DIR@/$PROCESSOR2 &
SINK1_1="fairmq-ex-region-sink"
SINK1_1+=" --id sink1_1"
SINK1_1+=" --severity debug"
SINK1_1+=" --chan-name data2"
SINK1_1+=" --transport $transport"
SINK1_1+=" --shmid 1"
SINK1_1+=" --shm-segment-id 1"
SINK1_1+=" --shm-monitor false"
SINK1_1+=" --shm-no-cleanup true"
SINK1_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
xterm -geometry 90x20+1100+0 -hold -e @EX_BIN_DIR@/$SINK1_1 &
SINK1_2="fairmq-ex-region-sink"
SINK1_2+=" --id sink1_2"
SINK1_2+=" --severity debug"
SINK1_2+=" --chan-name data3"
SINK1_2+=" --transport $transport"
SINK1_2+=" --shmid 1"
SINK1_2+=" --shm-segment-id 1"
SINK1_2+=" --shm-monitor false"
SINK1_2+=" --shm-no-cleanup true"
SINK1_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
xterm -geometry 90x20+1100+300 -hold -e @EX_BIN_DIR@/$SINK1_2 &
SINK2_1="fairmq-ex-region-sink"
SINK2_1+=" --id sink2_1"
SINK2_1+=" --severity debug"
SINK2_1+=" --chan-name data2"
SINK2_1+=" --transport $transport"
SINK2_1+=" --shmid 1"
SINK2_1+=" --shm-segment-id 2"
SINK2_1+=" --shm-monitor false"
SINK2_1+=" --shm-no-cleanup true"
SINK2_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7788"
xterm -geometry 90x20+1100+600 -hold -e @EX_BIN_DIR@/$SINK2_1 &
SINK2_2="fairmq-ex-region-sink"
SINK2_2+=" --id sink2_2"
SINK2_2+=" --severity debug"
SINK2_2+=" --chan-name data3"
SINK2_2+=" --transport $transport"
SINK2_2+=" --shmid 1"
SINK2_2+=" --shm-segment-id 2"
SINK2_2+=" --shm-monitor false"
SINK2_2+=" --shm-no-cleanup true"
SINK2_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7789"
xterm -geometry 90x20+1100+900 -hold -e @EX_BIN_DIR@/$SINK2_2 &

View File

@ -2,8 +2,16 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport=${1:-shmem}
msgSize=${2:-1000000}
transport="shmem"
msgSize="1000000"
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
if [[ $2 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi
SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1"
@ -11,70 +19,35 @@ SAMPLER+=" --id sampler1"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --transport $transport"
#SAMPLER+=" --rc-segment-size 0"
SAMPLER+=" --shm-monitor true"
SAMPLER+=" --chan-name data1"
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"
xterm -geometry 90x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
PROCESSOR1="fairmq-ex-region-processor"
PROCESSOR1+=" --id processor1"
PROCESSOR1+=" --severity debug"
PROCESSOR1+=" --transport $transport"
PROCESSOR1+=" --shm-segment-id 1"
PROCESSOR1+=" --shm-monitor true"
PROCESSOR1+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR1+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
PROCESSOR1+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
xterm -geometry 90x40+550+40 -hold -e @EX_BIN_DIR@/$PROCESSOR1 &
PROCESSOR="fairmq-ex-region-processor"
PROCESSOR+=" --id processor1"
PROCESSOR+=" --severity debug"
PROCESSOR+=" --transport $transport"
PROCESSOR+=" --shm-monitor true"
PROCESSOR+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
PROCESSOR+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$PROCESSOR &
PROCESSOR2="fairmq-ex-region-processor"
PROCESSOR2+=" --id processor2"
PROCESSOR2+=" --severity debug"
PROCESSOR2+=" --transport $transport"
PROCESSOR2+=" --shm-segment-id 2"
PROCESSOR2+=" --shm-monitor true"
PROCESSOR2+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR2+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7788"
PROCESSOR2+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7789"
xterm -geometry 90x40+550+600 -hold -e @EX_BIN_DIR@/$PROCESSOR2 &
SINK1="fairmq-ex-region-sink"
SINK1+=" --id sink1"
SINK1+=" --severity debug"
SINK1+=" --chan-name data2"
SINK1+=" --transport $transport"
SINK1+=" --shm-monitor true"
SINK1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
xterm -geometry 120x32+1500+0 -hold -e @EX_BIN_DIR@/$SINK1 &
SINK1_1="fairmq-ex-region-sink"
SINK1_1+=" --id sink1_1"
SINK1_1+=" --severity debug"
SINK1_1+=" --chan-name data2"
SINK1_1+=" --transport $transport"
SINK1_1+=" --shm-segment-id 1"
SINK1_1+=" --shm-monitor true"
SINK1_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
xterm -geometry 90x20+1100+0 -hold -e @EX_BIN_DIR@/$SINK1_1 &
SINK1_2="fairmq-ex-region-sink"
SINK1_2+=" --id sink1_2"
SINK1_2+=" --severity debug"
SINK1_2+=" --chan-name data3"
SINK1_2+=" --transport $transport"
SINK1_2+=" --shm-segment-id 1"
SINK1_2+=" --shm-monitor true"
SINK1_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
xterm -geometry 90x20+1100+300 -hold -e @EX_BIN_DIR@/$SINK1_2 &
SINK2_1="fairmq-ex-region-sink"
SINK2_1+=" --id sink2_1"
SINK2_1+=" --severity debug"
SINK2_1+=" --chan-name data2"
SINK2_1+=" --transport $transport"
SINK2_1+=" --shm-segment-id 2"
SINK2_1+=" --shm-monitor true"
SINK2_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7788"
xterm -geometry 90x20+1100+600 -hold -e @EX_BIN_DIR@/$SINK2_1 &
SINK2_2="fairmq-ex-region-sink"
SINK2_2+=" --id sink2_2"
SINK2_2+=" --severity debug"
SINK2_2+=" --chan-name data3"
SINK2_2+=" --transport $transport"
SINK2_2+=" --shm-segment-id 2"
SINK2_2+=" --shm-monitor true"
SINK2_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7789"
xterm -geometry 90x20+1100+900 -hold -e @EX_BIN_DIR@/$SINK2_2 &
SINK2="fairmq-ex-region-sink"
SINK2+=" --id sink2"
SINK2+=" --severity debug"
SINK2+=" --chan-name data3"
SINK2+=" --transport $transport"
SINK2+=" --shm-monitor true"
SINK2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
xterm -geometry 120x32+1500+500 -hold -e @EX_BIN_DIR@/$SINK2 &

View File

@ -2,8 +2,16 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport=${1:-shmem}
msgSize=${2:-1000000}
transport="shmem"
msgSize="1000000"
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
if [[ $2 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi
SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1"

View File

@ -95,11 +95,10 @@ struct ShmManager
uint64_t size = stoull(conf.at(1));
fair::mq::RegionConfig cfg;
cfg.id = id;
cfg.rcSegmentSize = 0;
cfg.size = size;
regionCfgs.push_back(cfg);
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, cfg));
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size));
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize()
<< ", starting at " << region.GetData() << ". Locking...";

View File

@ -36,22 +36,16 @@ struct Processor : Device
Channel& dataOut2 = GetChannel("data3", 0);
while (!NewStatePending()) {
fair::mq::Parts inParts;
dataIn.Receive(inParts);
auto msg(dataIn.Transport()->CreateMessage());
dataIn.Receive(msg);
fair::mq::Parts outParts1;
fair::mq::Parts outParts2;
fair::mq::MessagePtr msgCopy1(NewMessage());
msgCopy1->Copy(*msg);
fair::mq::MessagePtr msgCopy2(NewMessage());
msgCopy2->Copy(*msg);
for (const auto& inPart : inParts) {
outParts1.AddPart(NewMessage());
outParts1.fParts.back()->Copy(*inPart);
outParts2.AddPart(NewMessage());
outParts2.fParts.back()->Copy(*inPart);
}
dataOut1.Send(outParts1);
dataOut2.Send(outParts2);
dataOut1.Send(msgCopy1);
dataOut2.Send(msgCopy2);
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";

View File

@ -26,7 +26,6 @@ struct Sampler : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChanName = fConfig->GetProperty<std::string>("chan-name");
fSamplingRate = fConfig->GetProperty<float>("sampling-rate");
fRCSegmentSize = fConfig->GetProperty<uint64_t>("rc-segment-size");
GetChannel(fChanName, 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) {
LOG(info) << "Region event: " << info.event << ": "
@ -46,7 +45,6 @@ struct Sampler : fair::mq::Device
}
regionCfg.lock = !fExternalRegion; // mlock region after creation
regionCfg.zero = !fExternalRegion; // zero region content after creation
regionCfg.rcSegmentSize = fRCSegmentSize; // size of the corresponding reference count segment
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor(
fChanName, // region is created using the transport of this channel...
0, // ... and this sub-channel
@ -68,22 +66,17 @@ struct Sampler : fair::mq::Device
fair::mq::tools::RateLimiter rateLimiter(fSamplingRate);
while (!NewStatePending()) {
fair::mq::Parts parts;
// make 64 parts
for (int i = 0; i < 64; ++i) {
parts.AddPart(NewMessageFor(
fChanName, // channel
0, // sub-channel
fRegion, // region
fRegion->GetData(), // ptr within region
fMsgSize, // offset from ptr
nullptr // hint
));
}
fair::mq::MessagePtr msg(NewMessageFor(fChanName, // channel
0, // sub-channel
fRegion, // region
fRegion->GetData(), // ptr within region
fMsgSize, // offset from ptr
nullptr // hint
));
std::lock_guard<std::mutex> lock(fMtx);
fNumUnackedMsgs += parts.Size();
if (Send(parts, fChanName, 0) > 0) {
++fNumUnackedMsgs;
if (Send(msg, fChanName, 0) > 0) {
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Stopping sending.";
break;
@ -124,7 +117,6 @@ struct Sampler : fair::mq::Device
uint32_t fLinger = 100;
uint64_t fMaxIterations = 0;
uint64_t fNumIterations = 0;
uint64_t fRCSegmentSize = 10000000;
fair::mq::UnmanagedRegionPtr fRegion = nullptr;
std::mutex fMtx;
uint64_t fNumUnackedMsgs = 0;
@ -140,8 +132,7 @@ void addCustomOptions(bpo::options_description& options)
("sampling-rate", bpo::value<float>()->default_value(0.), "Sampling rate (Hz).")
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)")
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process")
("rc-segment-size", bpo::value<uint64_t>()->default_value(10000000), "Size of the reference count segment for Unamanged Region");
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)

View File

@ -36,8 +36,12 @@ struct Sink : Device
Channel& dataIn = GetChannel(fChanName, 0);
while (!NewStatePending()) {
fair::mq::Parts parts;
dataIn.Receive(parts);
auto msg(dataIn.Transport()->CreateMessage());
dataIn.Receive(msg);
// void* ptr = msg->GetData();
// char* cptr = static_cast<char*>(ptr);
// LOG(info) << "check: " << cptr[3];
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";

2
extern/googletest vendored

@ -1 +1 @@
Subproject commit 7d76a231b0e29caf86e68d1df858308cd53b2a66
Subproject commit a1cc8c55195661a58ad60c3bb062a0b9c302710d

View File

@ -1,5 +1,5 @@
################################################################################
# Copyright (C) 2012-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# Copyright (C) 2012-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
@ -63,21 +63,14 @@ if(BUILD_FAIRMQ)
Tools.h
TransportFactory.h
Transports.h
TransportEnum.h
UnmanagedRegion.h
options/FairMQProgOptions.h
runDevice.h
runFairMQDevice.h
shmem/Common.h
shmem/Manager.h
shmem/Message.h
shmem/Monitor.h
shmem/Poller.h
shmem/Segment.h
shmem/Socket.h
shmem/TransportFactory.h
shmem/UnmanagedRegion.h
shmem/UnmanagedRegionImpl.h
tools/Compiler.h
tools/CppSTL.h
tools/Exceptions.h
@ -102,6 +95,12 @@ if(BUILD_FAIRMQ)
plugins/Builtin.h
plugins/config/Config.h
plugins/control/Control.h
shmem/Message.h
shmem/Poller.h
shmem/UnmanagedRegionImpl.h
shmem/Socket.h
shmem/TransportFactory.h
shmem/Manager.h
zeromq/Common.h
zeromq/Context.h
zeromq/Message.h
@ -119,7 +118,6 @@ if(BUILD_FAIRMQ)
Channel.cxx
Device.cxx
DeviceRunner.cxx
EventManager.cxx
JSONParser.cxx
MemoryResources.cxx
Plugin.cxx

View File

@ -12,7 +12,6 @@
#include <fairmq/Channel.h>
#include <fairmq/Properties.h>
#include <fairmq/Tools.h>
#include <fairmq/Transports.h>
#include <random>
#include <regex>
#include <set>
@ -384,10 +383,4 @@ bool Channel::BindEndpoint(string& endpoint)
}
}
std::string Channel::GetTransportName() const { return TransportName(fTransportType); }
Transport Channel::GetTransportType() const { return fTransportType; }
void Channel::UpdateTransport(const std::string& transport) { fTransportType = TransportType(transport); Invalidate(); }
} // namespace fair::mq

View File

@ -14,7 +14,7 @@
#include <fairmq/Properties.h>
#include <fairmq/Socket.h>
#include <fairmq/TransportFactory.h>
#include <fairmq/TransportEnum.h>
#include <fairmq/Transports.h>
#include <fairmq/UnmanagedRegion.h>
#include <cstdint> // int64_t
@ -145,11 +145,11 @@ class Channel
/// Get channel transport name ("default", "zeromq" or "shmem")
/// @return Returns channel transport name (e.g. "default", "zeromq" or "shmem")
std::string GetTransportName() const;
std::string GetTransportName() const { return TransportName(fTransportType); }
/// Get channel transport type
/// @return Returns channel transport type
mq::Transport GetTransportType() const;
mq::Transport GetTransportType() const { return fTransportType; }
/// Get socket send buffer size (in number of messages)
/// @return Returns socket send buffer size (in number of messages)
@ -221,7 +221,7 @@ class Channel
/// Set channel transport
/// @param transport transport string ("default", "zeromq" or "shmem")
void UpdateTransport(const std::string& transport);
void UpdateTransport(const std::string& transport) { fTransportType = TransportType(transport); Invalidate(); }
/// Set socket send buffer size
/// @param sndBufSize Socket send buffer size (in number of messages)
@ -438,7 +438,7 @@ class Channel
}
void CheckSendCompatibility(Parts& parts) { CheckSendCompatibility(parts.fParts); }
void CheckSendCompatibility(Parts::container & msgVec)
void CheckSendCompatibility(std::vector<MessagePtr>& msgVec)
{
for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) {
@ -468,7 +468,7 @@ class Channel
}
void CheckReceiveCompatibility(Parts& parts) { CheckReceiveCompatibility(parts.fParts); }
void CheckReceiveCompatibility(Parts::container& msgVec)
void CheckReceiveCompatibility(std::vector<MessagePtr>& msgVec)
{
for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) {

View File

@ -9,7 +9,6 @@
// FairMQ
#include <fairmq/Device.h>
#include <fairmq/Tools.h>
#include <fairmq/Transports.h>
// boost
#include <boost/algorithm/string.hpp> // join/split

View File

@ -19,7 +19,7 @@
#include <fairmq/StateQueue.h>
#include <fairmq/Tools.h>
#include <fairmq/TransportFactory.h>
#include <fairmq/TransportEnum.h>
#include <fairmq/Transports.h>
#include <fairmq/UnmanagedRegion.h>
// logger

View File

@ -1,20 +0,0 @@
/********************************************************************************
* Copyright (C) 2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "EventManager.h"
#include <string>
#include <typeindex>
template std::shared_ptr<
fair::mq::EventManager::Signal<fair::mq::PropertyChangeAsString, std::string>>
fair::mq::EventManager::GetSignal<fair::mq::PropertyChangeAsString, std::string>(
const std::pair<std::type_index, std::type_index>& key) const;
template void fair::mq::EventManager::Subscribe<fair::mq::PropertyChangeAsString, std::string>(
const std::string& subscriber,
std::function<void(typename fair::mq::PropertyChangeAsString::KeyType, std::string)>);

View File

@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@ -57,8 +57,27 @@ class EventManager
template<typename E, typename ...Args>
using Signal = boost::signals2::signal<void(typename E::KeyType, Args...)>;
template<typename E, typename... Args>
auto Subscribe(const std::string& subscriber, std::function<void(typename E::KeyType, Args...)> callback) -> void;
template<typename E, typename ...Args>
auto Subscribe(const std::string& subscriber, std::function<void(typename E::KeyType, Args...)> callback) -> void
{
const std::type_index event_type_index{typeid(E)};
const std::type_index callback_type_index{typeid(std::function<void(typename E::KeyType, Args...)>)};
const auto signalsKey = std::make_pair(event_type_index, callback_type_index);
const auto connectionsKey = std::make_pair(subscriber, signalsKey);
const auto connection = GetSignal<E, Args...>(signalsKey)->connect(callback);
{
std::lock_guard<std::mutex> lock{fMutex};
if (fConnections.find(connectionsKey) != fConnections.end())
{
fConnections.at(connectionsKey).disconnect();
fConnections.erase(connectionsKey);
}
fConnections.insert({connectionsKey, connection});
}
}
template<typename E, typename ...Args>
auto Unsubscribe(const std::string& subscriber) -> void
@ -100,58 +119,21 @@ class EventManager
mutable std::mutex fMutex;
template<typename E, typename ...Args>
auto GetSignal(const SignalsKey& key) const -> std::shared_ptr<Signal<E, Args...>>;
}; /* class EventManager */
struct PropertyChangeAsString : Event<std::string> {};
template<typename E, typename... Args>
auto EventManager::GetSignal(const SignalsKey& key) const -> std::shared_ptr<Signal<E, Args...>>
{
std::lock_guard<std::mutex> lock{fMutex};
if (fSignals.find(key) == fSignals.end()) {
// wrapper is needed because boost::signals2::signal is neither copyable nor movable
// and I don't know how else to insert it into the map
auto signal = std::make_shared<Signal<E, Args...>>();
fSignals.insert(std::make_pair(key, signal));
}
return boost::any_cast<std::shared_ptr<Signal<E, Args...>>>(fSignals.at(key));
}
template<typename E, typename... Args>
auto EventManager::Subscribe(const std::string& subscriber,
std::function<void(typename E::KeyType, Args...)> callback) -> void
{
const std::type_index event_type_index{typeid(E)};
const std::type_index callback_type_index{
typeid(std::function<void(typename E::KeyType, Args...)>)};
const auto signalsKey = std::make_pair(event_type_index, callback_type_index);
const auto connectionsKey = std::make_pair(subscriber, signalsKey);
const auto connection = GetSignal<E, Args...>(signalsKey)->connect(callback);
auto GetSignal(const SignalsKey& key) const -> std::shared_ptr<Signal<E, Args...>>
{
std::lock_guard<std::mutex> lock{fMutex};
if (fConnections.find(connectionsKey) != fConnections.end()) {
fConnections.at(connectionsKey).disconnect();
fConnections.erase(connectionsKey);
if (fSignals.find(key) == fSignals.end())
{
// wrapper is needed because boost::signals2::signal is neither copyable nor movable
// and I don't know how else to insert it into the map
auto signal = std::make_shared<Signal<E, Args...>>();
fSignals.insert(std::make_pair(key, signal));
}
fConnections.insert({connectionsKey, connection});
return boost::any_cast<std::shared_ptr<Signal<E, Args...>>>(fSignals.at(key));
}
}
extern template std::shared_ptr<
fair::mq::EventManager::Signal<fair::mq::PropertyChangeAsString, std::string>>
fair::mq::EventManager::GetSignal<fair::mq::PropertyChangeAsString, std::string>(
const std::pair<std::type_index, std::type_index>& key) const;
extern template void
fair::mq::EventManager::Subscribe<fair::mq::PropertyChangeAsString, std::string>(
const std::string& subscriber,
std::function<void(typename fair::mq::PropertyChangeAsString::KeyType, std::string)>);
}; /* class EventManager */
} // namespace fair::mq

View File

@ -17,7 +17,7 @@
#include <boost/container/container_fwd.hpp>
#include <boost/container/flat_map.hpp>
#include <memory_resource>
#include <boost/container/pmr/memory_resource.hpp>
#include <cstring>
#include <fairmq/Message.h>
#include <stdexcept>
@ -27,7 +27,7 @@ namespace fair::mq {
class TransportFactory;
using byte = unsigned char;
namespace pmr = std::pmr;
namespace pmr = boost::container::pmr;
/// All FairMQ related memory resources need to inherit from this interface
/// class for the

View File

@ -10,7 +10,7 @@
#define FAIR_MQ_MESSAGE_H
#include <cstddef> // for size_t
#include <fairmq/TransportEnum.h>
#include <fairmq/Transports.h>
#include <memory> // unique_ptr
#include <stdexcept>
@ -76,11 +76,6 @@ struct MessageBadAlloc : std::runtime_error
using std::runtime_error::runtime_error;
};
struct RefCountBadAlloc : std::runtime_error
{
using std::runtime_error::runtime_error;
};
} // namespace fair::mq
using fairmq_free_fn [[deprecated("Use fair::mq::FreeFn")]] = fair::mq::FreeFn;

View File

@ -448,6 +448,3 @@ void ProgOptions::PrintOptionsRaw() const
}
} // namespace fair::mq
template void fair::mq::ProgOptions::SetProperty<std::string>(const std::string& key, std::string val);
template void fair::mq::ProgOptions::SetProperty<int>(const std::string& key, int val);

View File

@ -129,7 +129,17 @@ class ProgOptions
/// @param key
/// @param val
template<typename T>
void SetProperty(const std::string& key, T val);
void SetProperty(const std::string& key, T val)
{
std::unique_lock<std::mutex> lock(fMtx);
SetVarMapValue<typename std::decay<T>::type>(key, val);
lock.unlock();
fEvents.Emit<fair::mq::PropertyChange, typename std::decay<T>::type>(key, val);
fEvents.Emit<fair::mq::PropertyChangeAsString, std::string>(key, GetPropertyAsString(key));
}
/// @brief Updates an existing config property (or fails if it doesn't exist)
/// @param key
@ -265,20 +275,5 @@ class ProgOptions
};
} // namespace fair::mq
template <typename T>
void fair::mq::ProgOptions::SetProperty(const std::string& key, T val)
{
std::unique_lock<std::mutex> lock(fMtx);
SetVarMapValue<typename std::decay<T>::type>(key, val);
lock.unlock();
fEvents.Emit<fair::mq::PropertyChange, typename std::decay<T>::type>(key, val);
fEvents.Emit<fair::mq::PropertyChangeAsString, std::string>(key, GetPropertyAsString(key));
}
extern template void fair::mq::ProgOptions::SetProperty<int>(const std::string& key, int val);
extern template void fair::mq::ProgOptions::SetProperty<std::string>(const std::string& key, std::string val);
#endif /* FAIR_MQ_PROGOPTIONS_H */

View File

@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@ -29,6 +29,7 @@ using Property = boost::any;
using Properties = std::map<std::string, Property>;
struct PropertyChange : Event<std::string> {};
struct PropertyChangeAsString : Event<std::string> {};
class PropertyHelper
{

View File

@ -52,8 +52,8 @@ struct Socket
virtual int64_t Send(MessagePtr& msg, int timeout = -1) = 0;
virtual int64_t Receive(MessagePtr& msg, int timeout = -1) = 0;
virtual int64_t Send(Parts::container& msgVec, int timeout = -1) = 0;
virtual int64_t Receive(Parts::container & msgVec, int timeout = -1) = 0;
virtual int64_t Send(std::vector<std::unique_ptr<Message>>& msgVec, int timeout = -1) = 0;
virtual int64_t Receive(std::vector<std::unique_ptr<Message>>& msgVec, int timeout = -1) = 0;
virtual int64_t Send(Parts& parts, int timeout = -1) { return Send(parts.fParts, timeout); }
virtual int64_t Receive(Parts& parts, int timeout = -1) { return Receive(parts.fParts, timeout); }

View File

@ -1,22 +0,0 @@
/********************************************************************************
* Copyright (C) 2014-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_TRANSPORTENUMS_H
#define FAIR_MQ_TRANSPORTENUMS_H
namespace fair::mq {
enum class Transport {
DEFAULT,
ZMQ,
SHM
};
}
#endif // FAIR_MQ_TRANSPORTENUMS_H

View File

@ -14,7 +14,7 @@
#include <fairmq/Message.h>
#include <fairmq/Poller.h>
#include <fairmq/Socket.h>
#include <fairmq/TransportEnum.h>
#include <fairmq/Transports.h>
#include <fairmq/UnmanagedRegion.h>
#include <memory> // shared_ptr
#include <stdexcept>

View File

@ -10,7 +10,6 @@
#define FAIR_MQ_TRANSPORTS_H
#include <fairmq/tools/Strings.h>
#include <fairmq/TransportEnum.h>
#include <memory>
#include <ostream>
#include <stdexcept>
@ -19,6 +18,13 @@
namespace fair::mq {
enum class Transport
{
DEFAULT,
ZMQ,
SHM
};
struct TransportError : std::runtime_error
{
using std::runtime_error::runtime_error;

View File

@ -9,7 +9,7 @@
#ifndef FAIR_MQ_UNMANAGEDREGION_H
#define FAIR_MQ_UNMANAGEDREGION_H
#include <fairmq/TransportEnum.h>
#include <fairmq/Transports.h>
#include <cstddef> // size_t
#include <cstdint> // uint32_t
@ -134,7 +134,7 @@ struct RegionConfig
int creationFlags = 0; /// flags passed to the underlying transport on region creation
int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user
uint64_t size = 0; /// region size
uint64_t rcSegmentSize = 100000000; /// size of the segment that stores reference counts when "soft"-copying the messages
uint64_t rcSegmentSize = 10000000; /// size of the segment that stores reference counts when "soft"-copying the messages
std::string path = ""; /// file path, if the region is backed by a file
std::optional<uint16_t> id = std::nullopt; /// region id
uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events

View File

@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2018-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@ -19,7 +19,7 @@
#define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@"
#define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ"
#define FAIRMQ_LICENSE "LGPL-3.0"
#define FAIRMQ_COPYRIGHT "2012-2025 GSI"
#define FAIRMQ_COPYRIGHT "2012-2023 GSI"
#define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@"
#endif // FAIR_MQ_VERSION_H

View File

@ -65,17 +65,13 @@ Control::Control(const string& name, Plugin::Version version, const string& main
});
try {
auto control = GetProperty<string>("control");
TakeDeviceControl();
if (control != "none") {
TakeDeviceControl();
}
auto control = GetProperty<string>("control");
if (control == "static") {
LOG(debug) << "Running builtin controller: static";
fControllerThread = thread(&Control::StaticMode, this);
} else if (control == "none") {
LOG(debug) << "Builtin controller: disabled";
} else if (control == "gui") {
LOG(debug) << "Running builtin controller: gui";
fControllerThread = thread(&Control::GUIMode, this);
@ -146,7 +142,7 @@ auto ControlPluginProgramOptions() -> Plugin::ProgOptions
namespace po = boost::program_options;
auto pluginOptions = po::options_description{"Control (builtin) Plugin"};
pluginOptions.add_options()
("control", po::value<string>()->default_value("dynamic"), "Control mode, 'static' or 'dynamic' (aliases for dynamic are external and interactive), 'none', 'gui'")
("control", po::value<string>()->default_value("dynamic"), "Control mode, 'static' or 'dynamic' (aliases for dynamic are external and interactive)")
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).");
return pluginOptions;
}
@ -275,11 +271,11 @@ auto Control::InteractiveMode() -> void
try {
RunStartupSequence();
if (!fDeviceShutdownRequested) {
if(!fDeviceShutdownRequested) {
RunREPL();
}
if (!fDeviceShutdownRequested) {
if(!fDeviceShutdownRequested) {
RunShutdownSequence();
}
} catch (PluginServices::DeviceControlError& e) {
@ -408,7 +404,7 @@ try {
// or for device shutdown request (Ctrl-C)
fStateQueue.WaitForNextOrCustom([this]{ return fDeviceShutdownRequested.load(); });
if (!fDeviceShutdownRequested) {
if(!fDeviceShutdownRequested) {
RunShutdownSequence();
}
} catch (PluginServices::DeviceControlError& e) {
@ -425,7 +421,7 @@ try {
// Wait for device shutdown request (Ctrl-C)
fStateQueue.WaitForCustom([this]{ return fDeviceShutdownRequested.load(); });
if (!fDeviceShutdownRequested) {
if(!fDeviceShutdownRequested) {
RunShutdownSequence();
}
} catch (PluginServices::DeviceControlError& e) {

View File

@ -13,8 +13,7 @@
#include <functional> // std::equal_to
#include <boost/functional/hash.hpp>
// #include <boost/interprocess/allocators/adaptive_pool.hpp>
#include <boost/interprocess/allocators/node_allocator.hpp>
#include <boost/interprocess/allocators/adaptive_pool.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/map.hpp>
#include <boost/interprocess/containers/string.hpp>
@ -70,10 +69,9 @@ struct RefCount
static constexpr size_t numNodesPerBlock = 4096;
// Maximum number of totally free blocks that the adaptive node pool will hold.
// The rest of the totally free blocks will be deallocated with the segment manager.
// static constexpr size_t maxFreeBlocks = 2;
static constexpr size_t maxFreeBlocks = 2;
using RefCountPool = boost::interprocess::node_allocator<RefCount, boost::interprocess::managed_shared_memory::segment_manager, numNodesPerBlock>;
// using RefCountPool = boost::interprocess::adaptive_pool<RefCount, boost::interprocess::managed_shared_memory::segment_manager, numNodesPerBlock, maxFreeBlocks>;
using RefCountPool = boost::interprocess::adaptive_pool<RefCount, boost::interprocess::managed_shared_memory::segment_manager, numNodesPerBlock, maxFreeBlocks>;
using SegmentManager = boost::interprocess::managed_shared_memory::segment_manager;
using VoidAlloc = boost::interprocess::allocator<void, SegmentManager>;
@ -174,6 +172,8 @@ enum class AllocationAlgorithm : int
struct RegionInfo
{
static constexpr uint64_t DefaultRcSegmentSize = 10000000;
RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, uint64_t rcSegmentSize, const VoidAlloc& alloc)
: fPath(path, alloc)
, fCreationFlags(flags)
@ -183,6 +183,14 @@ struct RegionInfo
, fDestroyed(false)
{}
RegionInfo(const VoidAlloc& alloc)
: RegionInfo("", 0, 0, 0, DefaultRcSegmentSize, alloc)
{}
RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, const VoidAlloc& alloc)
: RegionInfo(path, flags, userFlags, size, DefaultRcSegmentSize, alloc)
{}
Str fPath;
int fCreationFlags;
uint64_t fUserFlags;

View File

@ -323,6 +323,7 @@ class Manager
}
const uint16_t id = cfg.id.value();
const uint64_t rcSegmentSize = cfg.rcSegmentSize;
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
@ -339,12 +340,6 @@ class Manager
LOG(debug) << "Unmanaged region (view) already present, promoting to controller";
region->BecomeController(cfg);
} else {
// we need to update local config, if the region information already exists
auto info = fShmRegions->find(id);
if (info != fShmRegions->end()) {
cfg.rcSegmentSize = info->second.fRCSegmentSize;
}
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, true, cfg));
region = res.first->second.get();
}
@ -353,6 +348,7 @@ class Manager
// start ack receiver only if a callback has been provided.
if (callback || bulkCallback) {
region->SetCallbacks(callback, bulkCallback);
region->InitializeRefCountSegment(rcSegmentSize);
region->InitializeQueues();
region->StartAckSender();
region->StartAckReceiver();
@ -405,18 +401,19 @@ class Manager
try {
RegionConfig cfg;
const uint64_t rcSegmentSize = cfg.rcSegmentSize;
// get region info
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
RegionInfo regionInfo = fShmRegions->at(id);
cfg.id = id;
cfg.creationFlags = regionInfo.fCreationFlags;
cfg.rcSegmentSize = regionInfo.fRCSegmentSize;
cfg.path = regionInfo.fPath.c_str();
}
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, false, std::move(cfg)));
r.first->second->InitializeRefCountSegment(rcSegmentSize);
r.first->second->InitializeQueues();
r.first->second->StartAckSender();
return r.first->second.get();
@ -485,7 +482,6 @@ class Manager
cfg.id = info.id;
cfg.creationFlags = regionInfo.fCreationFlags;
cfg.path = regionInfo.fPath.c_str();
cfg.rcSegmentSize = regionInfo.fRCSegmentSize;
regionCfgs.emplace(info.id, cfg);
// fill the ptr+size info after shmLock is released, to avoid constructing local region under it
} else {
@ -507,8 +503,10 @@ class Manager
if (it != fRegions.end()) {
region = it->second.get();
} else {
const uint64_t rcSegmentSize = cfgIt->second.rcSegmentSize;
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, false, cfgIt->second));
region = r.first->second.get();
region->InitializeRefCountSegment(rcSegmentSize);
region->InitializeQueues();
region->StartAckSender();
}

View File

@ -14,7 +14,6 @@
#include "UnmanagedRegionImpl.h"
#include <fairmq/Message.h>
#include <fairmq/UnmanagedRegion.h>
#include <fairmq/Transports.h>
#include <fairlogger/Logger.h>
@ -252,12 +251,7 @@ class Message final : public fair::mq::Message
if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId));
}
if (fRegionPtr->fRcSegmentSize > 0) {
return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get();
} else {
fManager.GetSegment(fSegmentId);
return ShmHeader::RefCount(fManager.GetAddressFromHandle(fShared, fSegmentId));
}
return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get();
}
void Copy(const fair::mq::Message& other) override
@ -283,29 +277,11 @@ class Message final : public fair::mq::Message
if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", otherMsg.fRegionId));
}
if (fRegionPtr->fRcSegmentSize > 0) {
if (otherMsg.fShared < 0) {
// UR msg not yet shared, create the reference counting object with count 2
try {
otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2)));
} catch (boost::interprocess::bad_alloc& ba) {
throw RefCountBadAlloc(tools::ToString("Insufficient space in the reference count segment ", otherMsg.fRegionId, ", original exception: bad_alloc: ", ba.what()));
}
} else {
fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment();
}
} else { // if RefCount segment size is 0, store the ref count in the managed segment
if (otherMsg.fShared < 0) { // if UR msg is not yet shared
char* ptr = fManager.Allocate(2, 0);
// point the fShared in the unmanaged region message to the refCount holder
otherMsg.fShared = fManager.GetHandleFromAddress(ptr, fSegmentId);
// the message needs to be able to locate in which segment the refCount is stored
otherMsg.fSegmentId = fSegmentId;
ShmHeader::IncrementRefCount(ptr);
} else { // if the UR msg is already shared
fManager.GetSegment(otherMsg.fSegmentId);
ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(otherMsg.fShared, otherMsg.fSegmentId));
}
if (otherMsg.fShared < 0) {
// UR msg not yet shared, create the reference counting object with count 2
otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2)));
} else {
fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment();
}
}
@ -373,21 +349,10 @@ class Message final : public fair::mq::Message
if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId));
}
if (fRegionPtr->fRcSegmentSize > 0) {
uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement();
if (refCount == 1) {
fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared)));
ReleaseUnmanagedRegionBlock();
}
} else { // if RefCount segment size is 0, get the ref count from the managed segment
// make sure segment is initialized in this transport
fManager.GetSegment(fSegmentId);
// release unmanaged region block if ref count is one
uint16_t refCount = ShmHeader::DecrementRefCount(fManager.GetAddressFromHandle(fShared, fSegmentId));
if (refCount == 1) {
fManager.Deallocate(fShared, fSegmentId);
ReleaseUnmanagedRegionBlock();
}
uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement();
if (refCount == 1) {
fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared)));
ReleaseUnmanagedRegionBlock();
}
} else {
ReleaseUnmanagedRegionBlock();

View File

@ -274,9 +274,7 @@ bool Monitor::PrintShm(const ShmId& shmId)
try {
managed_shared_memory rcCountSegment(open_read_only, MakeShmName(shmId.shmId, "rrc", id).c_str());
auto size = rcCountSegment.get_size();
auto free = rcCountSegment.get_free_memory();
ss << ", rcCountSegment size: " << size << ", free: " << free << ", used: " << size - free;
ss << ", rcCountSegment size: " << rcCountSegment.get_size();
} catch (bie&) {
ss << ", rcCountSegment: not found";
}

View File

@ -10,7 +10,6 @@
#include <fairmq/shmem/Common.h>
#include <fairmq/shmem/Monitor.h>
#include <fairmq/Transports.h>
#include <cstdint>
#include <string>

View File

@ -200,7 +200,7 @@ class Socket final : public fair::mq::Socket
}
}
int64_t Send(Parts::container& msgVec, int timeout = -1) override
int64_t Send(std::vector<MessagePtr>& msgVec, int timeout = -1) override
{
int flags = 0;
if (timeout == 0) {
@ -260,7 +260,7 @@ class Socket final : public fair::mq::Socket
return static_cast<int>(TransferCode::error);
}
int64_t Receive(Parts::container& msgVec, int timeout = -1) override
int64_t Receive(std::vector<MessagePtr>& msgVec, int timeout = -1) override
{
int flags = 0;
if (timeout == 0) {

View File

@ -13,7 +13,6 @@
#include <fairmq/shmem/Monitor.h>
#include <fairmq/tools/Strings.h>
#include <fairmq/UnmanagedRegion.h>
#include <fairmq/Transports.h>
#include <fairlogger/Logger.h>
@ -66,7 +65,6 @@ struct UnmanagedRegion
, fShmemObject()
, fFile(nullptr)
, fFileMapping()
, fRcSegmentSize(cfg.rcSegmentSize)
, fQueue(nullptr)
, fCallback(nullptr)
, fBulkCallback(nullptr)
@ -148,13 +146,11 @@ struct UnmanagedRegion
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
}
InitializeRefCountSegment(fRcSegmentSize);
if (fControlling && created) {
Register(shmId, cfg);
}
LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << "), refCount segment size: " << fRcSegmentSize;
LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
}
UnmanagedRegion() = delete;
@ -193,7 +189,7 @@ struct UnmanagedRegion
RefCount& MakeRefCount(uint16_t initialCount = 1)
{
RefCount* refCount = fRefCountPool->allocate_one().get();
RefCount* refCount = fRefCountPool->allocate(1).get();
new (refCount) RefCount(initialCount);
return *refCount;
}
@ -201,7 +197,7 @@ struct UnmanagedRegion
void RemoveRefCount(RefCount& refCount)
{
refCount.~RefCount();
fRefCountPool->deallocate_one(&refCount);
fRefCountPool->deallocate(&refCount, 1);
}
~UnmanagedRegion()
@ -268,7 +264,6 @@ struct UnmanagedRegion
std::condition_variable fBlockSendCV;
std::vector<RegionBlock> fBlocksToFree;
const std::size_t fAckBunchSize = 256;
uint64_t fRcSegmentSize;
std::unique_ptr<boost::interprocess::message_queue> fQueue;
std::unique_ptr<boost::interprocess::managed_shared_memory> fRefCountSegment;
std::unique_ptr<RefCountPool> fRefCountPool;
@ -324,7 +319,7 @@ struct UnmanagedRegion
void InitializeRefCountSegment(uint64_t size)
{
using namespace boost::interprocess;
if (!fRefCountSegment && size > 0) {
if (!fRefCountSegment) {
fRefCountSegment = std::make_unique<managed_shared_memory>(open_or_create, fRefCountSegmentName.c_str(), size);
LOG(trace) << "shmem: initialized ref count segment: " << fRefCountSegmentName;
fRefCountPool = std::make_unique<RefCountPool>(fRefCountSegment->get_segment_manager());

View File

@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@ -8,12 +8,12 @@
#include <fairlogger/Logger.h>
#include <fairmq/tools/Network.h>
#include <fairmq/tools/Strings.h>
#ifndef _GNU_SOURCE
#define _GNU_SOURCE // To get defns of NI_MAXSERV and NI_MAXHOST
#endif
#include <algorithm>
#include <array>
#include <boost/algorithm/string.hpp> // trim
#include <boost/asio.hpp>
@ -158,22 +158,33 @@ string getDefaultRouteNetworkInterface()
}
string getIpFromHostname(const string& hostname)
try {
{
boost::asio::io_context ioc;
boost::asio::ip::tcp::resolver resolver(ioc);
auto const result = resolver.resolve(boost::asio::ip::tcp::v4(), hostname, "");
using namespace boost::asio::ip;
try {
tcp::resolver resolver(ioc);
tcp::resolver::query query(hostname, "");
tcp::resolver::iterator end;
auto it = find_if(static_cast<basic_resolver_iterator<tcp>>(resolver.resolve(query)),
end,
[](const tcp::endpoint& ep) { return ep.address().is_v4(); });
if (it != end) {
stringstream ss;
ss << static_cast<tcp::endpoint>(*it).address();
return ss.str();
}
if (result.empty()) {
LOG(warn) << "could not find ipv4 address for hostname '" << hostname << "'";
return "";
} catch (exception& e) {
LOG(error) << "could not resolve hostname '" << hostname << "', reason: " << e.what();
return "";
}
return ToString(result.begin()->endpoint().address());
}
catch (std::exception const& ex)
{
LOG(error) << "could not resolve hostname '" << hostname << "', reason: " << ex.what();
return "";
}
} // namespace fair::mq::tools

View File

@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2023 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@ -64,22 +64,22 @@ execute_result execute(const string& cmd, const string& prefix, const string& in
p.Print(cmd);
ba::io_context ioc;
ba::io_service ios;
// containers for std_in
ba::const_buffer inputBuffer(ba::buffer(input));
bp::async_pipe inputPipe(ioc);
bp::async_pipe inputPipe(ios);
// containers for std_out
ba::streambuf outputBuffer;
bp::async_pipe outputPipe(ioc);
bp::async_pipe outputPipe(ios);
// containers for std_err
ba::streambuf errorBuffer;
bp::async_pipe errorPipe(ioc);
bp::async_pipe errorPipe(ios);
const string delimiter = "\n";
ba::steady_timer inputTimer(ioc);
ba::steady_timer inputTimer(ios);
inputTimer.expires_after(std::chrono::milliseconds(1000)); // NOLINT
ba::steady_timer signalTimer(ioc);
ba::steady_timer signalTimer(ios);
signalTimer.expires_after(std::chrono::milliseconds(2000)); // NOLINT
// child process
@ -154,7 +154,7 @@ execute_result execute(const string& cmd, const string& prefix, const string& in
};
ba::async_read_until(errorPipe, errorBuffer, delimiter, onStdErr);
ioc.run();
ios.run();
c.wait();
result.exit_code = c.exit_code();

View File

@ -154,7 +154,7 @@ class Socket final : public fair::mq::Socket
}
}
int64_t Send(Parts::container& msgVec, int timeout = -1) override
int64_t Send(std::vector<std::unique_ptr<fair::mq::Message>>& msgVec, int timeout = -1) override
{
int flags = 0;
if (timeout == 0) {
@ -206,7 +206,7 @@ class Socket final : public fair::mq::Socket
}
}
int64_t Receive(Parts::container& msgVec, int timeout = -1) override
int64_t Receive(std::vector<std::unique_ptr<fair::mq::Message>>& msgVec, int timeout = -1) override
{
int flags = 0;
if (timeout == 0) {

View File

@ -253,7 +253,7 @@ add_testsuite(Tools
LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 5
TIMEOUT 20
${environment}
)

View File

@ -27,7 +27,7 @@ class TestDevice : public Device
{
public:
TestDevice(const string& transport)
: fDeviceThread(&Device::RunStateMachine, this)
: fDeviceThread(&Device::RunStateMachine, this)
{
SetTransport(transport);
test::Control(*this, test::Cycle::ToRun);

View File

@ -16,7 +16,6 @@
#include <gtest/gtest.h>
#include <cstring>
#include <string>
#include <vector>
namespace
@ -102,7 +101,7 @@ TEST(MemoryResources, allocator)
size_t session{tools::UuidHash()};
ProgOptions config;
config.SetProperty<std::string>("session", to_string(session));
config.SetProperty<string>("session", to_string(session));
FactoryType factoryZMQ = TransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config);
@ -130,7 +129,7 @@ TEST(MemoryResources, getMessage)
size_t session{tools::UuidHash()};
ProgOptions config;
config.SetProperty<std::string>("session", to_string(session));
config.SetProperty<string>("session", to_string(session));
config.SetProperty<bool>("shm-monitor", true);
FactoryType factoryZMQ = TransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config);

View File

@ -287,10 +287,8 @@ auto ZeroCopy(bool expandedShmMetadata = false) -> void
// The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed.
// Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports.
auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata, uint64_t rcSegmentSize) -> void
auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = false) -> void
{
fair::Logger::SetConsoleSeverity(fair::Severity::debug);
ProgOptions config1;
ProgOptions config2;
string session(tools::Uuid());
@ -313,13 +311,11 @@ auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata, uint
const size_t msgSize{100};
const size_t regionSize{1000000};
RegionConfig cfg;
cfg.rcSegmentSize = rcSegmentSize;
tools::Semaphore blocker;
auto region = factory1->CreateUnmanagedRegion(regionSize, [&blocker](void*, size_t, void*) {
blocker.Signal();
}, cfg);
});
{
Channel push("Push", "push", factory1);
@ -465,22 +461,12 @@ TEST(ZeroCopy, shmem_expanded_metadata) // NOLINT
TEST(ZeroCopyFromUnmanaged, shmem) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", false, 10000000);
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged");
}
TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_expanded", true, 10000000);
}
TEST(ZeroCopyFromUnmanaged, shmem_no_rc_segment) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_no_rc_segment", false, 0);
}
TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata_no_rc_segment) // NOLINT
{
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_expanded_no_rc_segment", true, 0);
ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", true);
}
} // namespace

View File

@ -1,37 +1,28 @@
/********************************************************************************
* Copyright (C) 2018-2025 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/tools/Network.h>
#include <gtest/gtest.h>
#include <fairmq/tools/Network.h>
#include <string>
namespace
{
TEST(Tools, NetworkDefaultIP)
using namespace std;
using namespace fair::mq;
TEST(Tools, Network)
{
auto const interface = fair::mq::tools::getDefaultRouteNetworkInterface();
string interface = fair::mq::tools::getDefaultRouteNetworkInterface();
EXPECT_NE(interface, "");
auto const interfaceIP = fair::mq::tools::getInterfaceIP(interface);
string interfaceIP = fair::mq::tools::getInterfaceIP(interface);
EXPECT_NE(interfaceIP, "");
}
TEST(Tools, NetworkIPv4Localhost)
{
auto const ip = fair::mq::tools::getIpFromHostname("localhost");
EXPECT_FALSE(ip.empty());
EXPECT_EQ(ip, "127.0.0.1");
}
TEST(Tools, NetworkInvalidHostname)
{
auto const ip = fair::mq::tools::getIpFromHostname("non.existent.domain.invalid");
EXPECT_TRUE(ip.empty());
}
} /* namespace */