mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
Compare commits
24 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
091d0824d1 | ||
|
857aa84fa3 | ||
|
c85d6e079c | ||
|
4e466514d2 | ||
|
8b4056e408 | ||
|
b67b80e0ad | ||
|
2c89b24857 | ||
|
c6a6a5f21b | ||
|
9defa71622 | ||
|
ed2dcedf03 | ||
|
a3d56b9aeb | ||
|
8a2641d842 | ||
|
2ca62d06db | ||
|
87e0ca5450 | ||
|
ef5b3c782e | ||
|
f7ba3052aa | ||
|
a90dbf64de | ||
|
9724f184f4 | ||
|
057ba03776 | ||
|
6dfea32aee | ||
|
868fe02ee9 | ||
|
a2016a9361 | ||
|
ea9aede652 | ||
|
77bf12c8e8 |
@@ -51,8 +51,12 @@ endif()
|
||||
list(JOIN options ";" optionsstr)
|
||||
ctest_configure(OPTIONS "${optionsstr}")
|
||||
|
||||
ctest_submit()
|
||||
|
||||
ctest_build(FLAGS "-j${NCPUS}")
|
||||
|
||||
ctest_submit()
|
||||
|
||||
ctest_test(BUILD "${CTEST_BINARY_DIRECTORY}"
|
||||
PARALLEL_LEVEL 1
|
||||
SCHEDULE_RANDOM ON
|
||||
|
21
Jenkinsfile
vendored
21
Jenkinsfile
vendored
@@ -29,7 +29,11 @@ def jobMatrix(String type, List specs) {
|
||||
sh "cat ${jobscript}"
|
||||
sh "bash ${jobscript}"
|
||||
} else {
|
||||
def containercmd = "singularity exec -B/shared ${env.SINGULARITY_CONTAINER_ROOT}/fairmq/${os}.${ver}.sif bash -l -c \\\"${ctestcmd} -DRUN_STATIC_ANALYSIS=ON\\\""
|
||||
def static_analysis = "OFF"
|
||||
if (selector =~ /^fedora/) {
|
||||
static_analysis = "ON"
|
||||
}
|
||||
def containercmd = "singularity exec -B/shared ${env.SINGULARITY_CONTAINER_ROOT}/fairmq/${os}.${ver}.sif bash -l -c \\\"${ctestcmd} -DRUN_STATIC_ANALYSIS=${static_analysis}\\\""
|
||||
sh """\
|
||||
echo \"echo \\\"*** Job started at .......: \\\$(date -R)\\\"\" >> ${jobscript}
|
||||
echo \"echo \\\"*** Job ID ...............: \\\${SLURM_JOB_ID}\\\"\" >> ${jobscript}
|
||||
@@ -42,10 +46,12 @@ def jobMatrix(String type, List specs) {
|
||||
sh "test/ci/slurm-submit.sh \"FairMQ \${JOB_BASE_NAME} ${label}\" ${jobscript}"
|
||||
|
||||
withChecks('Static Analysis') {
|
||||
recordIssues(enabledForFailure: true,
|
||||
tools: [gcc(pattern: 'build/Testing/Temporary/*.log')],
|
||||
filters: [excludeFile('extern/*'), excludeFile('usr/*')],
|
||||
skipBlames: true)
|
||||
if (static_analysis == "ON") {
|
||||
recordIssues(enabledForFailure: true,
|
||||
tools: [gcc(pattern: 'build/Testing/Temporary/*.log')],
|
||||
filters: [excludeFile('extern/*'), excludeFile('usr/*')],
|
||||
skipBlames: true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,8 +79,9 @@ pipeline{
|
||||
steps{
|
||||
script {
|
||||
def builds = jobMatrix('build', [
|
||||
[os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10'],
|
||||
[os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12'],
|
||||
[os: 'alice-centos', ver: '7', arch: 'x86_64', compiler: 'gcc-7'],
|
||||
[os: 'fedora', ver: '32', arch: 'x86_64', compiler: 'gcc-10'],
|
||||
[os: 'macos', ver: '11', arch: 'x86_64', compiler: 'apple-clang-12'],
|
||||
])
|
||||
|
||||
parallel(builds)
|
||||
|
@@ -203,6 +203,13 @@ macro(set_fairmq_defaults)
|
||||
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ${CCACHE})
|
||||
endif()
|
||||
endif()
|
||||
|
||||
if( CMAKE_CXX_COMPILER_ID STREQUAL "GNU"
|
||||
AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 9)
|
||||
set(FAIRMQ_HAS_STD_FILESYSTEM 0)
|
||||
else()
|
||||
set(FAIRMQ_HAS_STD_FILESYSTEM 1)
|
||||
endif()
|
||||
endmacro()
|
||||
|
||||
function(join VALUES GLUE OUTPUT)
|
||||
|
@@ -5,12 +5,6 @@
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* Sampler.cpp
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include "Sampler.h"
|
||||
|
||||
@@ -37,17 +31,17 @@ void Sampler::InitTask()
|
||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||
|
||||
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
|
||||
LOG(info) << "Region event: " << info.event
|
||||
<< ", managed: " << info.managed
|
||||
LOG(info) << "Region event: " << info.event << ": "
|
||||
<< (info.managed ? "managed" : "unmanaged")
|
||||
<< ", id: " << info.id
|
||||
<< ", ptr: " << info.ptr
|
||||
<< ", size: " << info.size
|
||||
<< ", flags: " << info.flags;
|
||||
});
|
||||
|
||||
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
|
||||
0,
|
||||
10000000,
|
||||
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
|
||||
0, // ... and this sub-channel
|
||||
10000000, // region size
|
||||
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fNumUnackedMsgs -= blocks.size();
|
||||
@@ -55,7 +49,10 @@ void Sampler::InitTask()
|
||||
if (fMaxIterations > 0) {
|
||||
LOG(info) << "Received " << blocks.size() << " acks";
|
||||
}
|
||||
}
|
||||
},
|
||||
"", // path, if a region is backed by a file
|
||||
0, // flags that are passed for region creation
|
||||
fair::mq::RegionConfig{true, true} // additional config: { call mlock on the region, zero the region memory }
|
||||
));
|
||||
fRegion->SetLinger(fLinger);
|
||||
}
|
||||
@@ -75,20 +72,19 @@ bool Sampler::ConditionalRun()
|
||||
// std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
++fNumUnackedMsgs;
|
||||
if (Send(msg, "data", 0) > 0) {
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
++fNumUnackedMsgs;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void Sampler::ResetTask()
|
||||
{
|
||||
// On destruction UnmanagedRegion will try to TODO
|
||||
fRegion.reset();
|
||||
{
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
|
@@ -30,8 +30,8 @@ void Sink::InitTask()
|
||||
// Get the fMaxIterations value from the command line options (via fConfig)
|
||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
|
||||
LOG(info) << "Region event: " << info.event
|
||||
<< ", managed: " << info.managed
|
||||
LOG(info) << "Region event: " << info.event << ": "
|
||||
<< (info.managed ? "managed" : "unmanaged")
|
||||
<< ", id: " << info.id
|
||||
<< ", ptr: " << info.ptr
|
||||
<< ", size: " << info.size
|
||||
|
@@ -2,9 +2,14 @@
|
||||
|
||||
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
||||
|
||||
transport="shmem"
|
||||
msgSize="1000000"
|
||||
|
||||
if [[ $1 =~ ^[0-9]+$ ]]; then
|
||||
if [[ $1 =~ ^[a-z]+$ ]]; then
|
||||
transport=$1
|
||||
fi
|
||||
|
||||
if [[ $2 =~ ^[0-9]+$ ]]; then
|
||||
msgSize=$1
|
||||
fi
|
||||
|
||||
@@ -13,13 +18,13 @@ SAMPLER+=" --id sampler1"
|
||||
SAMPLER+=" --severity debug"
|
||||
SAMPLER+=" --msg-size $msgSize"
|
||||
# SAMPLER+=" --rate 10"
|
||||
SAMPLER+=" --transport shmem"
|
||||
SAMPLER+=" --transport $transport"
|
||||
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
|
||||
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
|
||||
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
|
||||
|
||||
SINK="fairmq-ex-region-sink"
|
||||
SINK+=" --id sink1"
|
||||
SINK+=" --severity debug"
|
||||
SINK+=" --transport shmem"
|
||||
SINK+=" --transport $transport"
|
||||
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
|
||||
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK &
|
||||
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK &
|
||||
|
@@ -31,6 +31,7 @@ SAMPLER_PID=$!
|
||||
SINK="fairmq-ex-region-sink"
|
||||
SINK+=" --id sink1"
|
||||
SINK+=" --transport $transport"
|
||||
SINK+=" --severity debug"
|
||||
SINK+=" --session $SESSION"
|
||||
SINK+=" --verbosity veryhigh"
|
||||
SINK+=" --control static --color false"
|
||||
|
@@ -283,6 +283,7 @@ if(BUILD_FAIRMQ)
|
||||
if(BUILD_OFI_TRANSPORT)
|
||||
target_compile_definitions(${_target} PRIVATE BUILD_OFI_TRANSPORT)
|
||||
endif()
|
||||
target_compile_definitions(${_target} PUBLIC FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM})
|
||||
|
||||
|
||||
#######################
|
||||
@@ -384,6 +385,7 @@ if(BUILD_FAIRMQ)
|
||||
$<$<PLATFORM_ID:Linux>:rt>
|
||||
Boost::boost
|
||||
Boost::date_time
|
||||
$<$<NOT:${FAIRMQ_HAS_STD_FILESYSTEM}>:Boost::filesystem>
|
||||
Boost::program_options
|
||||
FairLogger::FairLogger
|
||||
PicoSHA2
|
||||
@@ -391,6 +393,7 @@ if(BUILD_FAIRMQ)
|
||||
target_include_directories(fairmq-shmmonitor PUBLIC
|
||||
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}>
|
||||
)
|
||||
target_compile_definitions(fairmq-shmmonitor PUBLIC FAIRMQ_HAS_STD_FILESYSTEM=${FAIRMQ_HAS_STD_FILESYSTEM})
|
||||
|
||||
add_executable(fairmq-uuid-gen tools/runUuidGenerator.cxx)
|
||||
target_link_libraries(fairmq-uuid-gen PUBLIC
|
||||
|
@@ -56,7 +56,7 @@ class FairMQSocket
|
||||
/// If the backend supports it, fills the unsigned integer @a events with the ZMQ_EVENTS value
|
||||
/// DISCLAIMER: this API is experimental and unsupported and might be dropped / refactored in
|
||||
/// the future.
|
||||
virtual void Events(uint32_t* events) = 0;
|
||||
virtual int Events(uint32_t* events) = 0;
|
||||
virtual void SetLinger(const int value) = 0;
|
||||
virtual int GetLinger() const = 0;
|
||||
virtual void SetSndBufSize(const int value) = 0;
|
||||
|
@@ -92,8 +92,8 @@ class FairMQTransportFactory
|
||||
/// @param path optional parameter to pass to the underlying transport
|
||||
/// @param flags optional parameter to pass to the underlying transport
|
||||
/// @return pointer to UnmanagedRegion
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
|
||||
/// @brief Create new UnmanagedRegion
|
||||
/// @param size size of the region
|
||||
/// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user
|
||||
@@ -101,8 +101,8 @@ class FairMQTransportFactory
|
||||
/// @param path optional parameter to pass to the underlying transport
|
||||
/// @param flags optional parameter to pass to the underlying transport
|
||||
/// @return pointer to UnmanagedRegion
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
|
||||
|
||||
/// @brief Subscribe to region events (creation, destruction, ...)
|
||||
/// @param callback the callback that is called when a region event occurs
|
||||
|
@@ -79,6 +79,7 @@ class FairMQUnmanagedRegion
|
||||
virtual void SetLinger(uint32_t linger) = 0;
|
||||
virtual uint32_t GetLinger() const = 0;
|
||||
|
||||
virtual fair::mq::Transport GetType() const = 0;
|
||||
FairMQTransportFactory* GetTransport() { return fTransport; }
|
||||
void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; }
|
||||
|
||||
@@ -107,6 +108,19 @@ inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event
|
||||
namespace fair::mq
|
||||
{
|
||||
|
||||
struct RegionConfig {
|
||||
bool lock;
|
||||
bool zero;
|
||||
|
||||
RegionConfig()
|
||||
: lock(false), zero(false)
|
||||
{}
|
||||
|
||||
RegionConfig(bool l, bool z)
|
||||
: lock(l), zero(z)
|
||||
{}
|
||||
};
|
||||
|
||||
using RegionCallback = FairMQRegionCallback;
|
||||
using RegionBulkCallback = FairMQRegionBulkCallback;
|
||||
using RegionEventCallback = FairMQRegionEventCallback;
|
||||
|
@@ -200,7 +200,19 @@ auto fair::mq::PluginManager::LoadPluginStatic(const string& pluginName) -> void
|
||||
// Load symbol
|
||||
if (fPluginFactories.find(pluginName) == fPluginFactories.end()) {
|
||||
try {
|
||||
LoadSymbols(pluginName, dll::program_location());
|
||||
if ("control" == pluginName) {
|
||||
try {
|
||||
fPluginProgOptions.insert({pluginName, plugins::ControlPluginProgramOptions().value()});
|
||||
}
|
||||
catch (const boost::bad_optional_access& e) { /* just ignore, if no prog options are declared */ }
|
||||
} else if ("config" == pluginName) {
|
||||
try {
|
||||
fPluginProgOptions.insert({pluginName, plugins::ConfigPluginProgramOptions().value()});
|
||||
}
|
||||
catch (const boost::bad_optional_access& e) { /* just ignore, if no prog options are declared */ }
|
||||
} else {
|
||||
LoadSymbols(pluginName, dll::program_location());
|
||||
}
|
||||
fPluginOrder.push_back(pluginName);
|
||||
} catch (boost::system::system_error& e) {
|
||||
throw PluginLoadError(ToString("An error occurred while loading static plugin ", pluginName, ": ", e.what()));
|
||||
@@ -211,7 +223,13 @@ auto fair::mq::PluginManager::LoadPluginStatic(const string& pluginName) -> void
|
||||
auto fair::mq::PluginManager::InstantiatePlugin(const string& pluginName) -> void
|
||||
{
|
||||
if (fPlugins.find(pluginName) == fPlugins.end()) {
|
||||
fPlugins[pluginName] = fPluginFactories[pluginName](*fPluginServices);
|
||||
if ("control" == pluginName) {
|
||||
fPlugins[pluginName] = plugins::Make_control_Plugin(fPluginServices.get());
|
||||
} else if ("config" == pluginName) {
|
||||
fPlugins[pluginName] = plugins::Make_config_Plugin(fPluginServices.get());
|
||||
} else {
|
||||
fPlugins[pluginName] = fPluginFactories[pluginName](*fPluginServices);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -12,6 +12,7 @@
|
||||
#include <fairmq/tools/Strings.h>
|
||||
|
||||
#include <memory>
|
||||
#include <ostream>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
@@ -60,6 +61,11 @@ try {
|
||||
throw TransportError(tools::ToString("Unknown transport provided: ", transport));
|
||||
}
|
||||
|
||||
inline std::ostream& operator<<(std::ostream& os, const Transport& transport)
|
||||
{
|
||||
return os << TransportName(transport);
|
||||
}
|
||||
|
||||
} // namespace fair::mq
|
||||
|
||||
#endif /* FAIR_MQ_TRANSPORTS_H */
|
||||
|
@@ -41,7 +41,7 @@ class Socket final : public fair::mq::Socket
|
||||
|
||||
auto GetId() const -> std::string override { return fId; }
|
||||
|
||||
auto Events(uint32_t *events) -> void override { *events = 0; }
|
||||
auto Events(uint32_t *events) -> int override { *events = 0; }
|
||||
auto Bind(const std::string& address) -> bool override;
|
||||
auto Connect(const std::string& address) -> bool override;
|
||||
|
||||
|
@@ -92,22 +92,22 @@ auto TransportFactory::CreatePoller(const unordered_map<string, vector<FairMQCha
|
||||
// return PollerPtr{new Poller(channelsMap, channelList)};
|
||||
}
|
||||
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
|
||||
{
|
||||
throw runtime_error{"Not yet implemented UMR."};
|
||||
}
|
||||
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
|
||||
{
|
||||
throw runtime_error{"Not yet implemented UMR."};
|
||||
}
|
||||
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
|
||||
{
|
||||
throw runtime_error{"Not yet implemented UMR."};
|
||||
}
|
||||
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
|
||||
{
|
||||
throw runtime_error{"Not yet implemented UMR."};
|
||||
}
|
||||
|
@@ -65,7 +65,10 @@ struct DDSEnvironment::Impl
|
||||
" mkdir -p \"$HOME/.DDS\"\n"
|
||||
" dds-user-defaults --ignore-default-sid -d -c \"$HOME/.DDS/DDS.cfg\"\n"
|
||||
"fi\n";
|
||||
std::system(cmd.str().c_str());
|
||||
auto rc(std::system(cmd.str().c_str()));
|
||||
if (rc != 0) {
|
||||
LOG(warn) << "DDSEnvironment::SetupConfigHome failed";
|
||||
}
|
||||
}
|
||||
|
||||
auto SetupPath() -> void
|
||||
|
@@ -44,6 +44,7 @@
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
#include <utility> // pair
|
||||
#include <tuple>
|
||||
#include <vector>
|
||||
|
||||
#include <unistd.h> // getuid
|
||||
@@ -58,14 +59,15 @@ class Manager
|
||||
{
|
||||
public:
|
||||
Manager(const std::string& sessionName, std::string deviceId, size_t size, const ProgOptions* config)
|
||||
: fShmId(makeShmIdStr(sessionName))
|
||||
: fShmId64(makeShmIdUint64(sessionName))
|
||||
, fShmId(makeShmIdStr(sessionName))
|
||||
, fSegmentId(config ? config->GetProperty<uint16_t>("shm-segment-id", 0) : 0)
|
||||
, fDeviceId(std::move(deviceId))
|
||||
, fSegments()
|
||||
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
|
||||
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
|
||||
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
|
||||
, fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
|
||||
, fRegionEventsShmCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
|
||||
, fRegionEventsSubscriptionActive(false)
|
||||
, fNumObservedEvents(0)
|
||||
, fDeviceCounter(nullptr)
|
||||
@@ -73,10 +75,11 @@ class Manager
|
||||
, fShmSegments(nullptr)
|
||||
, fShmRegions(nullptr)
|
||||
, fInterrupted(false)
|
||||
, fMsgCounter(0)
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
, fMsgDebug(nullptr)
|
||||
, fShmMsgCounters(nullptr)
|
||||
, fMsgCounterNew(0)
|
||||
, fMsgCounterDelete(0)
|
||||
#endif
|
||||
, fHeartbeatThread()
|
||||
, fSendHeartbeats(true)
|
||||
@@ -235,7 +238,7 @@ class Manager
|
||||
}
|
||||
|
||||
if (!p.empty()) {
|
||||
boost::process::spawn(p, "-x", "-m", "--shmid", id, "-d", "-t", "2000", verbose ? "--verbose" : "", env);
|
||||
boost::process::spawn(p, "-x", "-m", "--shmid", id, "-d", "-t", "2000", (verbose ? "--verbose" : ""), env);
|
||||
int numTries = 0;
|
||||
do {
|
||||
try {
|
||||
@@ -260,10 +263,13 @@ class Manager
|
||||
void Resume() { fInterrupted.store(false); }
|
||||
void Reset()
|
||||
{
|
||||
if (fMsgCounter.load() != 0) {
|
||||
LOG(error) << "Message counter during Reset expected to be 0, found: " << fMsgCounter.load();
|
||||
throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", fMsgCounter.load()));
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
auto diff = fMsgCounterNew.load() - fMsgCounterDelete.load();
|
||||
if (diff != 0) {
|
||||
LOG(error) << "Message counter during Reset expected to be 0, found: " << diff;
|
||||
throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", diff));
|
||||
}
|
||||
#endif
|
||||
}
|
||||
bool Interrupted() { return fInterrupted.load(); }
|
||||
|
||||
@@ -271,8 +277,9 @@ class Manager
|
||||
const int64_t userFlags,
|
||||
RegionCallback callback,
|
||||
RegionBulkCallback bulkCallback,
|
||||
const std::string& path = "",
|
||||
int flags = 0)
|
||||
const std::string& path,
|
||||
int flags,
|
||||
fair::mq::RegionConfig cfg)
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
try {
|
||||
@@ -302,22 +309,35 @@ class Manager
|
||||
return {nullptr, id};
|
||||
}
|
||||
|
||||
// create region info
|
||||
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
|
||||
|
||||
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags));
|
||||
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
|
||||
|
||||
if (cfg.lock) {
|
||||
LOG(debug) << "Locking region " << id << "...";
|
||||
if (mlock(r.first->second->fRegion.get_address(), r.first->second->fRegion.get_size()) == -1) {
|
||||
LOG(error) << "Could not lock region " << id << ". Code: " << errno << ", reason: " << strerror(errno);
|
||||
}
|
||||
LOG(debug) << "Successfully locked region " << id << ".";
|
||||
}
|
||||
if (cfg.zero) {
|
||||
LOG(debug) << "Zeroing free memory of region " << id << "...";
|
||||
memset(r.first->second->fRegion.get_address(), 0x00, r.first->second->fRegion.get_size());
|
||||
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
|
||||
}
|
||||
|
||||
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
|
||||
|
||||
r.first->second->InitializeQueues();
|
||||
r.first->second->StartReceivingAcks();
|
||||
result.first = &(r.first->second->fRegion);
|
||||
result.second = id;
|
||||
|
||||
(fEventCounter->fCount)++;
|
||||
}
|
||||
fRegionEventsCV.notify_all();
|
||||
fRegionsGen += 1; // signal TL cache invalidation
|
||||
fRegionEventsShmCV.notify_all();
|
||||
|
||||
return result;
|
||||
|
||||
} catch (interprocess_exception& e) {
|
||||
LOG(error) << "cannot create region. Already created/not cleaned up?";
|
||||
LOG(error) << e.what();
|
||||
@@ -327,8 +347,28 @@ class Manager
|
||||
|
||||
Region* GetRegion(const uint16_t id)
|
||||
{
|
||||
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
|
||||
const auto &lTlCache = fTlRegionCache;
|
||||
const auto &lTlCacheVec = lTlCache.fRegionsTLCache;
|
||||
const auto lTlCacheGen = lTlCache.fRegionsTLCacheGen;
|
||||
|
||||
// fast path
|
||||
for (const auto &lRegion : lTlCacheVec) {
|
||||
if ((std::get<1>(lRegion) == id) && (lTlCacheGen == fRegionsGen) && (std::get<2>(lRegion) == fShmId64)) {
|
||||
return std::get<0>(lRegion);
|
||||
}
|
||||
}
|
||||
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
return GetRegionUnsafe(id);
|
||||
// slow path: check invalidation
|
||||
if (lTlCacheGen != fRegionsGen) {
|
||||
fTlRegionCache.fRegionsTLCache.clear();
|
||||
}
|
||||
|
||||
auto *lRegion = GetRegionUnsafe(id);
|
||||
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
|
||||
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
|
||||
return lRegion;
|
||||
}
|
||||
|
||||
Region* GetRegionUnsafe(const uint16_t id)
|
||||
@@ -352,7 +392,7 @@ class Manager
|
||||
LOG(error) << oor.what();
|
||||
return nullptr;
|
||||
} catch (boost::interprocess::interprocess_exception& e) {
|
||||
LOG(warn) << "Could not get remote region for id '" << id << "'";
|
||||
LOG(error) << "Could not get remote region for id '" << id << "': " << e.what();
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
@@ -360,13 +400,19 @@ class Manager
|
||||
|
||||
void RemoveRegion(const uint16_t id)
|
||||
{
|
||||
fRegions.erase(id);
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
fShmRegions->at(id).fDestroyed = true;
|
||||
(fEventCounter->fCount)++;
|
||||
try {
|
||||
fRegions.at(id)->StopAcks();
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
fShmRegions->at(id).fDestroyed = true;
|
||||
fRegions.erase(id);
|
||||
(fEventCounter->fCount)++;
|
||||
}
|
||||
fRegionEventsShmCV.notify_all();
|
||||
} catch(std::out_of_range& oor) {
|
||||
LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'";
|
||||
}
|
||||
fRegionEventsCV.notify_all();
|
||||
fRegionsGen += 1; // signal TL cache invalidation
|
||||
}
|
||||
|
||||
std::vector<fair::mq::RegionInfo> GetRegionInfo()
|
||||
@@ -387,8 +433,12 @@ class Manager
|
||||
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
|
||||
if (!e.second.fDestroyed) {
|
||||
auto region = GetRegionUnsafe(info.id);
|
||||
info.ptr = region->fRegion.get_address();
|
||||
info.size = region->fRegion.get_size();
|
||||
if (region) {
|
||||
info.ptr = region->fRegion.get_address();
|
||||
info.size = region->fRegion.get_size();
|
||||
} else {
|
||||
throw std::runtime_error(tools::ToString("GetRegionInfoUnsafe() could not get region with id '", info.id, "'"));
|
||||
}
|
||||
} else {
|
||||
info.ptr = nullptr;
|
||||
info.size = 0;
|
||||
@@ -423,7 +473,7 @@ class Manager
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
fRegionEventsSubscriptionActive = false;
|
||||
lock.unlock();
|
||||
fRegionEventsCV.notify_all();
|
||||
fRegionEventsShmCV.notify_all();
|
||||
fRegionEventThread.join();
|
||||
}
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
@@ -440,7 +490,7 @@ class Manager
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
fRegionEventsSubscriptionActive = false;
|
||||
lock.unlock();
|
||||
fRegionEventsCV.notify_all();
|
||||
fRegionEventsShmCV.notify_all();
|
||||
fRegionEventThread.join();
|
||||
lock.lock();
|
||||
fRegionEventCallback = nullptr;
|
||||
@@ -454,36 +504,49 @@ class Manager
|
||||
auto infos = GetRegionInfoUnsafe();
|
||||
for (const auto& i : infos) {
|
||||
auto el = fObservedRegionEvents.find({i.id, i.managed});
|
||||
if (el == fObservedRegionEvents.end()) {
|
||||
fRegionEventCallback(i);
|
||||
if (el == fObservedRegionEvents.end()) { // if event id has not been observed
|
||||
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
|
||||
++fNumObservedEvents;
|
||||
} else {
|
||||
// if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created'
|
||||
// TODO: do we care to show 'created' events if we know region is already destroyed?
|
||||
if (i.event == RegionEvent::created) {
|
||||
fRegionEventCallback(i);
|
||||
++fNumObservedEvents;
|
||||
} else {
|
||||
fNumObservedEvents += 2;
|
||||
}
|
||||
} else { // if event id has been observed (expected - there are two events per id - created & destroyed)
|
||||
// fire a callback if we have observed 'created' event and incoming is 'destroyed'
|
||||
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
|
||||
fRegionEventCallback(i);
|
||||
el->second = i.event;
|
||||
++fNumObservedEvents;
|
||||
} else {
|
||||
// LOG(debug) << "ignoring event for id" << i.id << ":";
|
||||
// LOG(debug) << "incoming event: " << i.event;
|
||||
// LOG(debug) << "stored event: " << el->second;
|
||||
// LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second;
|
||||
}
|
||||
}
|
||||
}
|
||||
fRegionEventsCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
|
||||
fRegionEventsShmCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
|
||||
}
|
||||
}
|
||||
|
||||
void IncrementMsgCounter() { fMsgCounter.fetch_add(1, std::memory_order_relaxed); }
|
||||
void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); }
|
||||
void IncrementMsgCounter()
|
||||
{
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
fMsgCounterNew.fetch_add(1, std::memory_order_relaxed);
|
||||
#endif
|
||||
}
|
||||
void DecrementMsgCounter()
|
||||
{
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
fMsgCounterDelete.fetch_add(1, std::memory_order_relaxed);
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); }
|
||||
void DecrementShmMsgCounter(uint16_t segmentId) { --((*fShmMsgCounters)[segmentId].fCount); }
|
||||
#endif
|
||||
|
||||
boost::interprocess::named_mutex& GetMtx() { return fShmMtx; }
|
||||
|
||||
void SendHeartbeats()
|
||||
{
|
||||
std::string controlQueueName("fmq_" + fShmId + "_cq");
|
||||
@@ -511,7 +574,7 @@ class Manager
|
||||
auto it = fSegments.find(id);
|
||||
if (it == fSegments.end()) {
|
||||
try {
|
||||
// get region info
|
||||
// get segment info
|
||||
SegmentInfo segmentInfo = fShmSegments->at(id);
|
||||
LOG(debug) << "Located segment with id '" << id << "'";
|
||||
|
||||
@@ -613,6 +676,7 @@ class Manager
|
||||
using namespace boost::interprocess;
|
||||
bool lastRemoved = false;
|
||||
|
||||
fRegionsGen += 1; // signal TL cache invalidation
|
||||
UnsubscribeFromRegionEvents();
|
||||
|
||||
{
|
||||
@@ -645,6 +709,7 @@ class Manager
|
||||
}
|
||||
|
||||
private:
|
||||
uint64_t fShmId64;
|
||||
std::string fShmId;
|
||||
uint16_t fSegmentId;
|
||||
std::string fDeviceId;
|
||||
@@ -653,11 +718,11 @@ class Manager
|
||||
VoidAlloc fShmVoidAlloc;
|
||||
boost::interprocess::named_mutex fShmMtx;
|
||||
|
||||
boost::interprocess::named_condition fRegionEventsCV;
|
||||
boost::interprocess::named_condition fRegionEventsShmCV;
|
||||
std::thread fRegionEventThread;
|
||||
bool fRegionEventsSubscriptionActive;
|
||||
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
|
||||
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents;
|
||||
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; // pair: <region id, managed>
|
||||
uint64_t fNumObservedEvents;
|
||||
|
||||
DeviceCounter* fDeviceCounter;
|
||||
@@ -665,12 +730,20 @@ class Manager
|
||||
Uint16SegmentInfoHashMap* fShmSegments;
|
||||
Uint16RegionInfoHashMap* fShmRegions;
|
||||
std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions;
|
||||
// make sure this is alone in the cache line: mostly read
|
||||
alignas(128) inline static std::atomic<unsigned long> fRegionsGen = 0ul;
|
||||
inline static thread_local struct ManagerTLCache {
|
||||
unsigned long fRegionsTLCacheGen;
|
||||
std::vector<std::tuple<Region*, uint16_t, uint64_t>> fRegionsTLCache;
|
||||
} fTlRegionCache;
|
||||
|
||||
std::atomic<bool> fInterrupted;
|
||||
std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
// make sure the counters are not thrashing the cache line between threads doing creation and deallocation
|
||||
Uint16MsgDebugMapHashMap* fMsgDebug;
|
||||
Uint16MsgCounterHashMap* fShmMsgCounters;
|
||||
alignas(128) std::atomic_uint64_t fMsgCounterNew;
|
||||
alignas(128) std::atomic_uint64_t fMsgCounterDelete;
|
||||
#endif
|
||||
|
||||
std::thread fHeartbeatThread;
|
||||
@@ -680,6 +753,8 @@ class Manager
|
||||
|
||||
bool fThrowOnBadAlloc;
|
||||
bool fNoCleanup;
|
||||
|
||||
|
||||
};
|
||||
|
||||
} // namespace fair::mq::shmem
|
||||
|
@@ -109,12 +109,17 @@ class Message final : public fair::mq::Message
|
||||
, fRegionPtr(nullptr)
|
||||
, fLocalPtr(static_cast<char*>(data))
|
||||
{
|
||||
if (region->GetType() != GetType()) {
|
||||
LOG(error) << "region type (" << region->GetType() << ") does not match message type (" << GetType() << ")";
|
||||
throw TransportError(tools::ToString("region type (", region->GetType(), ") does not match message type (", GetType(), ")"));
|
||||
}
|
||||
|
||||
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) &&
|
||||
reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) {
|
||||
fMeta.fHandle = (boost::interprocess::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
|
||||
} else {
|
||||
LOG(error) << "trying to create region message with data from outside the region";
|
||||
throw std::runtime_error("trying to create region message with data from outside the region");
|
||||
throw TransportError("trying to create region message with data from outside the region");
|
||||
}
|
||||
fManager.IncrementMsgCounter();
|
||||
}
|
||||
@@ -304,6 +309,8 @@ class Message final : public fair::mq::Message
|
||||
}
|
||||
|
||||
if (fRegionPtr) {
|
||||
fRegionPtr->InitializeQueues();
|
||||
fRegionPtr->StartSendingAcks();
|
||||
fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint});
|
||||
} else {
|
||||
LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack";
|
||||
@@ -319,7 +326,7 @@ class Message final : public fair::mq::Message
|
||||
Deallocate();
|
||||
fAlignment = 0;
|
||||
|
||||
fManager.DecrementMsgCounter(); // TODO: put this to debug mode
|
||||
fManager.DecrementMsgCounter();
|
||||
}
|
||||
};
|
||||
|
||||
|
@@ -20,7 +20,6 @@
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
|
||||
#include <csignal>
|
||||
#include <filesystem>
|
||||
#include <iostream>
|
||||
#include <iomanip>
|
||||
#include <chrono>
|
||||
@@ -32,6 +31,14 @@
|
||||
#include <termios.h>
|
||||
#include <poll.h>
|
||||
|
||||
#if FAIRMQ_HAS_STD_FILESYSTEM
|
||||
#include <filesystem>
|
||||
namespace fs = std::filesystem;
|
||||
#else
|
||||
#include <boost/filesystem.hpp>
|
||||
namespace fs = ::boost::filesystem;
|
||||
#endif
|
||||
|
||||
using namespace std;
|
||||
using bie = ::boost::interprocess::interprocess_exception;
|
||||
namespace bipc = ::boost::interprocess;
|
||||
@@ -275,12 +282,12 @@ bool Monitor::PrintShm(const ShmId& shmId)
|
||||
void Monitor::ListAll(const std::string& path)
|
||||
{
|
||||
try {
|
||||
if (std::filesystem::is_empty(path)) {
|
||||
LOG(info) << "directory " << filesystem::path(path) << " is empty.";
|
||||
if (fs::is_empty(path)) {
|
||||
LOG(info) << "directory " << fs::path(path) << " is empty.";
|
||||
return;
|
||||
}
|
||||
|
||||
for (const auto& entry : filesystem::directory_iterator(path)) {
|
||||
for (const auto& entry : fs::directory_iterator(path)) {
|
||||
string filename = entry.path().filename().string();
|
||||
// LOG(info) << filename << ", size: " << entry.file_size() << " bytes";
|
||||
if (tools::StrStartsWith(filename, "fmq_") || tools::StrStartsWith(filename, "sem.fmq_")) {
|
||||
@@ -296,7 +303,7 @@ void Monitor::ListAll(const std::string& path)
|
||||
LOG(info) << "The file '" << filename << "' does not belong to FairMQ, skipping...";
|
||||
}
|
||||
}
|
||||
} catch (filesystem::filesystem_error& fse) {
|
||||
} catch (fs::filesystem_error& fse) {
|
||||
LOG(error) << "error: " << fse.what();
|
||||
}
|
||||
}
|
||||
@@ -490,7 +497,7 @@ std::pair<std::string, bool> RunRemoval(std::function<bool(const std::string&)>
|
||||
return {name, true};
|
||||
} else {
|
||||
if (verbose) {
|
||||
LOG(info) << "Did not remove '" << name << "'. Already removed?";
|
||||
LOG(debug) << "Did not remove '" << name << "'. Already removed?";
|
||||
}
|
||||
return {name, false};
|
||||
}
|
||||
@@ -518,7 +525,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
|
||||
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
|
||||
if (rc) {
|
||||
if (verbose) {
|
||||
LOG(info) << "Region counter found: " << rc->fCount;
|
||||
LOG(debug) << "Region counter found: " << rc->fCount;
|
||||
}
|
||||
uint16_t regionCount = rc->fCount;
|
||||
|
||||
|
@@ -1,4 +1,4 @@
|
||||
# Shared Memory transport
|
||||
## Shared Memory transport
|
||||
|
||||
Shared memory transport for FairMQ. To try with existing devices, run the devices with `--transport shmem` option or configure channel transport in JSON (see examples/MQ/multiple-transports).
|
||||
|
||||
@@ -6,7 +6,7 @@ The transport manages shared memory via boost::interprocess library. The transfe
|
||||
|
||||
Devices track and cleanup shared memory on shutdown. For more information on the current shared memory segment and additional cleanup options, see following section.
|
||||
|
||||
# Shared Memory objects / files
|
||||
## Shared Memory objects / files
|
||||
|
||||
FairMQ Shared Memory currently uses the following names to register shared memory on the system:
|
||||
|
||||
@@ -15,7 +15,7 @@ FairMQ Shared Memory currently uses the following names to register shared memor
|
||||
| `fmq_<shmId>_m_<segmentId>` | managed segment(s) (user data) | one of the devices | devices |
|
||||
| `fmq_<shmId>_mng` | management segment (management data) | one of the devices | devices |
|
||||
| `fmq_<shmId>_mtx` | mutex | one of the devices | devices |
|
||||
| `fmq_<shmId>_cv` | condition variable | one of the devices | devices with unmanaged regions |
|
||||
| `fmq_<shmId>_cv` | condition variable | one of the devices | devices |
|
||||
| `fmq_<shmId>_rg_<index>` | unmanaged region(s) | one of the devices | devices with unmanaged regions |
|
||||
| `fmq_<shmId>_rgq_<index>` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions |
|
||||
| `fmq_<shmId>_ms` | shmmonitor status | shmmonitor | devices, shmmonitor |
|
||||
@@ -35,7 +35,7 @@ The monitor runs in one of the following modes:
|
||||
| --------------------------- | ---------------------------------------------- |
|
||||
| no args | Print segment info of the specified session/shm ID and exit. |
|
||||
| `--view`,`-v` | Print segment info of the specified session/shm ID and exit. |
|
||||
| `--interactive`,`-i` | Print segment info of the specified session/shm ID and exit at a given interval (`--interval`), with some keyboard controls. Can be combined with `--view` for read-only access (and avoid receiving heartbeats). |
|
||||
| `--interactive`,`-i` | Print segment info of the specified session/shm ID at a given interval (`--interval`), with some keyboard controls. Can be combined with `--view` for read-only access (and avoid receiving heartbeats). |
|
||||
| `--monitor`,`-m` | Monitor the session shm usage by receiving heartbeats from shmem users, cleaning it up if no heartbeats arrived within configured timeout (`--timeout`/`-t`). Only one heartbeat receiver per session is currently possible. If `--self-destruct`/`-x` is added, monitor will exit either when (a) no shm has been observed for interval * 2, (b) a cleanup due to reached timeout has been performed, or (c) shm has been observed, but is now cleaned up. |
|
||||
| `--cleanup`,`-c` | Cleanup the shm for the specified session and exit. |
|
||||
| `--debug`,`-b` | Print the list of messages in the current session and exit. Only availabe when FairMQ is compiled with `FAIRMQ_DEBUG_MODE=ON` (high performance impact). |
|
||||
@@ -50,7 +50,10 @@ Additional cmd options:
|
||||
| `--daemonize`,`-d` | Can be combined with the monitoring mode to detach the process from the parent. |
|
||||
| `--verbose`,`-d` | When running as a daemon, store monitor output in `fairmq-shmmonitor_<timestamp>.log` |
|
||||
|
||||
|
||||
Possible further implementation would be to run the monitor with `--self-destruct` with each topology.
|
||||
For full option details, run with `-h`.
|
||||
|
||||
The Monitor class can also be used independently from the supplied executable, allowing integration on any level.
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
Bus Error (SIGBUS) can occur if the transport tries to access shared memory that is not accessible. One reason could be because the used memory in the segment exceeds the capacity or available memory of the shmem filesystem (capacity is by default set to half of RAM on Linux).
|
||||
|
@@ -47,7 +47,7 @@ struct Region
|
||||
Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
|
||||
: fRemote(remote)
|
||||
, fLinger(100)
|
||||
, fStop(false)
|
||||
, fStopAcks(false)
|
||||
, fName("fmq_" + shmId + "_rg_" + std::to_string(id))
|
||||
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id))
|
||||
, fShmemObject()
|
||||
@@ -85,18 +85,26 @@ struct Region
|
||||
LOG(debug) << "shmem: initialized file: " << fName;
|
||||
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, flags);
|
||||
} else {
|
||||
if (fRemote) {
|
||||
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
|
||||
} else {
|
||||
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
|
||||
fShmemObject.truncate(size);
|
||||
try {
|
||||
if (fRemote) {
|
||||
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
|
||||
} else {
|
||||
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
|
||||
fShmemObject.truncate(size);
|
||||
}
|
||||
} catch(interprocess_exception& e) {
|
||||
LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << id << "': " << e.what();
|
||||
throw;
|
||||
}
|
||||
try {
|
||||
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
|
||||
} catch(interprocess_exception& e) {
|
||||
LOG(error) << "Failed mapping shared_memory_object for region id '" << id << "': " << e.what();
|
||||
throw;
|
||||
}
|
||||
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
|
||||
}
|
||||
|
||||
InitializeQueues();
|
||||
StartSendingAcks();
|
||||
LOG(debug) << "shmem: initialized region: " << fName;
|
||||
LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")";
|
||||
}
|
||||
|
||||
Region() = delete;
|
||||
@@ -108,15 +116,22 @@ struct Region
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
|
||||
if (fRemote) {
|
||||
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
|
||||
} else {
|
||||
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
|
||||
if (fQueue == nullptr) {
|
||||
if (fRemote) {
|
||||
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
|
||||
} else {
|
||||
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
|
||||
}
|
||||
LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")";
|
||||
}
|
||||
LOG(debug) << "shmem: initialized region queue: " << fQueueName;
|
||||
}
|
||||
|
||||
void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); }
|
||||
void StartSendingAcks()
|
||||
{
|
||||
if (!fAcksSender.joinable()) {
|
||||
fAcksSender = std::thread(&Region::SendAcks, this);
|
||||
}
|
||||
}
|
||||
void SendAcks()
|
||||
{
|
||||
std::unique_ptr<RegionBlock[]> blocks = std::make_unique<RegionBlock[]>(fAckBunchSize);
|
||||
@@ -140,13 +155,13 @@ struct Region
|
||||
}
|
||||
|
||||
if (blocksToSend > 0) {
|
||||
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) {
|
||||
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStopAcks) {
|
||||
// receiver slow? yield and try again...
|
||||
std::this_thread::yield();
|
||||
}
|
||||
// LOG(debug) << "Sent " << blocksToSend << " blocks.";
|
||||
} else { // blocksToSend == 0
|
||||
if (fStop) {
|
||||
if (fStopAcks) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -156,7 +171,12 @@ struct Region
|
||||
<< " blocks left to send: " << blocksToSend << ").";
|
||||
}
|
||||
|
||||
void StartReceivingAcks() { fAcksReceiver = std::thread(&Region::ReceiveAcks, this); }
|
||||
void StartReceivingAcks()
|
||||
{
|
||||
if (!fAcksReceiver.joinable()) {
|
||||
fAcksReceiver = std::thread(&Region::ReceiveAcks, this);
|
||||
}
|
||||
}
|
||||
void ReceiveAcks()
|
||||
{
|
||||
unsigned int priority;
|
||||
@@ -168,7 +188,7 @@ struct Region
|
||||
while (true) {
|
||||
uint32_t timeout = 100;
|
||||
bool leave = false;
|
||||
if (fStop) {
|
||||
if (fStopAcks) {
|
||||
timeout = fLinger;
|
||||
leave = true;
|
||||
}
|
||||
@@ -213,9 +233,25 @@ struct Region
|
||||
void SetLinger(uint32_t linger) { fLinger = linger; }
|
||||
uint32_t GetLinger() const { return fLinger; }
|
||||
|
||||
void StopAcks()
|
||||
{
|
||||
fStopAcks = true;
|
||||
|
||||
if (fAcksSender.joinable()) {
|
||||
fBlockSendCV.notify_one();
|
||||
fAcksSender.join();
|
||||
}
|
||||
|
||||
if (!fRemote) {
|
||||
if (fAcksReceiver.joinable()) {
|
||||
fAcksReceiver.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
~Region()
|
||||
{
|
||||
fStop = true;
|
||||
fStopAcks = true;
|
||||
|
||||
if (fAcksSender.joinable()) {
|
||||
fBlockSendCV.notify_one();
|
||||
@@ -228,11 +264,11 @@ struct Region
|
||||
}
|
||||
|
||||
if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
|
||||
LOG(debug) << "Region '" << fName << "' destroyed.";
|
||||
LOG(trace) << "Region '" << fName << "' destroyed.";
|
||||
}
|
||||
|
||||
if (boost::interprocess::file_mapping::remove(fName.c_str())) {
|
||||
LOG(debug) << "File mapping '" << fName << "' destroyed.";
|
||||
LOG(trace) << "File mapping '" << fName << "' destroyed.";
|
||||
}
|
||||
|
||||
if (fFile) {
|
||||
@@ -240,19 +276,18 @@ struct Region
|
||||
}
|
||||
|
||||
if (boost::interprocess::message_queue::remove(fQueueName.c_str())) {
|
||||
LOG(debug) << "Region queue '" << fQueueName << "' destroyed.";
|
||||
LOG(trace) << "Region queue '" << fQueueName << "' destroyed.";
|
||||
}
|
||||
} else {
|
||||
// LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary.";
|
||||
LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
|
||||
// LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
|
||||
}
|
||||
|
||||
LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
|
||||
// LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
|
||||
}
|
||||
|
||||
bool fRemote;
|
||||
uint32_t fLinger;
|
||||
std::atomic<bool> fStop;
|
||||
std::atomic<bool> fStopAcks;
|
||||
std::string fName;
|
||||
std::string fQueueName;
|
||||
boost::interprocess::shared_memory_object fShmemObject;
|
||||
|
@@ -378,12 +378,10 @@ class Socket final : public fair::mq::Socket
|
||||
}
|
||||
}
|
||||
|
||||
void Events(uint32_t* events) override
|
||||
int Events(uint32_t* events) override
|
||||
{
|
||||
size_t eventsSize = sizeof(uint32_t);
|
||||
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
|
||||
throw SocketError(tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
return zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize);
|
||||
}
|
||||
|
||||
int GetLinger() const override
|
||||
|
@@ -141,29 +141,29 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
return std::make_unique<Poller>(channelsMap, channelList);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
|
||||
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
|
||||
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
|
||||
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
|
||||
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags, fair::mq::RegionConfig cfg)
|
||||
{
|
||||
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this);
|
||||
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this, cfg);
|
||||
}
|
||||
|
||||
void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); }
|
||||
|
@@ -37,15 +37,16 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
||||
const int64_t userFlags,
|
||||
RegionCallback callback,
|
||||
RegionBulkCallback bulkCallback,
|
||||
const std::string& path = "",
|
||||
int flags = 0,
|
||||
FairMQTransportFactory* factory = nullptr)
|
||||
const std::string& path,
|
||||
int flags,
|
||||
FairMQTransportFactory* factory,
|
||||
fair::mq::RegionConfig cfg)
|
||||
: FairMQUnmanagedRegion(factory)
|
||||
, fManager(manager)
|
||||
, fRegion(nullptr)
|
||||
, fRegionId(0)
|
||||
{
|
||||
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags);
|
||||
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags, cfg);
|
||||
fRegion = result.first;
|
||||
fRegionId = result.second;
|
||||
}
|
||||
@@ -56,6 +57,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
||||
void SetLinger(uint32_t linger) override { fManager.GetRegion(fRegionId)->SetLinger(linger); }
|
||||
uint32_t GetLinger() const override { return fManager.GetRegion(fRegionId)->GetLinger(); }
|
||||
|
||||
Transport GetType() const override { return fair::mq::Transport::SHM; }
|
||||
|
||||
~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); }
|
||||
|
||||
private:
|
||||
|
@@ -89,6 +89,7 @@ int main(int argc, char** argv)
|
||||
bool listAll = false;
|
||||
string listAllPath;
|
||||
bool verbose = false;
|
||||
string severity;
|
||||
int userId = -1;
|
||||
|
||||
options_description desc("Options");
|
||||
@@ -109,6 +110,7 @@ int main(int argc, char** argv)
|
||||
("list-all" , value<bool>(&listAll)->implicit_value(true), "List all sessions & segments")
|
||||
("list-all-path" , value<string>(&listAllPath)->default_value("/dev/shm/"),"Path for the --list-all command to search segments in")
|
||||
("verbose" , value<bool>(&verbose)->implicit_value(true), "Verbose mode (daemon will output to a file 'fairmq-shmmonitor_<timestamp>')")
|
||||
("severity" , value<string>(&severity)->default_value("info"), "Log severity")
|
||||
("user-id" , value<int>(&userId)->default_value(-1), "User id (used with --get-shmid)")
|
||||
("help,h", "Print help");
|
||||
|
||||
@@ -122,6 +124,8 @@ int main(int argc, char** argv)
|
||||
|
||||
notify(vm);
|
||||
|
||||
fair::Logger::SetConsoleSeverity(severity);
|
||||
|
||||
if (getShmId) {
|
||||
if (userId == -1) {
|
||||
LOG(info) << "shmem id for session '" << sessionName << "' and current user id " << geteuid()
|
||||
|
@@ -109,6 +109,11 @@ class Message final : public fair::mq::Message
|
||||
, fAlignment(0)
|
||||
, fMsg(std::make_unique<zmq_msg_t>())
|
||||
{
|
||||
if (region->GetType() != GetType()) {
|
||||
LOG(error) << "region type (" << region->GetType() << ") does not match message type (" << GetType() << ")";
|
||||
throw TransportError(tools::ToString("region type (", region->GetType(), ") does not match message type (", GetType(), ")"));
|
||||
}
|
||||
|
||||
// FIXME: make this zero-copy:
|
||||
// simply taking over the provided buffer can casue premature delete, since region could be
|
||||
// destroyed before the message is sent out. Needs lifetime extension for the ZMQ region.
|
||||
|
@@ -323,12 +323,10 @@ class Socket final : public fair::mq::Socket
|
||||
}
|
||||
}
|
||||
|
||||
void Events(uint32_t* events) override
|
||||
int Events(uint32_t* events) override
|
||||
{
|
||||
size_t eventsSize = sizeof(uint32_t);
|
||||
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
|
||||
throw SocketError(tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
return zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize);
|
||||
}
|
||||
|
||||
void SetLinger(const int value) override
|
||||
|
@@ -96,29 +96,29 @@ class TransportFactory final : public FairMQTransportFactory
|
||||
return std::make_unique<Poller>(channelsMap, channelList);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
|
||||
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
|
||||
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
|
||||
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
|
||||
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int)
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int /* flags */, fair::mq::RegionConfig cfg)
|
||||
{
|
||||
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this);
|
||||
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this, cfg);
|
||||
auto zPtr = static_cast<UnmanagedRegion*>(ptr.get());
|
||||
fCtx->AddRegion(false, zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created);
|
||||
return ptr;
|
||||
|
@@ -16,6 +16,8 @@
|
||||
#include <cstddef> // size_t
|
||||
#include <string>
|
||||
|
||||
#include <sys/mman.h> // mlock
|
||||
|
||||
namespace fair::mq::zmq
|
||||
{
|
||||
|
||||
@@ -30,7 +32,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
||||
int64_t userFlags,
|
||||
RegionCallback callback,
|
||||
RegionBulkCallback bulkCallback,
|
||||
FairMQTransportFactory* factory = nullptr)
|
||||
FairMQTransportFactory* factory,
|
||||
fair::mq::RegionConfig cfg)
|
||||
: fair::mq::UnmanagedRegion(factory)
|
||||
, fCtx(ctx)
|
||||
, fId(fCtx.RegionCount())
|
||||
@@ -39,7 +42,20 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
||||
, fUserFlags(userFlags)
|
||||
, fCallback(callback)
|
||||
, fBulkCallback(bulkCallback)
|
||||
{}
|
||||
{
|
||||
if (cfg.lock) {
|
||||
LOG(debug) << "Locking region " << fId << "...";
|
||||
if (mlock(fBuffer, fSize) == -1) {
|
||||
LOG(error) << "Could not lock region " << fId << ". Code: " << errno << ", reason: " << strerror(errno);
|
||||
}
|
||||
LOG(debug) << "Successfully locked region " << fId << ".";
|
||||
}
|
||||
if (cfg.zero) {
|
||||
LOG(debug) << "Zeroing free memory of region " << fId << "...";
|
||||
memset(fBuffer, 0x00, fSize);
|
||||
LOG(debug) << "Successfully zeroed free memory of region " << fId << ".";
|
||||
}
|
||||
}
|
||||
|
||||
UnmanagedRegion(const UnmanagedRegion&) = delete;
|
||||
UnmanagedRegion operator=(const UnmanagedRegion&) = delete;
|
||||
@@ -51,6 +67,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
||||
void SetLinger(uint32_t /* linger */) override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; }
|
||||
uint32_t GetLinger() const override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; return 0; }
|
||||
|
||||
Transport GetType() const override { return Transport::ZMQ; }
|
||||
|
||||
virtual ~UnmanagedRegion()
|
||||
{
|
||||
LOG(debug) << "destroying region " << fId;
|
||||
|
@@ -153,9 +153,11 @@ TEST(ErrorState, interactive_InReset)
|
||||
EXPECT_EXIT(RunErrorStateIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), "");
|
||||
}
|
||||
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
TEST(ErrorState, OrphanMessages)
|
||||
{
|
||||
BadDevice badDevice;
|
||||
}
|
||||
#endif
|
||||
|
||||
} // namespace
|
||||
|
@@ -23,6 +23,67 @@ namespace
|
||||
|
||||
using namespace std;
|
||||
|
||||
void RegionsCache(const string& transport, const string& address)
|
||||
{
|
||||
size_t session1 = fair::mq::tools::UuidHash();
|
||||
size_t session2 = fair::mq::tools::UuidHash();
|
||||
|
||||
fair::mq::ProgOptions config1;
|
||||
fair::mq::ProgOptions config2;
|
||||
config1.SetProperty<string>("session", to_string(session1));
|
||||
config2.SetProperty<string>("session", to_string(session2));
|
||||
|
||||
auto factory1 = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config1);
|
||||
auto factory2 = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config2);
|
||||
|
||||
auto region1 = factory1->CreateUnmanagedRegion(1000000, [](void*, size_t, void*) {});
|
||||
auto region2 = factory2->CreateUnmanagedRegion(1000000, [](void*, size_t, void*) {});
|
||||
void* r1ptr = region1->GetData();
|
||||
void* r2ptr = region2->GetData();
|
||||
|
||||
FairMQChannel push1("Push1", "push", factory1);
|
||||
FairMQChannel pull1("Pull1", "pull", factory1);
|
||||
push1.Bind(address + to_string(1));
|
||||
pull1.Connect(address + to_string(1));
|
||||
FairMQChannel push2("Push2", "push", factory2);
|
||||
FairMQChannel pull2("Pull2", "pull", factory2);
|
||||
push2.Bind(address + to_string(2));
|
||||
pull2.Connect(address + to_string(2));
|
||||
|
||||
{
|
||||
static_cast<char*>(r1ptr)[0] = 97; // a
|
||||
static_cast<char*>(static_cast<char*>(r1ptr) + 100)[0] = 98; // b
|
||||
static_cast<char*>(r2ptr)[0] = 99; // c
|
||||
static_cast<char*>(static_cast<char*>(r2ptr) + 100)[0] = 100; // d
|
||||
|
||||
FairMQMessagePtr m1(push1.NewMessage(region1, r1ptr, 100, nullptr));
|
||||
FairMQMessagePtr m2(push1.NewMessage(region1, static_cast<char*>(r1ptr) + 100, 100, nullptr));
|
||||
push1.Send(m1);
|
||||
push1.Send(m2);
|
||||
|
||||
FairMQMessagePtr m3(push2.NewMessage(region2, r2ptr, 100, nullptr));
|
||||
FairMQMessagePtr m4(push2.NewMessage(region2, static_cast<char*>(r2ptr) + 100, 100, nullptr));
|
||||
push2.Send(m3);
|
||||
push2.Send(m4);
|
||||
}
|
||||
|
||||
{
|
||||
FairMQMessagePtr m1(pull1.NewMessage());
|
||||
FairMQMessagePtr m2(pull1.NewMessage());
|
||||
ASSERT_EQ(pull1.Receive(m1), 100);
|
||||
ASSERT_EQ(pull1.Receive(m2), 100);
|
||||
ASSERT_EQ(static_cast<char*>(m1->GetData())[0], 'a');
|
||||
ASSERT_EQ(static_cast<char*>(m2->GetData())[0], 'b');
|
||||
|
||||
FairMQMessagePtr m3(pull2.NewMessage());
|
||||
FairMQMessagePtr m4(pull2.NewMessage());
|
||||
ASSERT_EQ(pull2.Receive(m3), 100);
|
||||
ASSERT_EQ(pull2.Receive(m4), 100);
|
||||
ASSERT_EQ(static_cast<char*>(m3->GetData())[0], 'c');
|
||||
ASSERT_EQ(static_cast<char*>(m4->GetData())[0], 'd');
|
||||
}
|
||||
}
|
||||
|
||||
void RegionEventSubscriptions(const string& transport)
|
||||
{
|
||||
size_t session{fair::mq::tools::UuidHash()};
|
||||
@@ -160,6 +221,16 @@ void RegionCallbacks(const string& transport, const string& _address)
|
||||
LOG(info) << "2 done.";
|
||||
}
|
||||
|
||||
TEST(Cache, zeromq)
|
||||
{
|
||||
RegionsCache("zeromq", "ipc://test_region_cache");
|
||||
}
|
||||
|
||||
TEST(Cache, shmem)
|
||||
{
|
||||
RegionsCache("shmem", "ipc://test_region_cache");
|
||||
}
|
||||
|
||||
TEST(EventSubscriptions, zeromq)
|
||||
{
|
||||
RegionEventSubscriptions("zeromq");
|
||||
|
Reference in New Issue
Block a user