Compare commits

..

35 Commits

Author SHA1 Message Date
Alexey Rybalchenko
16275db125 Add test for externally (outside the session) created shmem region 2023-01-19 16:12:58 +01:00
Alexey Rybalchenko
42ce691f57 shm: error on duplicate region IDs 2023-01-19 16:12:31 +01:00
Alexey Rybalchenko
58aa2b4f88 shm: refactor UnamangedRegion: rename fRemote to fController 2023-01-19 16:12:22 +01:00
Alexey Rybalchenko
c3b273cec0 shm: Improve debug output a bit 2023-01-19 16:10:59 +01:00
Alexey Rybalchenko
a982d60ed7 example: fix incorrect config 2023-01-19 16:10:44 +01:00
Dennis Klein
d16e473b91 docs: Update fair-software.eu compliance badge
And link to the GH workflow page instead of fair-software.eu
2023-01-16 13:27:13 +01:00
Dennis Klein
1881986cca docs: Add fair-software.eu compliance badge 2023-01-16 13:17:09 +01:00
Dennis Klein
adf91d053d docs: Add OpenSSF Best Practices Badge 2023-01-16 13:16:56 +01:00
Dennis Klein
d3be9af9b6 docs: Add our DOI badge 2023-01-16 13:16:42 +01:00
Dennis Klein
4104636456 build: Add fair-software.eu compliance checker 2023-01-16 13:15:33 +01:00
Alexey Rybalchenko
af0d668951 Shm: fix region init with external regions 2022-09-09 15:40:33 +02:00
Alexey Rybalchenko
072d7cb744 shm: add some debug output 2022-09-09 15:40:33 +02:00
Alexey Rybalchenko
f5c46ce018 region example: add options for testing with externally-created regions 2022-09-09 15:40:33 +02:00
Alexey Rybalchenko
d105960444 fix(shm): Fix incorrect parameters when mapping regions 2022-09-06 08:09:47 +02:00
Dennis Klein
3aae5bae58 build: Add ORCID for Christian Tacke 2022-09-06 08:08:42 +02:00
Dennis Klein
9031029d2c build: Add ORCID for Dennis Klein 2022-09-06 08:08:34 +02:00
Dennis Klein
d478e050ba build: HTML-Format desc field in zenodo.org config 2022-09-06 08:08:14 +02:00
Dennis Klein
06b2b9b01f build: Add license hint to zenodo.org config 2022-09-06 08:08:05 +02:00
Dennis Klein
b3fa4f6e7e build: Add config for zenodo.org import 2022-09-06 08:07:46 +02:00
Alexey Rybalchenko
da5cb34416 fix(shm): race/deadlock in region locks 2022-08-21 18:32:24 +02:00
Alexey Rybalchenko
226733c653 Reduce severity of the missing channel cfg on command line
It is a valid use case to create the config programmatically at a later stage.
2022-06-22 14:04:43 +02:00
Alexey Rybalchenko
b06efc401e shm: Monitor: Add region/segment presence check function 2022-06-22 13:31:51 +02:00
Alexey Rybalchenko
2500771689 shm: ResetContent(): reset data after recreating the metadata 2022-05-28 14:46:21 +02:00
Alexey Rybalchenko
d2aa3b6bb0 shm: open managament data as read only during cleanup 2022-05-28 14:46:21 +02:00
Alexey Rybalchenko
00df117c7c Shm::Monitor: add nullptr check for segment info 2022-05-28 14:46:21 +02:00
Dennis Klein
69faa63c5b docs: Update README 2022-03-21 18:22:07 +01:00
Dennis Klein
b7474ae138 build: Deprecate components dds_plugin, sdk, sdk_commands 2022-03-21 18:22:07 +01:00
Dennis Klein
b426bf39d7 fix: Update metadata 2022-03-21 18:22:07 +01:00
Dennis Klein
6780b7452c fix(control): Honor SIGINT and SIGTERM in more places
* Queue next transition for long-running states (fix #421)
* Add *OrCustom/Push/Locked family of functions to StateQueue to enable
  composition with custom signals
2022-03-21 16:28:43 +01:00
Dennis Klein
27277b11b4 fix(Device): Warning about narrowing conversion 2022-03-21 16:28:43 +01:00
Dennis Klein
cb5029f826 fix(Device): Spawn rate logger thread only if needed 2022-03-21 16:28:43 +01:00
Dennis Klein
5d45d89269 feat: Remove --max-run-time option
BREAKING CHANGE: was introduced in 1.4.0 release but appears unused
2022-03-21 16:28:43 +01:00
Dennis Klein
eb9ddc81cf ci: Run thread sanitizer with clang++ 2022-03-21 16:28:43 +01:00
Dennis Klein
f5891d5ae3 ci: Add thread sanitizer check and bump all checks to Fedora 35 2022-03-21 16:28:43 +01:00
Dennis Klein
3b2ad1f6f4 ci: Add Fedora 35 build 2022-03-21 16:28:43 +01:00
32 changed files with 938 additions and 350 deletions

15
.github/workflows/fair-software.yml vendored Normal file
View File

@@ -0,0 +1,15 @@
name: fair-software
on: push
jobs:
verify:
name: "fair-software"
runs-on: ubuntu-latest
steps:
- uses: fair-software/howfairis-github-action@0.2.1
name: Measure compliance with fair-software.eu recommendations
env:
PYCHARM_HOSTED: "Trick colorama into displaying colored output"
with:
MY_REPO_URL: "https://github.com/${{ github.repository }}"

86
.zenodo.json Normal file
View File

@@ -0,0 +1,86 @@
{
"creators": [
{
"name": "Al-Turany, Mohammad"
},
{
"orcid": "0000-0003-3787-1910",
"name": "Klein, Dennis"
},
{
"name": "Kollegger, Thorsten"
},
{
"name": "Rybalchenko, Alexey"
},
{
"name": "Winckler, Nicolas"
}
],
"contributors": [
{
"type": "Other",
"name": "Aphecetche, Laurent"
},
{
"type": "Other",
"name": "Binet, Sebastien"
},
{
"type": "Other",
"name": "Eulisse, Giulio"
},
{
"type": "Other",
"name": "Karabowicz, Radoslaw"
},
{
"type": "Other",
"name": "Kretz, Matthias"
},
{
"type": "Other",
"name": "Krzewicki, Mikolaj"
},
{
"type": "Other",
"name": "Lebedev, Andrey"
},
{
"type": "Other",
"name": "Mrnjavac, Teo"
},
{
"type": "Other",
"name": "Neskovic, Gvozden"
},
{
"type": "Other",
"name": "Richter, Matthias"
},
{
"type": "Other",
"orcid": "0000-0002-5321-8404",
"name": "Tacke, Christian"
},
{
"type": "Other",
"name": "Uhlig, Florian"
},
{
"type": "Other",
"name": "Wenzel, Sandro"
}
],
"description": "<p>C++ Message Queuing Library and Framework</p>",
"related_identifiers": [
{
"identifier": "https://github.com/FairRootGroup/FairMQ/",
"relation": "isSupplementTo",
"resource_type": "software",
"scheme": "url"
}
],
"title": "FairMQ",
"license": "LGPL-3.0-only"
}

View File

@@ -1,5 +1,5 @@
Al-Turany, Mohammad
Klein, Dennis
Klein, Dennis [https://orcid.org/0000-0003-3787-1910]
Kollegger, Thorsten
Rybalchenko, Alexey
Winckler, Nicolas

View File

@@ -1,5 +1,5 @@
################################################################################
# Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
@@ -9,7 +9,7 @@
# Project ######################################################################
cmake_minimum_required(VERSION 3.15 FATAL_ERROR)
cmake_policy(VERSION 3.15...3.20)
cmake_policy(VERSION 3.15...3.22)
list(PREPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)
include(GitHelper)

View File

@@ -3,11 +3,11 @@ Binet, Sebastien
Eulisse, Giulio
Karabowicz, Radoslaw
Kretz, Matthias <kretz@kde.org>
Krzewicki, Mikolaj
Krzewicki, Mikolaj
Lebedev, Andrey
Mrnjavac, Teo <teo.m@cern.ch>
Neskovic, Gvozden
Richter, Matthias
Tacke, Christian
Tacke, Christian [https://orcid.org/0000-0002-5321-8404]
Uhlig, Florian
Wenzel, Sandro

View File

@@ -4,19 +4,19 @@ Upstream-Contact: Mohammad Al-Turany <m.al-turany@gsi.de>
Source: https://github.com/FairRootGroup/FairMQ
Files: *
Copyright: 2012-2021, GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
Copyright: 2012-2021, [see AUTHORS file]
Copyright: 2012-2021, [see CONTRIBUTORS file]
Copyright: 2012-2022, GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
Copyright: 2012-2022, [see AUTHORS file]
Copyright: 2012-2022, [see CONTRIBUTORS file]
Comment: The copyright of individual contributors is documented in the
Git history.
License: LGPL-3.0-only
Files: extern/googletest
Copyright: 2008-2021, Google Inc.
Copyright: 2008-2022, Google Inc.
License: GOOGLE
Files: extern/asio
Copyright: 2003-2021, Christopher M. Kohlhoff (chris at kohlhoff dot com)
Copyright: 2003-2022, Christopher M. Kohlhoff (chris at kohlhoff dot com)
License: BSL-1.0
Files: extern/PicoSHA2

View File

@@ -72,6 +72,9 @@ endif()
if(ENABLE_SANITIZER_THREAD)
list(APPEND options "-DENABLE_SANITIZER_THREAD=ON")
endif()
if(CMAKE_CXX_COMPILER)
list(APPEND options "-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}")
endif()
if(CMAKE_CXX_FLAGS)
list(APPEND options "-DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}")
endif()

6
Jenkinsfile vendored
View File

@@ -15,9 +15,9 @@ def jobMatrix(String type, List specs) {
ver = spec.ver
} else { // == 'check'
job = "${spec.name}"
selector = 'fedora-34-x86_64'
selector = 'fedora-35-x86_64'
os = 'fedora'
ver = '34'
ver = '35'
}
def label = "${job}"
@@ -96,6 +96,7 @@ pipeline{
[os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10', extra: all],
[os: 'fedora', ver: '33', arch: 'x86_64', compiler: 'gcc-10', extra: all],
[os: 'fedora', ver: '34', arch: 'x86_64', compiler: 'gcc-11', extra: all],
[os: 'fedora', ver: '35', arch: 'x86_64', compiler: 'gcc-11', extra: all],
[os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12', extra: '-DHAS_ASIO=ON'],
[os: 'macos', ver: '11', arch: 'arm64', compiler: 'apple-clang-13', extra: '-DHAS_ASIO=ON'],
])
@@ -106,6 +107,7 @@ pipeline{
[name: 'static-analyzers', extra: "${all_debug} -DRUN_STATIC_ANALYSIS=ON"],
[name: '{address,leak,ub}-sanitizers',
extra: "${all_debug} -DENABLE_SANITIZER_ADDRESS=ON -DENABLE_SANITIZER_LEAK=ON -DENABLE_SANITIZER_UNDEFINED_BEHAVIOUR=ON -DCMAKE_CXX_FLAGS='-O1 -fno-omit-frame-pointer'"],
[name: 'thread-sanitizer', extra: "${all_debug} -DENABLE_SANITIZER_THREAD=ON -DCMAKE_CXX_COMPILER=clang++"],
])
parallel(builds + checks)

View File

@@ -1,12 +1,14 @@
<!-- {#mainpage} -->
# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) [![build status](https://alfa-ci.gsi.de/buildStatus/icon?job=FairRootGroup/FairMQ/dev)](https://alfa-ci.gsi.de/blue/organizations/jenkins/FairRootGroup%2FFairMQ/branches) [![Coverity Badge](https://alfa-ci.gsi.de/shields/coverity/scan/fairrootgroup-fairmq.svg)](https://scan.coverity.com/projects/fairrootgroup-fairmq)
# FairMQ
[![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT)
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.1689985.svg)](https://doi.org/10.5281/zenodo.1689985)
[![OpenSSF Best Practices](https://bestpractices.coreinfrastructure.org/projects/6915/badge)](https://bestpractices.coreinfrastructure.org/projects/6915)
[![fair-software.eu](https://img.shields.io/badge/fair--software.eu-%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8B%20%20%E2%97%8F%20%20%E2%97%8F-yellow)](https://github.com/FairRootGroup/FairMQ/actions/workflows/fair-software.yml)
C++ Message Queuing Library and Framework
| Release | Version | Docs |
| :---: | :--- | :--- |
| `stable` | [![release](https://alfa-ci.gsi.de/shields/github/release/FairRootGroup/FairMQ.svg)](https://github.com/FairRootGroup/FairMQ/releases/latest) | [API](https://fairrootgroup.github.io/FairMQ/latest), [Book](https://github.com/FairRootGroup/FairMQ/blob/master/README.md#documentation) |
| `testing` | [![dev tag](https://alfa-ci.gsi.de/shields/github/tag/FairRootGroup/FairMQ.svg)](https://github.com/FairRootGroup/FairMQ/tags) | [Book](https://github.com/FairRootGroup/FairMQ/blob/dev/README.md#documentation) |
Docs: [Book](https://github.com/FairRootGroup/FairMQ/blob/dev/README.md#documentation)
Find all FairMQ releases [here](https://github.com/FairRootGroup/FairMQ/releases).
@@ -24,11 +26,13 @@ FairMQ provides multiple implementations for its API (so-called "transports",
e.g. `zeromq`, `shmem` and `ofi` (in development)) to cover a variety of use cases
(e.g. inter-thread, inter-process, inter-node communication) and machines (e.g. Ethernet, Infiniband).
In addition to this core functionality FairMQ provides a framework for creating "devices" - actors which
are communicating through message passing. FairMQ does not only allow the user to use different transport but also to mix them; i.e: A Device can communicate using different transport on different channels at the same time. Device execution is modelled as a simple state machine that
shapes the integration points for the user task. Devices also incorporate a plugin system for runtime configuration and control.
Next to the provided devices and plugins (e.g. [DDS](https://github.com/FairRootGroup/DDS))
the user can extend FairMQ by developing his own plugins to integrate his devices with external
configuration and control services.
are communicating through message passing. FairMQ does not only allow the user to use different transport
but also to mix them; i.e: A Device can communicate using different transport on different channels at the
same time. Device execution is modelled as a simple state machine that shapes the integration points for
the user task. Devices also incorporate a plugin system for runtime configuration and control.
Next to the provided [devices](https://github.com/FairRootGroup/FairMQ/tree/master/fairmq/devices) and
[plugins](https://github.com/FairRootGroup/FairMQ/tree/master/fairmq/plugins) the user can extend FairMQ
by developing his own plugins to integrate his devices with external configuration and control services.
FairMQ has been developed in the context of its mother project [FairRoot](https://github.com/FairRootGroup/FairRoot) -
a simulation, reconstruction and analysis framework.
@@ -47,14 +51,15 @@ cmake --build fairmq_build --target install
Please consult the [manpages of your CMake version](https://cmake.org/cmake/help/latest/manual/cmake.1.html) for more options.
If dependencies are not installed in standard system directories, you can hint the installation location via `-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...`. `{DEPENDENCY}` can be `GTEST`, `BOOST`, `FAIRLOGGER`, `ZEROMQ`, `OFI`, `PMIX`, `ASIO`, `ASIOFI` or `DDS` (`*_ROOT` variables can also be environment variables).
If dependencies are not installed in standard system directories, you can hint the installation location via
`-DCMAKE_PREFIX_PATH=...` or per dependency via `-D{DEPENDENCY}_ROOT=...` (`*_ROOT` variables can also be environment variables).
## Usage
FairMQ ships as a CMake package, so in your `CMakeLists.txt` you can discover it like this:
```cmake
find_package(FairCMakeModules 0.2 REQUIRED)
find_package(FairCMakeModules 1.0 REQUIRED)
include(FairFindPackage2)
find_package2(FairMQ)
find_package2_implicit_dependencies()
@@ -71,14 +76,14 @@ list(PREPEND CMAKE_PREFIX_PATH /path/to/fairmq_install)
Optionally, you can require certain FairMQ package components and a minimum version:
```cmake
find_package(FairMQ 1.4.0 COMPONENTS dds_plugin)
find_package(FairMQ 1.4.50 COMPONENTS ofi_transport)
```
When building FairMQ, CMake will print a summary table of all available package components.
## Dependencies
* [asio](https://github.com/chriskohlhoff/asio) (optionally bundled)
* [asio](https://github.com/chriskohlhoff/asio)
* [asiofi](https://github.com/FairRootGroup/asiofi)
* [Boost](https://www.boost.org/)
* [CMake](https://cmake.org/)
@@ -86,13 +91,14 @@ When building FairMQ, CMake will print a summary table of all available package
* [Doxygen](http://www.doxygen.org/)
* [FairCMakeModules](https://github.com/FairRootGroup/FairCMakeModules) (optionally bundled)
* [FairLogger](https://github.com/FairRootGroup/FairLogger)
* [Flatbuffers](https://google.github.io/flatbuffers/)
* [GTest](https://github.com/google/googletest) (optionally bundled)
* [PMIx](https://pmix.org/)
* [ZeroMQ](http://zeromq.org/)
Which dependencies are required depends on which components are built.
Supported platforms: Linux and MacOS.
Supported platform is Linux. macOS is supported on a best-effort basis.
## CMake options
@@ -102,7 +108,6 @@ On command line:
* `-DBUILD_TESTING=OFF` disables building of tests.
* `-DBUILD_EXAMPLES=OFF` disables building of examples.
* `-DBUILD_OFI_TRANSPORT=ON` enables building of the experimental OFI transport.
* `-DBUILD_DDS_PLUGIN=ON` enables building of the DDS plugin.
* `-DBUILD_PMIX_PLUGIN=ON` enables building of the PMIx plugin.
* `-DBUILD_DOCS=ON` enables building of API docs.
* You can hint non-system installations for dependent packages, see the #installation-from-source section above

View File

@@ -34,9 +34,9 @@ macro(fairmq_summary_components)
endif()
message(STATUS " ${BWhite}ofi_transport${CR} ${ofi_summary}")
if(BUILD_DDS_PLUGIN)
set(dds_summary "${BGreen}YES${CR} (disable with ${BMagenta}-DBUILD_DDS_PLUGIN=OFF${CR})")
set(dds_summary "${BGreen}YES${CR} DEPRECATED (disable with ${BMagenta}-DBUILD_DDS_PLUGIN=OFF${CR})")
else()
set(dds_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_DDS_PLUGIN=ON${CR})")
set(dds_summary "${BRed} NO${CR} DEPRECATED (default, enable with ${BMagenta}-DBUILD_DDS_PLUGIN=ON${CR})")
endif()
message(STATUS " ${BWhite}dds_plugin${CR} ${dds_summary}")
if(BUILD_PMIX_PLUGIN)
@@ -58,15 +58,15 @@ macro(fairmq_summary_components)
endif()
message(STATUS " ${BWhite}docs${CR} ${docs_summary}")
if(BUILD_SDK)
set(sdk_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_SDK=OFF${CR})")
set(sdk_summary "${BGreen}YES${CR} DEPRECATED (disable with ${BMagenta}-DBUILD_SDK=OFF${CR})")
else()
set(sdk_summary "${BRed} NO${CR} EXPERIMENTAL (default, enable with ${BMagenta}-DBUILD_SDK=ON${CR})")
set(sdk_summary "${BRed} NO${CR} DEPRECATED (default, enable with ${BMagenta}-DBUILD_SDK=ON${CR})")
endif()
message(STATUS " ${BWhite}sdk${CR} ${sdk_summary}")
if(BUILD_SDK_COMMANDS)
set(sdk_commands_summary "${BGreen}YES${CR} (disable with ${BMagenta}-DBUILD_SDK_COMMANDS=OFF${CR})")
set(sdk_commands_summary "${BGreen}YES${CR} DEPRECATED (disable with ${BMagenta}-DBUILD_SDK_COMMANDS=OFF${CR})")
else()
set(sdk_commands_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_SDK_COMMANDS=ON${CR})")
set(sdk_commands_summary "${BRed} NO${CR} DEPRECATED (default, enable with ${BMagenta}-DBUILD_SDK_COMMANDS=ON${CR})")
endif()
message(STATUS " ${BWhite}sdk_commands${CR} ${sdk_commands_summary}")
if(BUILD_TIDY_TOOL)
@@ -75,6 +75,21 @@ macro(fairmq_summary_components)
set(sdk_tidy_summary "${BRed} NO${CR} (default, enable with ${BMagenta}-DBUILD_TIDY_TOOL=ON${CR})")
endif()
message(STATUS " ${BWhite}tidy_tool${CR} ${sdk_tidy_summary}")
set(_deprecated)
if(BUILD_SDK)
list(APPEND _deprecated sdk)
endif()
if(BUILD_SDK_COMMANDS)
list(APPEND _deprecated sdk_commands)
endif()
if(BUILD_DDS_PLUGIN)
list(APPEND _deprecated dds_plugin)
endif()
list(JOIN _deprecated ", " _deprecated)
if(_deprecated)
message(DEPRECATION "You have selected to build deprecated components '${_deprecated}' which will be removed in a future release. See https://github.com/FairRootGroup/FairMQ/discussions/392 for more information. Use '-Wno-deprecated' to silence deprecation warnings.")
endif()
endmacro()
macro(fairmq_summary_static_analysis)

View File

@@ -18,7 +18,8 @@
{
"@type": "Person",
"givenName": "Dennis",
"familyName": "Klein"
"familyName": "Klein",
"@id": "https://orcid.org/0000-0003-3787-1910"
},
{
"@type": "Person",
@@ -92,7 +93,8 @@
{
"@type": "Person",
"givenName": "Christian",
"familyName": "Tacke"
"familyName": "Tacke",
"@id": "https://orcid.org/0000-0002-5321-8404"
},
{
"@type": "Person",

View File

@@ -26,7 +26,6 @@ Here is an overview of the device/channel options and when they are applied:
| `transport` | at the end of `fair::mq::State::InitializingDevice` |
| `network-interface` | at the end of `fair::mq::State::InitializingDevice` |
| `init-timeout` | at the end of `fair::mq::State::InitializingDevice` |
| `max-run-time` | at the end of `fair::mq::State::InitializingDevice` |
| `shm-segment-size` | at the end of `fair::mq::State::InitializingDevice` |
| `shm-monitor` | at the end of `fair::mq::State::InitializingDevice` |
| `ofi-size-hint` | at the end of `fair::mq::State::InitializingDevice` |

View File

@@ -19,6 +19,10 @@ SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10"
SAMPLER+=" --transport $transport"
# SAMPLER+=" --external-region true"
# SAMPLER+=" --shm-no-cleaup true"
# SAMPLER+=" --shm-monitor false"
# SAMPLER+=" --shmid 1"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
@@ -26,5 +30,8 @@ SINK="fairmq-ex-region-sink"
SINK+=" --id sink1"
SINK+=" --severity debug"
SINK+=" --transport $transport"
# SINK+=" --shm-no-cleaup true"
# SINK+=" --shm-monitor false"
# SINK+=" --shmid 1"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@@ -6,10 +6,9 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/shmem/Common.h>
#include <fairmq/shmem/UnmanagedRegion.h>
#include <fairmq/shmem/Segment.h>
#include <fairmq/shmem/Monitor.h>
#include <fairmq/shmem/Segment.h>
#include <fairmq/shmem/UnmanagedRegion.h>
#include <fairmq/tools/Unique.h>
#include <fairlogger/Logger.h>
@@ -17,9 +16,8 @@
#include <boost/algorithm/string.hpp>
#include <boost/program_options.hpp>
#include <csignal>
#include <chrono>
#include <csignal>
#include <map>
#include <string>
#include <thread>
@@ -27,65 +25,117 @@
using namespace std;
using namespace boost::program_options;
namespace
{
volatile sig_atomic_t gStopping = 0;
}
namespace {
volatile sig_atomic_t gStopping = 0;
volatile sig_atomic_t gResetContent = 0;
} // namespace
void signalHandler(int /* signal */)
{
gStopping = 1;
}
void signalHandler(int /* signal */) { gStopping = 1; }
void resetContentHandler(int /* signal */) { gResetContent = 1; }
struct ShmManager
{
ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions)
ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions, bool zero = true)
: shmId(fair::mq::shmem::makeShmIdStr(_shmId))
{
LOG(info) << "Starting ShmManager for shmId: " << shmId;
LOG(info) << "Performing full reset...";
FullReset();
LOG(info) << "Done.";
LOG(info) << "Adding managed segments...";
AddSegments(_segments, zero);
LOG(info) << "Done.";
LOG(info) << "Adding unmanaged regions...";
AddRegions(_regions, zero);
LOG(info) << "Done.";
LOG(info) << "Shared memory is ready for use.";
}
void AddSegments(const vector<string>& _segments, bool zero)
{
for (const auto& s : _segments) {
vector<string> segmentConf;
boost::algorithm::split(segmentConf, s, boost::algorithm::is_any_of(","));
if (segmentConf.size() != 2) {
LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size>.";
vector<string> conf;
boost::algorithm::split(conf, s, boost::algorithm::is_any_of(","));
if (conf.size() != 3) {
LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size><numaid>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>.");
throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>,<numaid>.");
}
uint16_t id = stoi(segmentConf.at(0));
uint64_t size = stoull(segmentConf.at(1));
uint16_t id = stoi(conf.at(0));
uint64_t size = stoull(conf.at(1));
segmentCfgs.emplace_back(fair::mq::shmem::SegmentConfig{id, size, "rbtree_best_fit"});
auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit));
fair::mq::shmem::Segment& segment = ret.first->second;
LOG(info) << "Created segment " << id << " of size " << segment.GetSize() << ", starting at " << segment.GetData() << ". Locking...";
LOG(info) << "Created segment " << id << " of size " << segment.GetSize()
<< ", starting at " << segment.GetData() << ". Locking...";
segment.Lock();
LOG(info) << "Done.";
LOG(info) << "Zeroing...";
segment.Zero();
LOG(info) << "Done.";
}
for (const auto& r : _regions) {
vector<string> regionConf;
boost::algorithm::split(regionConf, r, boost::algorithm::is_any_of(","));
if (regionConf.size() != 2) {
LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>.");
if (zero) {
LOG(info) << "Zeroing...";
segment.Zero();
LOG(info) << "Done.";
}
uint16_t id = stoi(regionConf.at(0));
uint64_t size = stoull(regionConf.at(1));
}
}
void AddRegions(const vector<string>& _regions, bool zero)
{
for (const auto& r : _regions) {
vector<string> conf;
boost::algorithm::split(conf, r, boost::algorithm::is_any_of(","));
if (conf.size() != 3) {
LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>,<numaid>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>,<numaid>.");
}
uint16_t id = stoi(conf.at(0));
uint64_t size = stoull(conf.at(1));
fair::mq::RegionConfig cfg;
cfg.id = id;
cfg.size = size;
regionCfgs.push_back(cfg);
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size));
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() << ", starting at " << region.GetData() << ". Locking...";
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize()
<< ", starting at " << region.GetData() << ". Locking...";
region.Lock();
LOG(info) << "Done.";
LOG(info) << "Zeroing...";
region.Zero();
LOG(info) << "Done.";
if (zero) {
LOG(info) << "Zeroing...";
region.Zero();
LOG(info) << "Done.";
}
}
}
bool CheckPresence()
{
for (const auto& sc : segmentCfgs) {
if (!(fair::mq::shmem::Monitor::SegmentIsPresent(fair::mq::shmem::ShmId{shmId}, sc.id))) {
return false;
}
}
for (const auto& rc : regionCfgs) {
if (!(fair::mq::shmem::Monitor::RegionIsPresent(fair::mq::shmem::ShmId{shmId}, rc.id.value()))) {
return false;
}
}
return true;
}
void ResetContent()
{
fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId});
fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId}, segmentCfgs, regionCfgs);
}
void FullReset()
{
segments.clear();
regions.clear();
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
}
~ShmManager()
@@ -97,6 +147,8 @@ struct ShmManager
std::string shmId;
map<uint16_t, fair::mq::shmem::Segment> segments;
map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> regions;
std::vector<fair::mq::shmem::SegmentConfig> segmentCfgs;
std::vector<fair::mq::RegionConfig> regionCfgs;
};
int main(int argc, char** argv)
@@ -105,8 +157,11 @@ int main(int argc, char** argv)
signal(SIGINT, signalHandler);
signal(SIGTERM, signalHandler);
signal(SIGUSR1, resetContentHandler);
try {
bool nozero = false;
bool checkPresence = true;
uint64_t shmId = 0;
vector<string> segments;
vector<string> regions;
@@ -114,8 +169,10 @@ int main(int argc, char** argv)
options_description desc("Options");
desc.add_options()
("shmid", value<uint64_t>(&shmId)->required(), "Shm id")
("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size> <id>,<size> <id>,<size> ...")
("regions", value<vector<string>>(&regions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size> <id>,<size> ...")
("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size>,<numaid> <id>,<size>,<numaid> <id>,<size>,<numaid> ... (numaid: -2 disabled, -1 interleave, >=0 node)")
("regions", value<vector<string>>(&regions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size>,<numaid> <id>,<size>,<numaid> ...")
("nozero", value<bool>(&nozero)->default_value(false)->implicit_value(true), "Do not zero segments after initialization")
("check-presence", value<bool>(&checkPresence)->default_value(true)->implicit_value(true), "Check periodically if configured segments/regions are still present, and cleanup and leave if they are not")
("help,h", "Print help");
variables_map vm;
@@ -128,15 +185,35 @@ int main(int argc, char** argv)
notify(vm);
ShmManager shmManager(shmId, segments, regions);
ShmManager shmManager(shmId, segments, regions, !nozero);
while (!gStopping) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::thread resetContentThread([&shmManager]() {
while (!gStopping) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if (gResetContent == 1) {
LOG(info) << "Resetting content for shmId " << shmManager.shmId;
shmManager.ResetContent();
gResetContent = 0;
LOG(info) << "Done resetting content for shmId " << shmManager.shmId;
}
}
});
if (checkPresence) {
while (!gStopping) {
if (shmManager.CheckPresence() == false) {
LOG(error) << "Failed to find segments, exiting.";
gStopping = true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
resetContentThread.join();
LOG(info) << "stopping.";
} catch (exception& e) {
LOG(error) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit";
LOG(error) << "Exception reached the top of main: " << e.what() << ", exiting";
return 2;
}

View File

@@ -19,6 +19,7 @@ struct Sampler : fair::mq::Device
{
void InitTask() override
{
fExternalRegion = fConfig->GetProperty<bool>("external-region");
fMsgSize = fConfig->GetProperty<int>("msg-size");
fLinger = fConfig->GetProperty<uint32_t>("region-linger");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
@@ -34,18 +35,26 @@ struct Sampler : fair::mq::Device
fair::mq::RegionConfig regionCfg;
regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events
regionCfg.lock = true; // mlock region after creation
regionCfg.zero = true; // zero region content after creation
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
0, // ... and this sub-channel
10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
std::lock_guard<std::mutex> lock(fMtx);
fNumUnackedMsgs -= blocks.size();
if (fMaxIterations > 0) {
LOG(info) << "Received " << blocks.size() << " acks";
}
}, regionCfg));
// options for testing with an externally-created -region
if (fExternalRegion) {
regionCfg.id = 1;
regionCfg.removeOnDestruction = false;
}
regionCfg.lock = !fExternalRegion; // mlock region after creation
regionCfg.zero = !fExternalRegion; // zero region content after creation
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor(
"data", // region is created using the transport of this channel...
0, // ... and this sub-channel
10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
std::lock_guard<std::mutex> lock(fMtx);
fNumUnackedMsgs -= blocks.size();
if (fMaxIterations > 0) {
LOG(info) << "Received " << blocks.size() << " acks";
}
},
regionCfg
));
}
bool ConditionalRun() override
@@ -76,6 +85,8 @@ struct Sampler : fair::mq::Device
void ResetTask() override
{
// give some time for acks to be received
std::this_thread::sleep_for(std::chrono::milliseconds(250));
fRegion.reset();
{
std::lock_guard<std::mutex> lock(fMtx);
@@ -89,6 +100,7 @@ struct Sampler : fair::mq::Device
}
private:
int fExternalRegion = false;
int fMsgSize = 10000;
uint32_t fLinger = 100;
uint64_t fMaxIterations = 0;
@@ -103,7 +115,8 @@ void addCustomOptions(bpo::options_description& options)
options.add_options()
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)")
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process");
}
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)

View File

@@ -1,19 +1,19 @@
/********************************************************************************
* Copyright (C) 2012-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2012-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <algorithm> // std::max
#include <algorithm> // std::max, std::any_of
#include <boost/algorithm/string.hpp> // join/split
#include <chrono>
#include <fairmq/Device.h>
#include <fairmq/Tools.h>
#include <future>
#include <iomanip>
#include <list>
#include <memory> // std::make_unique
#include <mutex>
#include <thread>
@@ -27,7 +27,6 @@ constexpr const char* Device::DefaultTransportName;
constexpr mq::Transport Device::DefaultTransportType;
constexpr const char* Device::DefaultNetworkInterface;
constexpr int Device::DefaultInitTimeout;
constexpr uint64_t Device::DefaultMaxRunTime;
constexpr float Device::DefaultRate;
constexpr const char* Device::DefaultSession;
@@ -83,7 +82,6 @@ Device::Device(ProgOptions* config, tools::Version version)
, fMultitransportProceed(false)
, fVersion(version)
, fRate(DefaultRate)
, fMaxRunRuntimeInS(DefaultMaxRunTime)
, fInitializationTimeoutInS(DefaultInitTimeout)
, fTransitioning(false)
{
@@ -215,7 +213,6 @@ void Device::InitWrapper()
Init();
fRate = fConfig->GetProperty<float>("rate", DefaultRate);
fMaxRunRuntimeInS = fConfig->GetProperty<uint64_t>("max-run-time", DefaultMaxRunTime);
fInitializationTimeoutInS = fConfig->GetProperty<int>("init-timeout", DefaultInitTimeout);
try {
@@ -293,7 +290,9 @@ void Device::BindWrapper()
Bind();
ChangeState(Transition::Auto);
if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
}
void Device::ConnectWrapper()
@@ -330,7 +329,9 @@ void Device::ConnectWrapper()
Connect();
ChangeState(Transition::Auto);
if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
}
void Device::AttachChannels(vector<Channel*>& chans)
@@ -430,15 +431,28 @@ void Device::InitTaskWrapper()
{
InitTask();
ChangeState(Transition::Auto);
if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
}
void Device::RunWrapper()
{
LOG(info) << "fair::mq::Device running...";
// start the rate logger thread
future<void> rateLogger = async(launch::async, &Device::LogSocketRates, this);
unique_ptr<thread> rateLogger;
// Check if rate logging thread is needed
const bool rateLogging = any_of(fChannels.cbegin(), fChannels.cend(), [](auto ch) {
return any_of(ch.second.cbegin(), ch.second.cend(), [](auto sub) { return sub.fRateLogging > 0; });
});
if (rateLogging) {
rateLogger = make_unique<thread>(&Device::LogSocketRates, this);
}
tools::CallOnDestruction joinRateLogger([&](){
if (rateLogging && rateLogger->joinable()) { rateLogger->join(); }
});
// notify transports to resume transfers
for (auto& t : fTransports) {
@@ -481,8 +495,6 @@ void Device::RunWrapper()
PostRun();
cod.disable();
rateLogger.get();
}
void Device::HandleSingleChannelInput()
@@ -710,7 +722,6 @@ void Device::LogSocketRates()
chrono::time_point<chrono::high_resolution_clock> t0(chrono::high_resolution_clock::now());
chrono::time_point<chrono::high_resolution_clock> t1;
uint64_t secondsElapsed = 0;
while (!NewStatePending()) {
WaitFor(chrono::seconds(1));
@@ -743,7 +754,7 @@ void Device::LogSocketRates()
bytesOut.at(i) = bytesOutNew.at(i);
msgOut.at(i) = msgOutNew.at(i);
LOG(info) << setw(chanNameLen) << filteredChannelNames.at(i) << ": "
LOG(info) << setw(static_cast<int>(chanNameLen)) << filteredChannelNames.at(i) << ": "
<< "in: " << msgPerSecIn.at(i) << " (" << mbPerSecIn.at(i) << " MB) "
<< "out: " << msgPerSecOut.at(i) << " (" << mbPerSecOut.at(i) << " MB)";
}
@@ -753,9 +764,6 @@ void Device::LogSocketRates()
}
t0 = t1;
if (fMaxRunRuntimeInS > 0 && ++secondsElapsed >= fMaxRunRuntimeInS) {
ChangeState(Transition::Stop);
}
}
}
@@ -770,7 +778,9 @@ void Device::ResetTaskWrapper()
{
ResetTask();
ChangeState(Transition::Auto);
if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
}
void Device::ResetWrapper()
@@ -784,7 +794,9 @@ void Device::ResetWrapper()
fChannels.clear();
fTransports.clear();
fTransportFactory.reset();
ChangeState(Transition::Auto);
if (!NewStatePending()) {
ChangeState(Transition::Auto);
}
}
Device::~Device()

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2021-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -565,7 +565,6 @@ class Device
static constexpr mq::Transport DefaultTransportType = mq::Transport::ZMQ;
static constexpr const char* DefaultNetworkInterface = "default";
static constexpr int DefaultInitTimeout = 120;
static constexpr uint64_t DefaultMaxRunTime = 0;
static constexpr float DefaultRate = 0.;
static constexpr const char* DefaultSession = "default";

View File

@@ -187,9 +187,7 @@ struct Machine_ : public state_machine_def<Machine_>
{
unique_lock<mutex> lock(fStateMtx);
while (!fNewStatePending) {
fNewStatePendingCV.wait_for(lock, chrono::milliseconds(100));
}
fNewStatePendingCV.wait(lock, [this]{ return fNewStatePending.load(); });
LOG(state) << fState << " ---> " << fNewState;
fState = static_cast<State>(fNewState);

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2019-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -9,16 +9,14 @@
#ifndef FAIRMQSTATEQUEUE_H_
#define FAIRMQSTATEQUEUE_H_
#include <fairmq/States.h>
#include <queue>
#include <mutex>
#include <chrono>
#include <utility> // pair
#include <condition_variable>
#include <fairmq/States.h>
#include <mutex>
#include <queue>
#include <utility> // pair
namespace fair::mq
{
namespace fair::mq {
class StateQueue
{
@@ -33,41 +31,47 @@ class StateQueue
fair::mq::State WaitForNext()
{
std::unique_lock<std::mutex> lock(fMtx);
while (fStates.empty()) {
fCV.wait_for(lock, std::chrono::milliseconds(50));
}
fair::mq::State state = fStates.front();
if (state == fair::mq::State::Error) {
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fStates.pop();
return state;
fCV.wait(lock, [this] { return Predicate(); });
return PopFrontUnsafe();
}
template<typename Rep, typename Period>
std::pair<bool, fair::mq::State> WaitForNext(std::chrono::duration<Rep, Period> const& duration)
template<typename Timeout>
std::pair<bool, fair::mq::State> WaitForNext(Timeout&& duration)
{
std::unique_lock<std::mutex> lock(fMtx);
fCV.wait_for(lock, duration);
if (fStates.empty()) {
return { false, fair::mq::State::Ok };
}
fair::mq::State state = fStates.front();
if (state == fair::mq::State::Error) {
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fStates.pop();
return { true, state };
fCV.wait_for(lock, std::forward<Timeout>(duration), [this] { return Predicate(); });
return ReturnPairUnsafe();
}
void WaitForState(fair::mq::State state) { while (WaitForNext() != state) {} }
template<typename CustomPredicate>
std::pair<bool, fair::mq::State> WaitForNextOrCustom(CustomPredicate&& customPredicate)
{
std::unique_lock<std::mutex> lock(fMtx);
fCV.wait(lock, [this, cp = std::move(customPredicate)] { return Predicate() || cp(); });
return ReturnPairUnsafe();
}
template<typename CustomPredicate>
std::pair<bool, fair::mq::State> WaitForCustom(CustomPredicate&& customPredicate)
{
std::unique_lock<std::mutex> lock(fMtx);
fCV.wait(lock, [cp = std::move(customPredicate)] { return cp(); });
return ReturnPairUnsafe();
}
void WaitForState(fair::mq::State state)
{
while (WaitForNext() != state) {}
}
template<typename CustomPredicate>
void WaitForStateOrCustom(fair::mq::State state, CustomPredicate customPredicate)
{
auto next = WaitForNextOrCustom(customPredicate);
while (!customPredicate() && (next.first && next.second != state)) {
next = WaitForNextOrCustom(customPredicate);
}
}
void Push(fair::mq::State state)
{
@@ -75,7 +79,35 @@ class StateQueue
std::lock_guard<std::mutex> lock(fMtx);
fStates.push(state);
}
fCV.notify_all();
fCV.notify_one();
}
template<typename CustomSignal>
void Push(fair::mq::State state, CustomSignal&& signal)
{
{
std::lock_guard<std::mutex> lock(fMtx);
fStates.push(state);
signal();
}
fCV.notify_one();
}
template<typename CustomSignal>
void Notify(CustomSignal&& signal)
{
{
std::lock_guard<std::mutex> lock(fMtx);
signal();
}
fCV.notify_one();
}
template<typename CustomSignal>
void Locked(CustomSignal&& signal)
{
std::lock_guard<std::mutex> lock(fMtx);
signal();
}
void Clear()
@@ -88,8 +120,29 @@ class StateQueue
std::queue<fair::mq::State> fStates;
std::mutex fMtx;
std::condition_variable fCV;
// must be called under locked fMtx
fair::mq::State PopFrontUnsafe()
{
fair::mq::State state = fStates.front();
if (state == fair::mq::State::Error) {
throw DeviceErrorState("Controlled device transitioned to error state.");
}
fStates.pop();
return state;
}
// must be called under locked fMtx
std::pair<bool, fair::mq::State> ReturnPairUnsafe()
{
auto const pred = Predicate();
return {pred, pred ? PopFrontUnsafe() : fair::mq::State::Ok};
}
// must be called under locked fMtx
bool Predicate() { return !fStates.empty(); }
};
} // namespace fair::mq
} // namespace fair::mq
#endif /* FAIRMQSTATEQUEUE_H_ */

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2018-2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2018-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -21,7 +21,7 @@
#define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@"
#define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ"
#define FAIRMQ_LICENSE "LGPL-3.0"
#define FAIRMQ_COPYRIGHT "2012-2021 GSI"
#define FAIRMQ_COPYRIGHT "2012-2022 GSI"
#define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@"
#endif // FAIR_MQ_VERSION_H

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -40,7 +40,7 @@ Config::Config(const string& name, Plugin::Version version, const string& mainta
LOG(debug) << "channel-config: Parsing channel configuration";
SetProperties(SuboptParser(GetProperty<vector<string>>("channel-config"), idForParser));
} else {
LOG(warn) << "fair::mq::plugins::Config: no channels configuration provided via --mq-config or --channel-config";
LOG(info) << "fair::mq::plugins::Config: no channels configuration provided via --mq-config or --channel-config";
}
} catch (exception& e) {
LOG(error) << e.what();
@@ -62,7 +62,6 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("transport", po::value<string >()->default_value("zeromq"), "Transport ('zeromq'/'shmem').")
("network-interface", po::value<string >()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("init-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).")
("shm-allocation", po::value<string >()->default_value("rbtree_best_fit"), "Shared memory allocation algorithm: rbtree_best_fit/simple_seq_fit.")

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -36,7 +36,7 @@ REGISTER_FAIRMQ_PLUGIN(
config, // Plugin name
(Plugin::Version{FAIRMQ_VERSION_MAJOR, FAIRMQ_VERSION_MINOR, FAIRMQ_VERSION_PATCH}),
"FairRootGroup <fairroot@gsi.de>",
"https://github.com/FairRootGroup/FairRoot",
"https://github.com/FairRootGroup/FairMQ",
ConfigPluginProgramOptions
)

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -56,11 +56,11 @@ Control::Control(const string& name, Plugin::Version version, const string& main
SubscribeToDeviceStateChange([&](DeviceState newState) {
LOG(trace) << "control plugin notified on new state: " << newState;
fStateQueue.Push(newState);
if (newState == DeviceState::Error) {
fPluginShutdownRequested = true;
fDeviceShutdownRequested = true;
fStateQueue.Push(newState, [this]{ fDeviceShutdownRequested = true; });
} else {
fStateQueue.Push(newState);
}
});
@@ -99,18 +99,42 @@ Control::Control(const string& name, Plugin::Version version, const string& main
auto Control::RunStartupSequence() -> void
{
ChangeDeviceState(DeviceStateTransition::InitDevice);
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {}
ChangeDeviceState(DeviceStateTransition::CompleteInit);
while (fStateQueue.WaitForNext() != DeviceState::Initialized) {}
ChangeDeviceState(DeviceStateTransition::Bind);
while (fStateQueue.WaitForNext() != DeviceState::Bound) {}
ChangeDeviceState(DeviceStateTransition::Connect);
while (fStateQueue.WaitForNext() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::InitTask);
while (fStateQueue.WaitForNext() != DeviceState::Ready) {}
ChangeDeviceState(DeviceStateTransition::Run);
while (fStateQueue.WaitForNext() != DeviceState::Running) {}
using Transition = DeviceStateTransition;
using State = DeviceState;
auto shutdownRequested = [this]{ return fDeviceShutdownRequested.load(); };
ChangeDeviceState(Transition::InitDevice);
fStateQueue.WaitForStateOrCustom(State::InitializingDevice, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
ChangeDeviceState(Transition::CompleteInit);
fStateQueue.WaitForStateOrCustom(State::Initialized, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
ChangeDeviceState(Transition::Bind);
fStateQueue.WaitForStateOrCustom(State::Binding, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
fStateQueue.WaitForStateOrCustom(State::Bound, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
ChangeDeviceState(Transition::Connect);
fStateQueue.WaitForStateOrCustom(State::Connecting, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
fStateQueue.WaitForStateOrCustom(State::DeviceReady, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
ChangeDeviceState(Transition::InitTask);
fStateQueue.WaitForStateOrCustom(State::InitializingTask, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
fStateQueue.WaitForStateOrCustom(State::Ready, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
ChangeDeviceState(Transition::Run);
fStateQueue.WaitForStateOrCustom(State::Running, shutdownRequested);
if (fDeviceShutdownRequested) { return; /* --> shutdown sequence */ }
}
auto ControlPluginProgramOptions() -> Plugin::ProgOptions
@@ -123,10 +147,8 @@ auto ControlPluginProgramOptions() -> Plugin::ProgOptions
return pluginOptions;
}
auto Control::InteractiveMode() -> void
try {
RunStartupSequence();
auto Control::RunREPL() -> void
{
char input = 0; // hold the user console input
pollfd cinfd[1];
cinfd[0].fd = fileno(stdin);
@@ -161,7 +183,7 @@ try {
case 'i':
cout << "\n --> [i] init device\n\n" << flush;
if (ChangeDeviceState(DeviceStateTransition::InitDevice)) {
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {}
fStateQueue.WaitForState(DeviceState::InitializingDevice);
ChangeDeviceState(DeviceStateTransition::CompleteInit);
}
break;
@@ -243,7 +265,19 @@ try {
}
}
RunShutdownSequence();
}
auto Control::InteractiveMode() -> void
try {
RunStartupSequence();
if(!fDeviceShutdownRequested) {
RunREPL();
}
if(!fDeviceShutdownRequested) {
RunShutdownSequence();
}
} catch (PluginServices::DeviceControlError& e) {
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what();
@@ -366,16 +400,13 @@ auto Control::StaticMode() -> void
try {
RunStartupSequence();
{
// Wait for next state, which is DeviceState::Ready,
// or for device shutdown request (Ctrl-C)
pair<bool, fair::mq::State> result;
do {
result = fStateQueue.WaitForNext(chrono::milliseconds(50));
} while (result.first == false && !fDeviceShutdownRequested);
}
// Wait for next state, which is DeviceState::Ready,
// or for device shutdown request (Ctrl-C)
fStateQueue.WaitForNextOrCustom([this]{ return fDeviceShutdownRequested.load(); });
RunShutdownSequence();
if(!fDeviceShutdownRequested) {
RunShutdownSequence();
}
} catch (PluginServices::DeviceControlError& e) {
// If we are here, it means another plugin has taken control. That's fine, just print the exception message and do nothing else.
LOG(debug) << e.what();
@@ -387,16 +418,12 @@ auto Control::GUIMode() -> void
try {
RunStartupSequence();
{
// Wait for next state, which is DeviceState::Ready,
// or for device shutdown request (Ctrl-C)
pair<bool, fair::mq::State> result;
do {
result = fStateQueue.WaitForNext(chrono::milliseconds(50));
} while (!fDeviceShutdownRequested);
}
// Wait for device shutdown request (Ctrl-C)
fStateQueue.WaitForCustom([this]{ return fDeviceShutdownRequested.load(); });
RunShutdownSequence();
if(!fDeviceShutdownRequested) {
RunShutdownSequence();
}
} catch (PluginServices::DeviceControlError& e) {
// If we are here, it means another plugin has taken control. That's fine, just print the
// exception message and do nothing else.
@@ -416,10 +443,10 @@ auto Control::SignalHandler() -> void
LOG(info) << "Waiting for graceful device shutdown. Hit Ctrl-C again to abort immediately.";
// Signal and wait for controller thread, if we are controller
fDeviceShutdownRequested = true;
fStateQueue.Notify([this] { fDeviceShutdownRequested = true; });
{
unique_lock<mutex> lock(fControllerMutex);
if (fControllerThread.joinable()) fControllerThread.join();
if (fControllerThread.joinable()) { fControllerThread.join(); }
}
if (!fDeviceHasShutdown) {
@@ -462,6 +489,12 @@ auto Control::RunShutdownSequence() -> void
case DeviceState::Running:
ChangeDeviceState(DeviceStateTransition::Stop);
break;
case DeviceState::Binding:
case DeviceState::Connecting:
case DeviceState::InitializingTask:
case DeviceState::ResettingTask:
case DeviceState::ResettingDevice:
ChangeDeviceState(DeviceStateTransition::Auto);
default:
// LOG(debug) << "Controller ignoring event: " << nextState;
break;
@@ -481,9 +514,9 @@ Control::~Control()
{
unique_lock<mutex> lock(fControllerMutex);
if (fControllerThread.joinable()) fControllerThread.join();
if (fControllerThread.joinable()) { fControllerThread.join(); }
}
if (fSignalHandlerThread.joinable()) fSignalHandlerThread.join();
if (fSignalHandlerThread.joinable()) { fSignalHandlerThread.join(); }
UnsubscribeFromDeviceStateChange();
}

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
@@ -46,6 +46,7 @@ class Control : public Plugin
auto GUIMode() -> void;
auto SignalHandler() -> void;
auto RunShutdownSequence() -> void;
auto RunREPL() -> void;
auto RunStartupSequence() -> void;
std::thread fControllerThread;

View File

@@ -207,22 +207,22 @@ class Manager
fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first;
if (fEventCounter) {
LOG(debug) << "event counter found: " << fEventCounter->fCount;
LOG(trace) << "event counter found: " << fEventCounter->fCount;
} else {
LOG(debug) << "no event counter found, creating one and initializing with 0";
LOG(trace) << "no event counter found, creating one and initializing with 0";
fEventCounter = fManagementSegment.construct<EventCounter>(unique_instance)(0);
LOG(debug) << "initialized event counter with: " << fEventCounter->fCount;
LOG(trace) << "initialized event counter with: " << fEventCounter->fCount;
}
fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;
if (fDeviceCounter) {
LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
LOG(trace) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
(fDeviceCounter->fCount)++;
LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount;
LOG(trace) << "incremented device counter, now: " << fDeviceCounter->fCount;
} else {
LOG(debug) << "no device counter found, creating one and initializing with 1";
LOG(trace) << "no device counter found, creating one and initializing with 1";
fDeviceCounter = fManagementSegment.construct<DeviceCounter>(unique_instance)(1);
LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount;
LOG(trace) << "initialized device counter with: " << fDeviceCounter->fCount;
}
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
@@ -265,10 +265,10 @@ class Manager
}
}
}
LOG(debug) << "Created/opened shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "'."
<< " Size: " << boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)) << " bytes."
<< " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes."
<< " Allocation algorithm: " << allocationAlgorithm;
LOG(debug) << (createdSegment ? "Created" : "Opened") << " managed shared memory segment " << "fmq_" << fShmId << "_m_" << fSegmentId
<< ". Size: " << boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)) << " bytes."
<< " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes."
<< " Allocation algorithm: " << allocationAlgorithm;
} catch (interprocess_exception& bie) {
LOG(error) << "Failed to create/open shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "': " << bie.what();
throw TransportError(tools::ToString("Failed to create/open shared memory segment '", "fmq_", fShmId, "_m_", fSegmentId, "': ", bie.what()));
@@ -395,20 +395,26 @@ class Manager
const uint16_t id = cfg.id.value();
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
UnmanagedRegion* region = nullptr;
bool newRegionCreated = false;
{
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
newRegionCreated = res.second;
auto it = fRegions.find(id);
if (it != fRegions.end()) {
region = it->second.get();
if (region->fControlling) {
LOG(error) << "Unmanaged Region with id " << id << " already exists. Only unique IDs per session are allowed.";
throw TransportError(tools::ToString("Unmanaged Region with id ", id, " already exists. Only unique IDs per session are allowed."));
}
LOG(debug) << "Unmanaged region (view) already present, promoting to controller";
region->BecomeController(cfg);
} else {
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, true, cfg));
region = res.first->second.get();
}
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
if (!newRegionCreated) {
region->fRemote = false; // TODO: this should be more clear, refactor it.
}
// start ack receiver only if a callback has been provided.
if (callback || bulkCallback) {
region->SetCallbacks(callback, bulkCallback);
@@ -429,7 +435,7 @@ class Manager
}
}
UnmanagedRegion* GetRegion(uint16_t id)
UnmanagedRegion* GetRegionFromCache(uint16_t id)
{
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
const auto &lTlCache = fTlRegionCache;
@@ -443,41 +449,39 @@ class Manager
}
}
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
// slow path: check invalidation
if (lTlCacheGen != fRegionsGen) {
fTlRegionCache.fRegionsTLCache.clear();
}
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
auto* lRegion = GetRegionUnsafe(id, shmLock);
auto* lRegion = GetRegion(id);
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
return lRegion;
}
UnmanagedRegion* GetRegionUnsafe(uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
UnmanagedRegion* GetRegion(uint16_t id)
{
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
auto it = fRegions.find(id);
if (it != fRegions.end()) {
return it->second.get();
} else {
try {
// get region info
RegionInfo regionInfo = fShmRegions->at(id);
// safe to unlock now - no shm container accessed after this
lockedShmLock.unlock();
RegionConfig cfg;
cfg.id = id;
cfg.creationFlags = regionInfo.fCreationFlags;
cfg.path = regionInfo.fPath.c_str();
// get region info
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
RegionInfo regionInfo = fShmRegions->at(id);
cfg.id = id;
cfg.creationFlags = regionInfo.fCreationFlags;
cfg.path = regionInfo.fPath.c_str();
}
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, false, std::move(cfg)));
r.first->second->InitializeQueues();
r.first->second->StartAckSender();
lockedShmLock.lock();
return r.first->second.get();
} catch (std::out_of_range& oor) {
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
@@ -493,10 +497,10 @@ class Manager
void RemoveRegion(uint16_t id)
{
try {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
fRegions.at(id)->StopAcks();
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
if (fRegions.at(id)->RemoveOnDestruction()) {
fShmRegions->at(id).fDestroyed = true;
(fEventCounter->fCount)++;
@@ -512,44 +516,73 @@ class Manager
std::vector<fair::mq::RegionInfo> GetRegionInfo()
{
std::vector<fair::mq::RegionInfo> result;
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
std::map<uint64_t, RegionConfig> regionCfgs;
for (const auto& e : *fShmSegments) {
// make sure any segments in the session are found
GetSegment(e.first);
try {
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
for (const auto& [segmentId, segmentInfo] : *fShmSegments) {
// make sure any segments in the session are found
GetSegment(segmentId);
try {
fair::mq::RegionInfo info;
info.managed = true;
info.id = segmentId;
info.event = RegionEvent::created;
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(segmentId));
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(segmentId));
result.push_back(info);
} catch (const std::out_of_range& oor) {
LOG(error) << "could not find segment with id " << segmentId;
LOG(error) << oor.what();
}
}
for (const auto& [regionId, regionInfo] : *fShmRegions) {
fair::mq::RegionInfo info;
info.managed = true;
info.id = e.first;
info.event = RegionEvent::created;
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(e.first));
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(e.first));
info.managed = false;
info.id = regionId;
info.flags = regionInfo.fUserFlags;
info.event = regionInfo.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (info.event == RegionEvent::created) {
RegionConfig cfg;
cfg.id = info.id;
cfg.creationFlags = regionInfo.fCreationFlags;
cfg.path = regionInfo.fPath.c_str();
regionCfgs.emplace(info.id, cfg);
// fill the ptr+size info after shmLock is released, to avoid constructing local region under it
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
} catch (const std::out_of_range& oor) {
LOG(error) << "could not find segment with id " << e.first;
LOG(error) << oor.what();
}
}
for (const auto& e : *fShmRegions) {
fair::mq::RegionInfo info;
info.managed = false;
info.id = e.first;
info.flags = e.second.fUserFlags;
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (info.event == RegionEvent::created) {
auto region = GetRegionUnsafe(info.id, shmLock);
if (region) {
// do another iteration outside of shm lock, to fill ptr+size of unmanaged regions
for (auto& info : result) {
if (!info.managed && info.event == RegionEvent::created) {
auto cfgIt = regionCfgs.find(info.id);
if (cfgIt != regionCfgs.end()) {
UnmanagedRegion* region = nullptr;
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
auto it = fRegions.find(info.id);
if (it != fRegions.end()) {
region = it->second.get();
} else {
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, false, cfgIt->second));
region = r.first->second.get();
region->InitializeQueues();
region->StartAckSender();
}
info.ptr = region->GetData();
info.size = region->GetSize();
} else {
throw std::runtime_error(tools::ToString("GetRegionInfo() could not get region with id '", info.id, "'"));
info.ptr = nullptr;
info.size = 0;
}
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
}
return result;

View File

@@ -195,7 +195,7 @@ class Message final : public fair::mq::Message
fLocalPtr = nullptr;
}
} else {
fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
if (fRegionPtr) {
fLocalPtr = reinterpret_cast<char*>(fRegionPtr->GetData()) + fMeta.fHandle;
} else {
@@ -365,7 +365,7 @@ class Message final : public fair::mq::Message
void ReleaseUnmanagedRegionBlock()
{
if (!fRegionPtr) {
fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
}
if (fRegionPtr) {

View File

@@ -23,6 +23,7 @@
#include <boost/interprocess/ipc/message_queue.hpp>
#include <csignal>
#include <cstdio>
#include <iostream>
#include <iomanip>
#include <chrono>
@@ -533,6 +534,88 @@ unsigned long Monitor::GetFreeMemory(const SessionId& sessionId, uint16_t segmen
return GetFreeMemory(shmId, segmentId);
}
bool Monitor::SegmentIsPresent(const ShmId& shmId, uint16_t segmentId)
{
using namespace boost::interprocess;
try {
bipc::managed_shared_memory managementSegment(bipc::open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());
Uint16SegmentInfoHashMap* shmSegments = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
if (!shmSegments) {
LOG(error) << "Found management segment, but could not locate segment info";
return false;
}
auto it = shmSegments->find(segmentId);
if (it != shmSegments->end()) {
try {
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
RBTreeBestFitSegment segment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + std::to_string(segmentId)).c_str());
} else {
SimpleSeqFitSegment segment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + std::to_string(segmentId)).c_str());
}
} catch (bie&) {
LOG(error) << "Could not find segment with id '" << segmentId << "' for shmId '" << shmId.shmId << "'";
return false;
}
} else {
LOG(error) << "Could not find segment info for segment id '" << segmentId << "' for shmId '" << shmId.shmId << "'";
return false;
}
} catch (bie&) {
LOG(error) << "Could not find management segment for shmid '" << shmId.shmId << "'";
return false;
}
return true;
}
bool Monitor::SegmentIsPresent(const SessionId& sessionId, uint16_t segmentId)
{
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
return SegmentIsPresent(shmId, segmentId);
}
bool Monitor::RegionIsPresent(const ShmId& shmId, uint16_t regionId)
{
using namespace boost::interprocess;
try {
bipc::managed_shared_memory managementSegment(bipc::open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());
Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first;
if (!shmRegions) {
LOG(error) << "Found management segment, but could not locate region info";
return false;
}
std::string regionFileName("fmq_" + shmId.shmId + "_rg_" + to_string(regionId));
auto it = shmRegions->find(regionId);
if (it != shmRegions->end()) {
try {
if (it->second.fPath.empty()) {
shared_memory_object object(open_only, regionFileName.c_str(), read_only);
}
} catch (bie&) {
LOG(error) << "Could not find region with id '" << regionId << "' for shmId '" << shmId.shmId << "'";
return false;
}
} else {
LOG(error) << "Could not find region info for region id '" << regionId << "' for shmId '" << shmId.shmId << "'";
return false;
}
} catch (bie&) {
LOG(error) << "Could not find management segment for shmid '" << shmId.shmId << "'";
return false;
}
return true;
}
bool Monitor::RegionIsPresent(const SessionId& sessionId, uint16_t regionId)
{
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
return RegionIsPresent(shmId, regionId);
}
void Monitor::PrintHelp()
{
LOG(info) << "controls: [x] close memory, "
@@ -574,7 +657,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmIdT,
string managementSegmentName("fmq_" + shmId + "_mng");
try {
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
bipc::managed_shared_memory managementSegment(bipc::open_read_only, managementSegmentName.c_str());
Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first;
if (shmRegions) {
@@ -587,7 +670,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmIdT,
string path = info.fPath.c_str();
int flags = info.fCreationFlags;
if (verbose) {
LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << info.fDestroyed << ".";
LOG(info) << "Found UnmanagedRegion with id: " << id << ", path: '" << path << "', flags: " << flags << ", fDestroyed: " << info.fDestroyed << ".";
}
if (!path.empty()) {
result.emplace_back(Remove<bipc::file_mapping>(path + "fmq_" + shmId + "_rg_" + to_string(id), verbose));
@@ -660,27 +743,35 @@ void Monitor::ResetContent(const ShmId& shmIdT, bool verbose /* = true */)
managed_shared_memory managementSegment(open_only, managementSegmentName.c_str());
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
for (const auto& s : *segmentInfos) {
if (verbose) {
cout << "Resetting content of segment '" << "fmq_" << shmId << "_m_" << s.first << "'..." << endl;
}
try {
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, rbtree_best_fit<mutex_family, offset_ptr<void>>, null_index>(size);
} else {
SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, simple_seq_fit<mutex_family, offset_ptr<void>>, null_index>(size);
}
} catch (bie& e) {
if (segmentInfos) {
cout << "Found info for " << segmentInfos->size() << " managed segments" << endl;
for (const auto& s : *segmentInfos) {
if (verbose) {
cout << "Error resetting content of segment '" << std::string("fmq_" + shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl;
cout << "Resetting content of segment '" << "fmq_" << shmId << "_m_" << s.first << "'..." << endl;
}
try {
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, rbtree_best_fit<mutex_family, offset_ptr<void>>, null_index>(size);
} else {
SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, simple_seq_fit<mutex_family, offset_ptr<void>>, null_index>(size);
}
if (verbose) {
cout << "Done." << endl;
}
} catch (bie& e) {
if (verbose) {
cout << "Error resetting content of segment '" << std::string("fmq_" + shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl;
}
}
}
} else {
cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl;
}
Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first;
@@ -717,13 +808,15 @@ void Monitor::ResetContent(const ShmId& shmIdT, const std::vector<SegmentConfig>
std::string shmId = shmIdT.shmId;
std::string managementSegmentName("fmq_" + shmId + "_mng");
// reset managed segments
ResetContent(shmIdT, verbose);
// delete management segment
cout << "deleting management segment" << endl;
Remove<bipc::shared_memory_object>(managementSegmentName, verbose);
// recreate management segment
cout << "recreating management segment..." << endl;
managed_shared_memory mngSegment(create_only, managementSegmentName.c_str(), kManagementSegmentSize);
cout << "done." << endl;
// fill management segment with segment & region infos
cout << "filling management segment with managed segment configs..." << endl;
for (const auto& s : segmentCfgs) {
if (s.allocationAlgorithm == "rbtree_best_fit") {
Segment::Register(shmId, s.id, AllocationAlgorithm::rbtree_best_fit);
@@ -734,9 +827,14 @@ void Monitor::ResetContent(const ShmId& shmIdT, const std::vector<SegmentConfig>
throw MonitorError("Unknown allocation algorithm provided: " + s.allocationAlgorithm);
}
}
cout << "done." << endl;
cout << "filling management segment with unmanaged region configs..." << endl;
for (const auto& r : regionCfgs) {
fair::mq::shmem::UnmanagedRegion::Register(shmId, r);
}
cout << "done." << endl;
// reset managed segments
ResetContent(shmIdT, verbose);
}
void Monitor::ResetContent(const SessionId& sessionId, const std::vector<SegmentConfig>& segmentCfgs, const std::vector<RegionConfig>& regionCfgs, bool verbose /* = true */)

View File

@@ -119,7 +119,7 @@ class Monitor
/// @param sessionId session id
static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const SessionId& sessionId);
/// @brief Returns the amount of free memory in the specified segment
/// @param sessionId shmem id
/// @param shmId shmem id
/// @param segmentId segment id
/// @throws MonitorError
static unsigned long GetFreeMemory(const ShmId& shmId, uint16_t segmentId);
@@ -128,6 +128,23 @@ class Monitor
/// @param segmentId segment id
/// @throws MonitorError
static unsigned long GetFreeMemory(const SessionId& sessionId, uint16_t segmentId);
/// @brief Checks if a given segment can be opened
/// @param shmId shmem id
/// @param segmentId segment id
static bool SegmentIsPresent(const ShmId& shmId, uint16_t segmentId);
/// @brief Checks if a given segment can be opened
/// @param sessionId session id
/// @param segmentId segment id
static bool SegmentIsPresent(const SessionId& sessionId, uint16_t segmentId);
/// @brief Checks if a given region can be opened
/// @param shmId shmem id
/// @param regionId region id
static bool RegionIsPresent(const ShmId& shmId, uint16_t regionId);
/// @brief Checks if a given region can be opened
/// @param sessionId session id
/// @param regionId region id
static bool RegionIsPresent(const SessionId& sessionId, uint16_t regionId);
static bool PrintShm(const ShmId& shmId);
static void ListAll(const std::string& path);

View File

@@ -44,19 +44,19 @@ struct UnmanagedRegion
friend class Monitor;
UnmanagedRegion(const std::string& shmId, uint16_t id, uint64_t size)
: UnmanagedRegion(shmId, size, false, makeRegionConfig(id))
: UnmanagedRegion(shmId, size, true, makeRegionConfig(id))
{}
UnmanagedRegion(const std::string& shmId, uint64_t size, RegionConfig cfg)
: UnmanagedRegion(shmId, size, false, std::move(cfg))
: UnmanagedRegion(shmId, size, true, std::move(cfg))
{}
UnmanagedRegion(const std::string& shmId, RegionConfig cfg)
: UnmanagedRegion(shmId, cfg.size, false, std::move(cfg))
: UnmanagedRegion(shmId, cfg.size, true, std::move(cfg))
{}
UnmanagedRegion(const std::string& shmId, uint64_t size, bool remote, RegionConfig cfg)
: fRemote(remote)
UnmanagedRegion(const std::string& shmId, uint64_t size, bool controlling, RegionConfig cfg)
: fControlling(controlling)
, fRemoveOnDestruction(cfg.removeOnDestruction)
, fLinger(cfg.linger)
, fStopAcks(false)
@@ -73,11 +73,15 @@ struct UnmanagedRegion
// TODO: refactor this
cfg.size = size;
const uint16_t id = cfg.id.value();
bool created = false;
LOG(debug) << "UnmanagedRegion(): " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
if (!cfg.path.empty()) {
fName = std::string(cfg.path + fName);
if (!fRemote) {
if (fControlling) {
// create a file
std::filebuf fbuf;
if (fbuf.open(fName, std::ios_base::in | std::ios_base::out | std::ios_base::trunc | std::ios_base::binary)) {
@@ -92,23 +96,30 @@ struct UnmanagedRegion
if (!fFile) {
LOG(error) << "Failed to initialize file: " << fName;
LOG(error) << "errno: " << errno << ": " << strerror(errno);
throw std::runtime_error(tools::ToString("Failed to initialize file for shared memory region: ", strerror(errno)));
throw TransportError(tools::ToString("Failed to initialize file for shared memory region: ", strerror(errno)));
}
fFileMapping = file_mapping(fName.c_str(), read_write);
LOG(debug) << "shmem: initialized file: " << fName;
LOG(debug) << "UnmanagedRegion(): initialized file: " << fName;
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags);
} else {
try {
// if opening fails, create
try {
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
created = false;
} catch (interprocess_exception& e) {
LOG(debug) << "Could not open " << (remote ? "remote" : "local") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what() << ", creating...";
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
if (fControlling) {
LOG(debug) << "Could not open controlling shared_memory_object for region " << id << ": " << e.what() << ", creating...";
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
created = true;
} else {
LOG(error) << "Could not open view for shared_memory_object for region " << id << ": " << e.what();
throw TransportError(tools::ToString("Could not open view for shared_memory_object for region ", id, ": ", e.what()));
}
}
} catch (interprocess_exception& e) {
LOG(error) << "Failed " << (remote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
LOG(error) << "Failed initializing shared_memory_object for region id " << id << ": " << e.what();
throw;
}
@@ -119,27 +130,27 @@ struct UnmanagedRegion
throw TransportError(tools::ToString("Created/opened region size (", fRegion.get_size(), ") does not match configured size (", size, ")"));
}
} catch (interprocess_exception& e) {
LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
LOG(error) << "Failed mapping shared_memory_object for region id " << id << ": " << e.what();
throw;
}
}
if (cfg.lock) {
LOG(debug) << "Locking region " << cfg.id.value() << "...";
LOG(debug) << "Locking region " << id << "...";
Lock();
LOG(debug) << "Successfully locked region " << cfg.id.value() << ".";
LOG(debug) << "Successfully locked region " << id << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << cfg.id.value() << "...";
LOG(debug) << "Zeroing free memory of region " << id << "...";
Zero();
LOG(debug) << "Successfully zeroed free memory of region " << cfg.id.value() << ".";
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
}
if (!remote) {
if (fControlling && created) {
Register(shmId, cfg);
}
LOG(trace) << "shmem: initialized region: " << fName << " (" << (remote ? "remote" : "local") << ")";
LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
}
UnmanagedRegion() = delete;
@@ -149,6 +160,13 @@ struct UnmanagedRegion
UnmanagedRegion& operator=(const UnmanagedRegion&) = delete;
UnmanagedRegion& operator=(UnmanagedRegion&&) = delete;
void BecomeController(RegionConfig& cfg)
{
fControlling = true;
fLinger = cfg.linger;
fRemoveOnDestruction = cfg.removeOnDestruction;
}
void Zero()
{
memset(fRegion.get_address(), 0x00, fRegion.get_size());
@@ -171,6 +189,7 @@ struct UnmanagedRegion
~UnmanagedRegion()
{
LOG(debug) << "~UnmanagedRegion(): " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
fStopAcks = true;
if (fAcksSender.joinable()) {
@@ -178,7 +197,7 @@ struct UnmanagedRegion
fAcksSender.join();
}
if (!fRemote) {
if (fControlling) {
if (fAcksReceiver.joinable()) {
fAcksReceiver.join();
}
@@ -204,14 +223,14 @@ struct UnmanagedRegion
fclose(fFile);
}
} else {
// LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
// LOG(debug) << "Region queue '" << fQueueName << "' is viewer, no cleanup necessary";
}
// LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
// LOG(debug) << "Region '" << fName << "' (" << (fControlling ? "controller" : "viewer") << ") destructed.";
}
private:
bool fRemote;
bool fControlling;
bool fRemoveOnDestruction;
uint32_t fLinger;
std::atomic<bool> fStopAcks;
@@ -243,6 +262,7 @@ struct UnmanagedRegion
static void Register(const std::string& shmId, const RegionConfig& cfg)
{
using namespace boost::interprocess;
LOG(debug) << "Registering unmanaged shared memory region with id " << cfg.id.value();
managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), kManagementSegmentSize);
VoidAlloc alloc(mngSegment.get_segment_manager());
@@ -250,10 +270,14 @@ struct UnmanagedRegion
EventCounter* eventCounter = mngSegment.find_or_construct<EventCounter>(unique_instance)(0);
bool newShmRegionCreated = shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, alloc)).second;
if (newShmRegionCreated) {
(eventCounter->fCount)++;
auto it = shmRegions->find(cfg.id.value());
if (it != shmRegions->end()) {
LOG(error) << "Unmanaged Region with id " << cfg.id.value() << " has already been registered. Only unique IDs per session are allowed.";
throw TransportError(tools::ToString("Unmanaged Region with id ", cfg.id.value(), " has already been registered. Only unique IDs per session are allowed."));
}
shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, alloc)).second;
(eventCounter->fCount)++;
}
void SetCallbacks(RegionCallback callback, RegionBulkCallback bulkCallback)

View File

@@ -40,9 +40,9 @@ class UnmanagedRegionImpl final : public fair::mq::UnmanagedRegion
, fRegion(nullptr)
, fRegionId(0)
{
auto result = fManager.CreateRegion(size, callback, bulkCallback, std::move(cfg));
fRegion = result.first;
fRegionId = result.second;
auto [regionPtr, regionId] = fManager.CreateRegion(size, callback, bulkCallback, std::move(cfg));
fRegion = regionPtr;
fRegionId = regionId;
}
UnmanagedRegionImpl(const UnmanagedRegionImpl&) = delete;

9
test/ci/fedora.35.def Normal file
View File

@@ -0,0 +1,9 @@
Bootstrap: docker
From: fedora:35
%post
dnf -y update
dnf -y install https://alfa-ci.gsi.de/packages/rpm/fedora-35-x86_64/fairsoft-release-dev.rpm
dnf -y install clang cli11-devel pmix-devel ninja-build 'dnf-command(builddep)' libasan liblsan libtsan libubsan clang-tools-extra
dnf -y builddep fairmq
dnf -y clean all

View File

@@ -6,6 +6,11 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/shmem/Common.h>
#include <fairmq/shmem/Monitor.h>
#include <fairmq/shmem/Segment.h>
#include <fairmq/shmem/UnmanagedRegion.h>
#include <fairmq/TransportFactory.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/tools/Unique.h>
@@ -16,8 +21,12 @@
#include <gtest/gtest.h>
#include <cstdint>
#include <map>
#include <memory> // make_unique
#include <string>
#include <utility> // pair
#include <vector> // pair
namespace
{
@@ -25,6 +34,34 @@ namespace
using namespace std;
using namespace fair::mq;
struct ShmOwner
{
ShmOwner(const string& sessionId,
const vector<pair<uint16_t, size_t>>& segments,
const vector<pair<uint16_t, size_t>>& regions)
: fShmId(fair::mq::shmem::makeShmIdStr(sessionId))
{
LOG(info) << "ShmOwner: creating segments";
for (auto [id, size] : segments) {
fSegments.emplace(id, fair::mq::shmem::Segment(fShmId, id, size, fair::mq::shmem::rbTreeBestFit));
}
LOG(info) << "ShmOwner: creating regions";
for (auto [id, size] : regions) {
fRegions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(fShmId, id, size));
}
}
~ShmOwner()
{
LOG(info) << "ShmOwner: cleaning up";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{fShmId});
}
string fShmId;
map<uint16_t, fair::mq::shmem::Segment> fSegments;
map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> fRegions;
};
void RegionsSizeMismatch()
{
size_t session = tools::UuidHash();
@@ -108,31 +145,69 @@ void RegionsCache(const string& transport, const string& address)
}
}
void RegionEventSubscriptions(const string& transport)
void RegionEventSubscriptions(const string& transport, bool external)
{
fair::Logger::SetConsoleSeverity(fair::Severity::debug);
unique_ptr<ShmOwner> shmOwner = nullptr;
size_t session{tools::UuidHash()};
constexpr int sSize = 100000000;
constexpr int r1Size = 1000000;
constexpr int r2Size = 5000000;
constexpr uint16_t sId = 0;
constexpr uint16_t r1id = 100;
constexpr uint16_t r2id = 101;
if (external) {
shmOwner = make_unique<ShmOwner>(
to_string(session),
vector<pair<uint16_t, size_t>>{ { sId, sSize } },
vector<pair<uint16_t, size_t>>{ { r1id, r1Size }, { r2id, r2Size } }
);
}
ProgOptions config;
config.SetProperty<string>("session", to_string(session));
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<size_t>("shm-segment-size", sSize);
if (external) {
config.SetProperty<bool>("shm-no-cleanup", true);
config.SetProperty<bool>("shm-monitor", false);
}
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
constexpr int size1 = 1000000;
constexpr int size2 = 5000000;
constexpr int64_t userFlags = 12345;
tools::Semaphore blocker;
{
auto region1 = factory->CreateUnmanagedRegion(size1, [](void*, size_t, void*) {});
fair::mq::RegionConfig r1Cfg;
if (external) {
r1Cfg.id = r1id;
r1Cfg.removeOnDestruction = false;
}
auto region1 = factory->CreateUnmanagedRegion(r1Size, [](void*, size_t, void*) {}, r1Cfg);
void* ptr1 = region1->GetData();
uint64_t id1 = region1->GetId();
ASSERT_EQ(region1->GetSize(), size1);
if (external) {
ASSERT_EQ(id1, r1id);
}
ASSERT_EQ(region1->GetSize(), r1Size);
auto region2 = factory->CreateUnmanagedRegion(size2, userFlags, [](void*, size_t, void*) {});
fair::mq::RegionConfig r2Cfg;
r2Cfg.userFlags = userFlags;
if (external) {
r2Cfg.id = r2id;
r2Cfg.removeOnDestruction = false;
}
auto region2 = factory->CreateUnmanagedRegion(r2Size, [](void*, size_t, void*) {}, r2Cfg);
void* ptr2 = region2->GetData();
uint64_t id2 = region2->GetId();
ASSERT_EQ(region2->GetSize(), size2);
if (external) {
ASSERT_EQ(id2, r2id);
}
ASSERT_EQ(region2->GetSize(), r2Size);
ASSERT_EQ(factory->SubscribedToRegionEvents(), false);
factory->SubscribeToRegionEvents([&, id1, id2, ptr1, ptr2](RegionInfo info) {
@@ -144,13 +219,15 @@ void RegionEventSubscriptions(const string& transport)
<< ", flags: " << info.flags;
if (info.event == RegionEvent::created) {
if (info.id == id1) {
ASSERT_EQ(info.size, size1);
ASSERT_EQ(info.size, r1Size);
ASSERT_EQ(info.ptr, ptr1);
blocker.Signal();
} else if (info.id == id2) {
ASSERT_EQ(info.size, size2);
ASSERT_EQ(info.size, r2Size);
ASSERT_EQ(info.ptr, ptr2);
ASSERT_EQ(info.flags, userFlags);
if (!external) {
ASSERT_EQ(info.flags, userFlags);
}
blocker.Signal();
}
} else if (info.event == RegionEvent::destroyed) {
@@ -170,10 +247,12 @@ void RegionEventSubscriptions(const string& transport)
LOG(info) << "2 done.";
}
blocker.Wait();
LOG(info) << "3 done.";
blocker.Wait();
LOG(info) << "4 done.";
if (!external) {
blocker.Wait();
LOG(info) << "3 done.";
blocker.Wait();
LOG(info) << "4 done.";
}
LOG(info) << "All done.";
factory->UnsubscribeFromRegionEvents();
@@ -185,9 +264,13 @@ void RegionCallbacks(const string& transport, const string& _address)
size_t session(tools::UuidHash());
std::string address(tools::ToString(_address, "_", transport));
constexpr size_t sSize = 100000000;
constexpr size_t r1Size = 2000000;
constexpr size_t r2Size = 3000000;
ProgOptions config;
config.SetProperty<string>("session", to_string(session));
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<size_t>("shm-segment-size", sSize);
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
@@ -206,7 +289,7 @@ void RegionCallbacks(const string& transport, const string& _address)
void* ptr2 = nullptr;
size_t size2 = 200;
auto region1 = factory->CreateUnmanagedRegion(2000000, [&](void* ptr, size_t size, void* hint) {
auto region1 = factory->CreateUnmanagedRegion(r1Size, [&](void* ptr, size_t size, void* hint) {
ASSERT_EQ(ptr, ptr1);
ASSERT_EQ(size, size1);
ASSERT_EQ(hint, intPtr1.get());
@@ -215,7 +298,7 @@ void RegionCallbacks(const string& transport, const string& _address)
});
ptr1 = region1->GetData();
auto region2 = factory->CreateUnmanagedRegion(3000000, [&](const std::vector<RegionBlock>& blocks) {
auto region2 = factory->CreateUnmanagedRegion(r2Size, [&](const std::vector<RegionBlock>& blocks) {
ASSERT_EQ(blocks.size(), 1);
ASSERT_EQ(blocks.at(0).ptr, ptr2);
ASSERT_EQ(blocks.at(0).size, size2);
@@ -263,12 +346,12 @@ TEST(Cache, shmem)
TEST(EventSubscriptions, zeromq)
{
RegionEventSubscriptions("zeromq");
RegionEventSubscriptions("zeromq", false);
}
TEST(EventSubscriptions, shmem)
{
RegionEventSubscriptions("shmem");
RegionEventSubscriptions("shmem", false);
}
TEST(Callbacks, zeromq)
@@ -281,4 +364,9 @@ TEST(Callbacks, shmem)
RegionCallbacks("shmem", "ipc://test_region_callbacks");
}
TEST(EventSubscriptionsExternalRegion, shmem)
{
RegionEventSubscriptions("shmem", true);
}
} // namespace