Compare commits

..

41 Commits

Author SHA1 Message Date
Alexey Rybalchenko
3eca8e9def Add test for shm transport options 2020-07-15 13:59:53 +02:00
Alexey Rybalchenko
beb7766fca Shm: add options to zero and/or mlock the segment 2020-07-15 13:59:53 +02:00
Giulio Eulisse
bf909f94dc ofi: adapt to the new API for FairMQSocket::Events 2020-07-15 13:58:47 +02:00
Dennis Klein
1140c4c6ab SDK: Provide comparison operator for device and topo states 2020-07-15 13:01:23 +02:00
Alexey Rybalchenko
a6da208e79 Add Undefined and Mixed state for use in SDK 2020-07-15 13:01:23 +02:00
Giulio Eulisse
ba3a82b1df Update FairMQSocket.h 2020-07-15 12:09:54 +02:00
Giulio Eulisse
e8cc104344 Add API to extract ZMQ_EVENTS from socket backend 2020-07-15 12:09:54 +02:00
Alexey Rybalchenko
d5d5c27958 QC examples: fix incorrect topology path 2020-07-09 23:34:28 +02:00
Alexey Rybalchenko
5a7dcd9fc1 SDK: warn if given path translates to no selected tasks 2020-07-09 23:34:28 +02:00
Alexey Rybalchenko
78b1c188bf Shm: report correct size when opening segment 2020-06-30 20:58:53 +02:00
Alexey Rybalchenko
66bc7ba762 Remove useless variable 2020-06-30 20:58:53 +02:00
Alexey Rybalchenko
88bc1f7a06 Shm: throw if requested message size exceeds total segment size 2020-06-30 20:58:53 +02:00
Alexey Rybalchenko
f70201610b Do not attemp to find random port for non-TCP protocols 2020-06-30 20:58:53 +02:00
Alexey Rybalchenko
fc7f6f1116 Enable Boost_NO_BOOST_CMAKE for tests 2020-06-29 14:44:49 +02:00
Alexey Rybalchenko
8125489776 Handle out_of_range when locating RegionInfo 2020-06-29 14:44:49 +02:00
Alexey Rybalchenko
6dd0a44308 Make shmid an 8-digit hex number 2020-06-29 14:44:49 +02:00
Alexey Rybalchenko
afe2dcaa02 BenchmarkSampler: add memset option 2020-06-29 14:44:49 +02:00
Alexey Rybalchenko
aeab9e5407 Socket.h: refactor to reduce duplicate code 2020-06-29 14:44:49 +02:00
Alexey Rybalchenko
539e5602a6 Expose fair::mq::shmem::Monitor::Cleanup() API 2020-06-29 14:44:49 +02:00
Alexey Rybalchenko
beb510ded8 Adjust example 1 docs 2020-06-29 14:44:49 +02:00
Alexey Rybalchenko
d1c51e0f1f Use region linger setting in region example 2020-06-29 14:44:49 +02:00
Alexey Rybalchenko
f885b4618e Optimize unmanaged region ReceiveAcks 2020-06-29 14:44:49 +02:00
Alexey Rybalchenko
3364da9541 Add linger setting for unmanaged region 2020-06-29 14:44:49 +02:00
Alexey Rybalchenko
7aec6f91de Fix typo 2020-06-29 14:44:49 +02:00
Alexey Rybalchenko
9e2a002942 Add -Og flag to Debug build 2020-06-29 14:44:49 +02:00
Alexey Rybalchenko
52c6264faf Fix message counter in region example 2020-06-29 14:44:49 +02:00
Alexey Rybalchenko
79489bb501 Add missing includes 2020-06-10 13:55:15 +02:00
Alexey Rybalchenko
c60dd9965c Add copyright entry for PicoSHA2 2020-06-05 18:16:13 +02:00
Alexey Rybalchenko
79ca436b74 Zmq: let GetData of an empty message return nullptr 2020-06-05 18:16:13 +02:00
Alexey Rybalchenko
36d4f3c937 Use SHA2 instead of boost::hash to generate shmem id 2020-06-05 18:16:13 +02:00
Alexey Rybalchenko
bdf895ae9e Add PicoSHA2 dependency 2020-06-05 18:16:13 +02:00
Alexey Rybalchenko
42986e664c Add PicoSHA2 submodule 2020-06-05 18:16:13 +02:00
Giulio Eulisse
dd47b34e06 Add ability to retrieve ZMQ_FD 2020-06-03 19:44:00 +02:00
Alexey Rybalchenko
a59c902c74 MemoryResource: propagate alignment 2020-06-03 19:41:40 +02:00
Alexey Rybalchenko
dabc48c21a Shm: fix incorrect ptr range check 2020-05-29 23:34:27 +02:00
Alexey Rybalchenko
236d5a8608 Let default shm segm size be multiple of page size
To allow potential optimizations (e.g. huge pages)
2020-05-28 17:23:18 +02:00
Alexey Rybalchenko
5a782e8726 Add a test for unregisted options 2020-05-28 17:23:18 +02:00
Alexey Rybalchenko
5008fa4732 Fix regression in handling unregistered options 2020-05-28 17:23:18 +02:00
Alexey Rybalchenko
b5bb476b0d Adjust program options style to disallow guessing 2020-05-25 14:03:12 +02:00
Giulio Eulisse
ea7ae04025 Adapt to requests by @rbx 2020-05-25 08:36:57 +02:00
Giulio Eulisse
02692e7002 Initiate termination process on SIGTERM as well 2020-05-25 08:36:57 +02:00
54 changed files with 756 additions and 418 deletions

3
.gitmodules vendored
View File

@@ -4,3 +4,6 @@
[submodule "extern/asio"] [submodule "extern/asio"]
path = extern/asio path = extern/asio
url = https://github.com/chriskohlhoff/asio url = https://github.com/chriskohlhoff/asio
[submodule "extern/PicoSHA2"]
path = extern/PicoSHA2
url = https://github.com/okdshin/PicoSHA2

View File

@@ -146,6 +146,8 @@ if(BUILD_FAIRMQ)
find_package2(PRIVATE ZeroMQ REQUIRED find_package2(PRIVATE ZeroMQ REQUIRED
VERSION 4.1.4 VERSION 4.1.4
) )
build_bundled(PicoSHA2 extern/PicoSHA2)
find_package2(PRIVATE PicoSHA2 REQUIRED)
endif() endif()
if(BUILD_TESTING) if(BUILD_TESTING)

View File

@@ -23,6 +23,10 @@ Files: extern/asio
Copyright: 2003-2019, Christopher M. Kohlhoff (chris at kohlhoff dot com) Copyright: 2003-2019, Christopher M. Kohlhoff (chris at kohlhoff dot com)
License: BSL-1.0 License: BSL-1.0
Files: extern/PicoSHA2
Copyright: 2017 okdshin
License: MIT
License: LGPL-3.0-only License: LGPL-3.0-only
[see LICENSE file] [see LICENSE file]
@@ -102,3 +106,22 @@ License: BSL-1.0
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE. DEALINGS IN THE SOFTWARE.
License: MIT
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -32,6 +32,7 @@ Set(configure_options "${configure_options};-DBUILD_SDK=ON")
Set(configure_options "${configure_options};-DBUILD_SDK_COMMANDS=ON") Set(configure_options "${configure_options};-DBUILD_SDK_COMMANDS=ON")
Set(configure_options "${configure_options};-DFAST_BUILD=ON") Set(configure_options "${configure_options};-DFAST_BUILD=ON")
Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}") Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}")
Set(configure_options "${configure_options};-DBoost_NO_BOOST_CMAKE=ON")
Set(EXTRA_FLAGS $ENV{EXTRA_FLAGS}) Set(EXTRA_FLAGS $ENV{EXTRA_FLAGS})
If(EXTRA_FLAGS) If(EXTRA_FLAGS)

View File

@@ -145,7 +145,7 @@ macro(set_fairmq_defaults)
# Configure build types # Configure build types
set(CMAKE_CONFIGURATION_TYPES "Debug" "Release" "RelWithDebInfo" "Nightly" "Profile" "Experimental" "AddressSan" "ThreadSan") set(CMAKE_CONFIGURATION_TYPES "Debug" "Release" "RelWithDebInfo" "Nightly" "Profile" "Experimental" "AddressSan" "ThreadSan")
set(_warnings "-Wshadow -Wall -Wextra -Wpedantic") set(_warnings "-Wshadow -Wall -Wextra -Wpedantic")
set(CMAKE_CXX_FLAGS_DEBUG "-g ${_warnings}") set(CMAKE_CXX_FLAGS_DEBUG "-Og -g ${_warnings}")
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -DNDEBUG") set(CMAKE_CXX_FLAGS_RELEASE "-O2 -DNDEBUG")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g ${_warnings} -DNDEBUG") set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g ${_warnings} -DNDEBUG")
set(CMAKE_CXX_FLAGS_NIGHTLY "-O2 -g ${_warnings}") set(CMAKE_CXX_FLAGS_NIGHTLY "-O2 -g ${_warnings}")
@@ -498,6 +498,8 @@ function(build_bundled package bundle)
set(${package}_BUILD_INCLUDE_DIR ${${package}_SOURCE_DIR}/asio/include CACHE PATH "Bundled ${package} build-interface include dir") set(${package}_BUILD_INCLUDE_DIR ${${package}_SOURCE_DIR}/asio/include CACHE PATH "Bundled ${package} build-interface include dir")
set(${package}_INSTALL_INCLUDE_DIR ${PROJECT_INSTALL_INCDIR}/bundled CACHE PATH "Bundled ${package} install-interface include dir") set(${package}_INSTALL_INCLUDE_DIR ${PROJECT_INSTALL_INCDIR}/bundled CACHE PATH "Bundled ${package} install-interface include dir")
set(${package}_ROOT ${${package}_SOURCE_DIR}/asio) set(${package}_ROOT ${${package}_SOURCE_DIR}/asio)
elseif(${package} STREQUAL PicoSHA2)
set(${package}_ROOT ${${package}_SOURCE_DIR})
endif() endif()
string(TOUPPER ${package} package_upper) string(TOUPPER ${package} package_upper)

21
cmake/FindPicoSHA2.cmake Normal file
View File

@@ -0,0 +1,21 @@
################################################################################
# Copyright (C) 2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
find_path(PicoSHA2_INCLUDE_DIR NAMES picosha2.h)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(PicoSHA2
REQUIRED_VARS PicoSHA2_INCLUDE_DIR
)
if(PicoSHA2_FOUND)
add_library(PicoSHA2 INTERFACE IMPORTED)
set_target_properties(PicoSHA2 PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES "${PicoSHA2_INCLUDE_DIR}"
)
endif()

View File

@@ -17,8 +17,7 @@ Sampler::Sampler()
: fText() : fText()
, fMaxIterations(0) , fMaxIterations(0)
, fNumIterations(0) , fNumIterations(0)
{ {}
}
void Sampler::InitTask() void Sampler::InitTask()
{ {
@@ -32,25 +31,22 @@ bool Sampler::ConditionalRun()
// create a copy of the data with new(), that will be deleted after the transfer is complete // create a copy of the data with new(), that will be deleted after the transfer is complete
string* text = new string(fText); string* text = new string(fText);
// create message object with a pointer to the data buffer, // create message object with a pointer to the data buffer, its size,
// its size,
// custom deletion function (called when transfer is done), // custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer // and pointer to the object managing the data buffer
FairMQMessagePtr msg(NewMessage(const_cast<char*>(text->c_str()), FairMQMessagePtr msg(NewMessage(
text->length(), const_cast<char*>(text->c_str()),
[](void* /*data*/, void* object) { delete static_cast<string*>(object); }, text->length(),
text)); [](void* /*data*/, void* object) { delete static_cast<string*>(object); },
text));
LOG(info) << "Sending \"" << fText << "\""; LOG(info) << "Sending \"" << fText << "\"";
// in case of error or transfer interruption, return false to go to IDLE state // in case of error or transfer interruption, return false to go to the Ready state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message). // successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
if (Send(msg, "data") < 0) if (Send(msg, "data") < 0) {
{
return false; return false;
} } else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false; return false;
} }
@@ -58,8 +54,6 @@ bool Sampler::ConditionalRun()
return true; return true;
} }
Sampler::~Sampler() Sampler::~Sampler() {}
{
}
} // namespace example_1_1 } // namespace example_1_1

View File

@@ -33,23 +33,22 @@ void Sink::InitTask()
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
} }
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) // handler is called whenever a message arrives on "data", with a reference to the message and a
// sub-channel index (here 0)
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/) bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{ {
LOG(info) << "Received: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\""; LOG(info) << "Received: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false; return false;
} }
// return true if want to be called again (otherwise return false go to IDLE state) // return true if you want the handler to be called again (otherwise return false go to the
// Ready state)
return true; return true;
} }
Sink::~Sink() Sink::~Sink() {}
{
}
} // namespace example_1_1 } // namespace example_1_1

View File

@@ -51,7 +51,7 @@ fairmq-dds-command-ui -c k
fairmq-dds-command-ui -c b fairmq-dds-command-ui -c b
fairmq-dds-command-ui -c x fairmq-dds-command-ui -c x
fairmq-dds-command-ui -c j fairmq-dds-command-ui -c j
allexceptqctasks="main/(Sampler|QCDispatcher|Sink)" allexceptqctasks="main/(Sampler|QCDispatcher|Sink).*"
fairmq-dds-command-ui -c r -p $allexceptqctasks fairmq-dds-command-ui -c r -p $allexceptqctasks
qctask="main/QCTask.*" qctask="main/QCTask.*"
qcdispatcher="main/QCDispatcher.*" qcdispatcher="main/QCDispatcher.*"

View File

@@ -23,36 +23,40 @@ namespace example_region
Sampler::Sampler() Sampler::Sampler()
: fMsgSize(10000) : fMsgSize(10000)
, fLinger(100)
, fMaxIterations(0) , fMaxIterations(0)
, fNumIterations(0) , fNumIterations(0)
, fRegion(nullptr) , fRegion(nullptr)
, fNumUnackedMsgs(0) , fNumUnackedMsgs(0)
{ {}
}
void Sampler::InitTask() void Sampler::InitTask()
{ {
fMsgSize = fConfig->GetProperty<int>("msg-size"); fMsgSize = fConfig->GetProperty<int>("msg-size");
fLinger = fConfig->GetProperty<uint32_t>("region-linger");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(warn) << ">>>" << info.event; LOG(info) << "Region event: " << info.event
LOG(warn) << "id: " << info.id; << ", id: " << info.id
LOG(warn) << "ptr: " << info.ptr; << ", ptr: " << info.ptr
LOG(warn) << "size: " << info.size; << ", size: " << info.size
LOG(warn) << "flags: " << info.flags; << ", flags: " << info.flags;
}); });
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
0, 0,
10000000, 10000000,
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport [this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
lock_guard<mutex> lock(fMtx);
fNumUnackedMsgs -= blocks.size(); fNumUnackedMsgs -= blocks.size();
if (fMaxIterations > 0) { if (fMaxIterations > 0) {
LOG(debug) << "Received " << blocks.size() << " acks"; LOG(info) << "Received " << blocks.size() << " acks";
} }
} }
)); ));
fRegion->SetLinger(fLinger);
} }
bool Sampler::ConditionalRun() bool Sampler::ConditionalRun()
@@ -69,27 +73,30 @@ bool Sampler::ConditionalRun()
// LOG(info) << "check: " << static_cast<char*>(fRegion->GetData())[3]; // LOG(info) << "check: " << static_cast<char*>(fRegion->GetData())[3];
// std::this_thread::sleep_for(std::chrono::seconds(1)); // std::this_thread::sleep_for(std::chrono::seconds(1));
lock_guard<mutex> lock(fMtx);
if (Send(msg, "data", 0) > 0) { if (Send(msg, "data", 0) > 0) {
++fNumUnackedMsgs;
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false; return false;
} }
} }
++fNumUnackedMsgs;
return true; return true;
} }
void Sampler::ResetTask() void Sampler::ResetTask()
{ {
// if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead. // On destruction UnmanagedRegion will try to TODO
if (fNumUnackedMsgs != 0) {
LOG(debug) << "waiting for all acknowledgements... (" << fNumUnackedMsgs << ")";
this_thread::sleep_for(chrono::milliseconds(500));
LOG(debug) << "done, still unacked: " << fNumUnackedMsgs;
}
fRegion.reset(); fRegion.reset();
{
lock_guard<mutex> lock(fMtx);
if (fNumUnackedMsgs != 0) {
LOG(info) << "Done, still not acknowledged: " << fNumUnackedMsgs;
} else {
LOG(info) << "All acknowledgements received.";
}
}
fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents(); fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents();
} }

View File

@@ -15,7 +15,8 @@
#ifndef FAIRMQEXAMPLEREGIONSAMPLER_H #ifndef FAIRMQEXAMPLEREGIONSAMPLER_H
#define FAIRMQEXAMPLEREGIONSAMPLER_H #define FAIRMQEXAMPLEREGIONSAMPLER_H
#include <atomic> #include <mutex>
#include <cstdint>
#include "FairMQDevice.h" #include "FairMQDevice.h"
@@ -35,10 +36,12 @@ class Sampler : public FairMQDevice
private: private:
int fMsgSize; int fMsgSize;
uint32_t fLinger;
uint64_t fMaxIterations; uint64_t fMaxIterations;
uint64_t fNumIterations; uint64_t fNumIterations;
FairMQUnmanagedRegionPtr fRegion; FairMQUnmanagedRegionPtr fRegion;
std::atomic<uint64_t> fNumUnackedMsgs; std::mutex fMtx;
uint64_t fNumUnackedMsgs;
}; };
} // namespace example_region } // namespace example_region

View File

@@ -30,11 +30,11 @@ void Sink::InitTask()
// Get the fMaxIterations value from the command line options (via fConfig) // Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(warn) << ">>>" << info.event; LOG(info) << "Region event: " << info.event
LOG(warn) << "id: " << info.id; << ", id: " << info.id
LOG(warn) << "ptr: " << info.ptr; << ", ptr: " << info.ptr
LOG(warn) << "size: " << info.size; << ", size: " << info.size
LOG(warn) << "flags: " << info.flags; << ", flags: " << info.flags;
}); });
} }

View File

@@ -15,6 +15,7 @@ void addCustomOptions(bpo::options_description& options)
{ {
options.add_options() options.add_options()
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes") ("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); ("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
} }

View File

@@ -23,6 +23,7 @@ SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --control static --color false" SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1" SAMPLER+=" --max-iterations 1"
SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --region-linger 500"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777" SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777"
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER & @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
SAMPLER_PID=$! SAMPLER_PID=$!

1
extern/PicoSHA2 vendored Submodule

Submodule extern/PicoSHA2 added at 599843c396

View File

@@ -170,6 +170,7 @@ if(BUILD_FAIRMQ)
PluginManager.h PluginManager.h
PluginServices.h PluginServices.h
runFairMQDevice.h runFairMQDevice.h
shmem/Monitor.h
) )
set(FAIRMQ_PRIVATE_HEADER_FILES set(FAIRMQ_PRIVATE_HEADER_FILES
@@ -234,6 +235,7 @@ if(BUILD_FAIRMQ)
plugins/config/Config.cxx plugins/config/Config.cxx
plugins/Control.cxx plugins/Control.cxx
MemoryResources.cxx MemoryResources.cxx
shmem/Monitor.cxx
) )
if(BUILD_OFI_TRANSPORT) if(BUILD_OFI_TRANSPORT)
@@ -325,6 +327,7 @@ if(BUILD_FAIRMQ)
PRIVATE # only libFairMQ links against private dependencies PRIVATE # only libFairMQ links against private dependencies
libzmq libzmq
PicoSHA2
${OFI_DEPS} ${OFI_DEPS}
) )
set_target_properties(${_target} PROPERTIES set_target_properties(${_target} PROPERTIES
@@ -377,6 +380,7 @@ if(BUILD_FAIRMQ)
Boost::boost Boost::boost
Boost::date_time Boost::date_time
Boost::program_options Boost::program_options
PicoSHA2
) )
target_include_directories(fairmq-shmmonitor PUBLIC target_include_directories(fairmq-shmmonitor PUBLIC
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}> $<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}>

View File

@@ -671,7 +671,6 @@ void FairMQChannel::Init()
bool FairMQChannel::ConnectEndpoint(const string& endpoint) bool FairMQChannel::ConnectEndpoint(const string& endpoint)
{ {
lock_guard<mutex> lock(fMtx); lock_guard<mutex> lock(fMtx);
return fSocket->Connect(endpoint); return fSocket->Connect(endpoint);
} }
@@ -683,6 +682,13 @@ bool FairMQChannel::BindEndpoint(string& endpoint)
if (fSocket->Bind(endpoint)) { if (fSocket->Bind(endpoint)) {
return true; return true;
} else { } else {
// auto-bind only implemented for TCP
size_t protocolPos = endpoint.find(':');
string protocol = endpoint.substr(0, protocolPos);
if (protocol != "tcp") {
return false;
}
if (fAutoBind) { if (fAutoBind) {
// number of attempts when choosing a random port // number of attempts when choosing a random port
int numAttempts = 0; int numAttempts = 0;

View File

@@ -11,6 +11,7 @@
#include <cstddef> // for size_t #include <cstddef> // for size_t
#include <memory> // unique_ptr #include <memory> // unique_ptr
#include <stdexcept>
#include <fairmq/Transports.h> #include <fairmq/Transports.h>

View File

@@ -9,9 +9,10 @@
#ifndef FAIRMQSOCKET_H_ #ifndef FAIRMQSOCKET_H_
#define FAIRMQSOCKET_H_ #define FAIRMQSOCKET_H_
#include <memory>
#include <stdexcept>
#include <string> #include <string>
#include <vector> #include <vector>
#include <memory>
#include "FairMQMessage.h" #include "FairMQMessage.h"
class FairMQTransportFactory; class FairMQTransportFactory;
@@ -37,6 +38,10 @@ class FairMQSocket
virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0; virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0;
virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0; virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0;
/// If the backend supports it, fills the unsigned integer @a events with the ZMQ_EVENTS value
/// DISCLAIMER: this API is experimental and unsupported and might be dropped / refactored in
/// the future.
virtual void Events(uint32_t* events) = 0;
virtual void SetLinger(const int value) = 0; virtual void SetLinger(const int value) = 0;
virtual int GetLinger() const = 0; virtual int GetLinger() const = 0;
virtual void SetSndBufSize(const int value) = 0; virtual void SetSndBufSize(const int value) = 0;

View File

@@ -10,6 +10,7 @@
#define FAIRMQUNMANAGEDREGION_H_ #define FAIRMQUNMANAGEDREGION_H_
#include <cstddef> // size_t #include <cstddef> // size_t
#include <cstdint> // uint32_t
#include <memory> // std::unique_ptr #include <memory> // std::unique_ptr
#include <functional> // std::function #include <functional> // std::function
#include <ostream> // std::ostream #include <ostream> // std::ostream
@@ -72,6 +73,8 @@ class FairMQUnmanagedRegion
virtual void* GetData() const = 0; virtual void* GetData() const = 0;
virtual size_t GetSize() const = 0; virtual size_t GetSize() const = 0;
virtual uint64_t GetId() const = 0; virtual uint64_t GetId() const = 0;
virtual void SetLinger(uint32_t linger) = 0;
virtual uint32_t GetLinger() const = 0;
FairMQTransportFactory* GetTransport() { return fTransport; } FairMQTransportFactory* GetTransport() { return fTransport; }
void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; } void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; }

View File

@@ -15,8 +15,7 @@
#include <fairmq/FairMQTransportFactory.h> #include <fairmq/FairMQTransportFactory.h>
#include <fairmq/MemoryResources.h> #include <fairmq/MemoryResources.h>
void *fair::mq::ChannelResource::do_allocate(std::size_t bytes, std::size_t /*alignment*/) void *fair::mq::ChannelResource::do_allocate(std::size_t bytes, std::size_t alignment)
{ {
return setMessage(factory->CreateMessage(bytes)); return setMessage(factory->CreateMessage(bytes, fair::mq::Alignment{alignment}));
} }

View File

@@ -117,16 +117,24 @@ void ProgOptions::ParseAll(const int argc, char const* const* argv, bool allowUn
// clear the container because it was filled with default values and subsequent calls to store() do not overwrite the existing values // clear the container because it was filled with default values and subsequent calls to store() do not overwrite the existing values
fVarMap.clear(); fVarMap.clear();
if (allowUnregistered) {
po::command_line_parser parser(argc, argv);
parser.options(fAllOptions).allow_unregistered();
po::parsed_options parsed = parser.run();
fUnregisteredOptions = po::collect_unrecognized(parsed.options, po::include_positional);
po::store(parsed, fVarMap); po::command_line_parser parser(argc, argv);
} else {
po::store(po::parse_command_line(argc, argv, fAllOptions), fVarMap); parser.options(fAllOptions);
if (allowUnregistered) {
parser.allow_unregistered();
} }
using namespace po::command_line_style;
style_t style = style_t(allow_short | short_allow_adjacent | short_allow_next | allow_long | long_allow_adjacent | long_allow_next | allow_sticky | allow_dash_for_short);
parser.style(style);
po::parsed_options parsed = parser.run();
fUnregisteredOptions = po::collect_unrecognized(parsed.options, po::include_positional);
po::store(parsed, fVarMap);
} }
void ProgOptions::Notify() void ProgOptions::Notify()

View File

@@ -18,9 +18,10 @@ namespace fair
namespace mq namespace mq
{ {
array<string, 15> stateNames = array<string, 16> stateNames =
{ {
{ {
"UNDEFINED",
"OK", "OK",
"ERROR", "ERROR",
"IDLE", "IDLE",
@@ -41,6 +42,7 @@ array<string, 15> stateNames =
unordered_map<string, State> states = unordered_map<string, State> states =
{ {
{ "UNDEFINED", State::Undefined },
{ "OK", State::Ok }, { "OK", State::Ok },
{ "ERROR", State::Error }, { "ERROR", State::Error },
{ "IDLE", State::Idle }, { "IDLE", State::Idle },

View File

@@ -20,6 +20,7 @@ namespace mq
enum class State : int enum class State : int
{ {
Undefined = 0,
Ok, Ok,
Error, Error,
Idle, Idle,
@@ -39,7 +40,7 @@ enum class State : int
enum class Transition : int enum class Transition : int
{ {
Auto, Auto = 0,
InitDevice, InitDevice,
CompleteInit, CompleteInit,
Bind, Bind,

View File

@@ -17,6 +17,7 @@
#include <chrono> #include <chrono>
#include <cstddef> // size_t #include <cstddef> // size_t
#include <cstdint> // uint64_t #include <cstdint> // uint64_t
#include <cstring> // memset
#include <string> #include <string>
/** /**
@@ -28,6 +29,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
public: public:
FairMQBenchmarkSampler() FairMQBenchmarkSampler()
: fMultipart(false) : fMultipart(false)
, fMemSet(false)
, fNumParts(1) , fNumParts(1)
, fMsgSize(10000) , fMsgSize(10000)
, fMsgRate(0) , fMsgRate(0)
@@ -39,6 +41,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
void InitTask() override void InitTask() override
{ {
fMultipart = fConfig->GetProperty<bool>("multipart"); fMultipart = fConfig->GetProperty<bool>("multipart");
fMemSet = fConfig->GetProperty<bool>("memset");
fNumParts = fConfig->GetProperty<size_t>("num-parts"); fNumParts = fConfig->GetProperty<size_t>("num-parts");
fMsgSize = fConfig->GetProperty<size_t>("msg-size"); fMsgSize = fConfig->GetProperty<size_t>("msg-size");
fMsgRate = fConfig->GetProperty<float>("msg-rate"); fMsgRate = fConfig->GetProperty<float>("msg-rate");
@@ -51,8 +54,6 @@ class FairMQBenchmarkSampler : public FairMQDevice
// store the channel reference to avoid traversing the map on every loop iteration // store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0); FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0);
FairMQMessagePtr baseMsg(dataOutChannel.NewMessage(fMsgSize));
LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations."; LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations.";
auto tStart = std::chrono::high_resolution_clock::now(); auto tStart = std::chrono::high_resolution_clock::now();
@@ -64,6 +65,9 @@ class FairMQBenchmarkSampler : public FairMQDevice
for (size_t i = 0; i < fNumParts; ++i) { for (size_t i = 0; i < fNumParts; ++i) {
parts.AddPart(dataOutChannel.NewMessage(fMsgSize)); parts.AddPart(dataOutChannel.NewMessage(fMsgSize));
if (fMemSet) {
std::memset(parts.At(i)->GetData(), 0, parts.At(i)->GetSize());
}
} }
if (dataOutChannel.Send(parts) >= 0) { if (dataOutChannel.Send(parts) >= 0) {
@@ -76,6 +80,9 @@ class FairMQBenchmarkSampler : public FairMQDevice
} }
} else { } else {
FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize)); FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize));
if (fMemSet) {
std::memset(msg->GetData(), 0, msg->GetSize());
}
if (dataOutChannel.Send(msg) >= 0) { if (dataOutChannel.Send(msg) >= 0) {
if (fMaxIterations > 0) { if (fMaxIterations > 0) {
@@ -101,6 +108,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
protected: protected:
bool fMultipart; bool fMultipart;
bool fMemSet;
size_t fNumParts; size_t fNumParts;
size_t fMsgSize; size_t fMsgSize;
std::atomic<int> fMsgCounter; std::atomic<int> fMsgCounter;

View File

@@ -45,6 +45,7 @@ class Socket final : public fair::mq::Socket
auto GetId() const -> std::string override { return fId; } auto GetId() const -> std::string override { return fId; }
auto Events(uint32_t *events) -> void override { *events = 0; }
auto Bind(const std::string& address) -> bool override; auto Bind(const std::string& address) -> bool override;
auto Connect(const std::string& address) -> bool override; auto Connect(const std::string& address) -> bool override;

View File

@@ -36,6 +36,7 @@ namespace
extern "C" auto sigterm_handler(int signal) -> void extern "C" auto sigterm_handler(int signal) -> void
{ {
++gSignalCount;
gLastSignal = signal; gLastSignal = signal;
} }
} }

View File

@@ -65,7 +65,7 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
cout << d.taskId << " : " << d.state << endl; cout << d.taskId << " : " << d.state << endl;
} }
} else if (command == "o") { } else if (command == "o") {
cout << "> dumping config of the devices (" << path << ")" << endl; cout << "> dumping config of " << (path == "" ? "all" : path) << endl;
// TODO: extend this regex to return all properties, once command size limitation is removed. // TODO: extend this regex to return all properties, once command size limitation is removed.
auto const result = topo.GetProperties("^(session|id)$", path, std::chrono::milliseconds(timeout)); auto const result = topo.GetProperties("^(session|id)$", path, std::chrono::milliseconds(timeout));
for (const auto& d : result.second.devices) { for (const auto& d : result.second.devices) {
@@ -79,39 +79,39 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
return; return;
} }
const DeviceProperties props{{pKey, pVal}}; const DeviceProperties props{{pKey, pVal}};
cout << "> sending property (" << path << ")" << endl; cout << "> setting properties --> " << (path == "" ? "all" : path) << endl;
topo.SetProperties(props, path); topo.SetProperties(props, path);
// give dds time to complete request // give dds time to complete request
this_thread::sleep_for(chrono::milliseconds(100)); this_thread::sleep_for(chrono::milliseconds(100));
} else if (command == "i") { } else if (command == "i") {
cout << "> init devices (" << path << ")" << endl; cout << "> initiating InitDevice transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::InitDevice, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::InitDevice, path, std::chrono::milliseconds(timeout));
} else if (command == "k") { } else if (command == "k") {
cout << "> complete init (" << path << ")" << endl; cout << "> initiating CompleteInit transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::CompleteInit, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::CompleteInit, path, std::chrono::milliseconds(timeout));
} else if (command == "b") { } else if (command == "b") {
cout << "> bind devices (" << path << ")" << endl; cout << "> initiating Bind transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::Bind, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::Bind, path, std::chrono::milliseconds(timeout));
} else if (command == "x") { } else if (command == "x") {
cout << "> connect devices (" << path << ")" << endl; cout << "> initiating Connect transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::Connect, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::Connect, path, std::chrono::milliseconds(timeout));
} else if (command == "j") { } else if (command == "j") {
cout << "> init tasks (" << path << ")" << endl; cout << "> initiating InitTask transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::InitTask, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::InitTask, path, std::chrono::milliseconds(timeout));
} else if (command == "r") { } else if (command == "r") {
cout << "> run tasks (" << path << ")" << endl; cout << "> initiating Run transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::Run, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::Run, path, std::chrono::milliseconds(timeout));
} else if (command == "s") { } else if (command == "s") {
cout << "> stop devices (" << path << ")" << endl; cout << "> initiating Stop transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::Stop, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::Stop, path, std::chrono::milliseconds(timeout));
} else if (command == "t") { } else if (command == "t") {
cout << "> reset tasks (" << path << ")" << endl; cout << "> initiating ResetTask transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::ResetTask, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::ResetTask, path, std::chrono::milliseconds(timeout));
} else if (command == "d") { } else if (command == "d") {
cout << "> reset devices (" << path << ")" << endl; cout << "> initiating ResetDevice transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::ResetDevice, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::ResetDevice, path, std::chrono::milliseconds(timeout));
} else if (command == "q") { } else if (command == "q") {
cout << "> end (" << path << ")" << endl; cout << "> initiating End transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::End, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::End, path, std::chrono::milliseconds(timeout));
} else if (command == "h") { } else if (command == "h") {
cout << "> help" << endl; cout << "> help" << endl;
@@ -206,6 +206,7 @@ try {
sendCommand(command, path, timeout, topo, pKey, pVal); sendCommand(command, path, timeout, topo, pKey, pVal);
} }
size_t pos = targetState.find("->"); size_t pos = targetState.find("->");
cout << "> waiting for " << (path == "" ? "all" : path) << " to reach " << targetState << endl;
if (pos == string::npos) { if (pos == string::npos) {
/* auto ec = */topo.WaitForState(GetState(targetState), path, std::chrono::milliseconds(timeout)); /* auto ec = */topo.WaitForState(GetState(targetState), path, std::chrono::milliseconds(timeout));
// cout << "WaitForState(" << targetState << ") result: " << ec.message() << endl; // cout << "WaitForState(" << targetState << ") result: " << ec.message() << endl;

View File

@@ -68,7 +68,9 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("init-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") ("init-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).") ("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)") ("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t >()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).") ("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).")
("shm-mlock-segment", po::value<bool >()->default_value(false), "Shared memory: mlock the shared memory segment after initialization.")
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization.")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).") ("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.") ("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.") ("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")

View File

@@ -15,8 +15,8 @@ void addCustomOptions(bpo::options_description& options)
{ {
options.add_options() options.add_options()
("out-channel", bpo::value<std::string>()->default_value("data"), "Name of the output channel") ("out-channel", bpo::value<std::string>()->default_value("data"), "Name of the output channel")
("same-msg", bpo::value<bool>()->default_value(false), "Re-send the same message, or recreate for each iteration")
("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads") ("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads")
("memset", bpo::value<bool>()->default_value(false), "Memset allocated buffers to 0")
("num-parts", bpo::value<size_t>()->default_value(1), "Number of parts to send. 1 will send single messages, not parts") ("num-parts", bpo::value<size_t>()->default_value(1), "Number of parts to send. 1 will send single messages, not parts")
("msg-size", bpo::value<size_t>()->default_value(1000000), "Message size in bytes") ("msg-size", bpo::value<size_t>()->default_value(1000000), "Message size in bytes")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)") ("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")

View File

@@ -25,11 +25,6 @@ struct RuntimeError : ::std::runtime_error
{} {}
}; };
struct MixedStateError : RuntimeError
{
using RuntimeError::RuntimeError;
};
} /* namespace sdk */ } /* namespace sdk */
enum class ErrorCode enum class ErrorCode

View File

@@ -70,6 +70,47 @@ const std::map<DeviceTransition, DeviceState> expectedState =
{ DeviceTransition::End, DeviceState::Exiting } { DeviceTransition::End, DeviceState::Exiting }
}; };
// mirrors DeviceState, but adds a "Mixed" state that represents a topology where devices are currently not in the same state.
enum class AggregatedTopologyState : int
{
Undefined = static_cast<int>(fair::mq::State::Undefined),
Ok = static_cast<int>(fair::mq::State::Ok),
Error = static_cast<int>(fair::mq::State::Error),
Idle = static_cast<int>(fair::mq::State::Idle),
InitializingDevice = static_cast<int>(fair::mq::State::InitializingDevice),
Initialized = static_cast<int>(fair::mq::State::Initialized),
Binding = static_cast<int>(fair::mq::State::Binding),
Bound = static_cast<int>(fair::mq::State::Bound),
Connecting = static_cast<int>(fair::mq::State::Connecting),
DeviceReady = static_cast<int>(fair::mq::State::DeviceReady),
InitializingTask = static_cast<int>(fair::mq::State::InitializingTask),
Ready = static_cast<int>(fair::mq::State::Ready),
Running = static_cast<int>(fair::mq::State::Running),
ResettingTask = static_cast<int>(fair::mq::State::ResettingTask),
ResettingDevice = static_cast<int>(fair::mq::State::ResettingDevice),
Exiting = static_cast<int>(fair::mq::State::Exiting),
Mixed
};
inline auto operator==(DeviceState lhs, AggregatedTopologyState rhs) -> bool
{
return static_cast<int>(lhs) == static_cast<int>(rhs);
}
inline auto operator==(AggregatedTopologyState lhs, DeviceState rhs) -> bool
{
return static_cast<int>(lhs) == static_cast<int>(rhs);
}
inline std::ostream& operator<<(std::ostream& os, const AggregatedTopologyState& state)
{
if (state == AggregatedTopologyState::Mixed) {
return os << "Mixed";
} else {
return os << static_cast<DeviceState>(state);
}
}
struct DeviceStatus struct DeviceStatus
{ {
bool subscribed_to_state_changes; bool subscribed_to_state_changes;
@@ -100,22 +141,22 @@ using TopologyStateByTask = std::unordered_map<DDSTask::Id, DeviceStatus>;
using TopologyStateByCollection = std::unordered_map<DDSCollection::Id, std::vector<DeviceStatus>>; using TopologyStateByCollection = std::unordered_map<DDSCollection::Id, std::vector<DeviceStatus>>;
using TopologyTransition = fair::mq::Transition; using TopologyTransition = fair::mq::Transition;
inline DeviceState AggregateState(const TopologyState& topologyState) inline AggregatedTopologyState AggregateState(const TopologyState& topologyState)
{ {
DeviceState first = topologyState.begin()->state; DeviceState first = topologyState.begin()->state;
if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) { if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) {
return i.state == first; return i.state == first;
})) { })) {
return first; return static_cast<AggregatedTopologyState>(first);
} }
throw MixedStateError("State is not uniform"); return AggregatedTopologyState::Mixed;
} }
inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state) inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state)
{ {
return AggregateState(topologyState) == state; return AggregateState(topologyState) == static_cast<AggregatedTopologyState>(state);
} }
inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState) inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState)
@@ -439,6 +480,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
} }
}); });
} }
if (fTasks.empty()) {
FAIR_LOG(warn) << "ChangeState initiated on an empty set of tasks, check the path argument.";
}
} }
ChangeStateOp() = delete; ChangeStateOp() = delete;
ChangeStateOp(const ChangeStateOp&) = delete; ChangeStateOp(const ChangeStateOp&) = delete;
@@ -741,6 +785,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
} }
}); });
} }
if (fTasks.empty()) {
FAIR_LOG(warn) << "WaitForState initiated on an empty set of tasks, check the path argument.";
}
} }
WaitForStateOp() = delete; WaitForStateOp() = delete;
WaitForStateOp(const WaitForStateOp&) = delete; WaitForStateOp(const WaitForStateOp&) = delete;
@@ -754,9 +801,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
{ {
fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) { fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) {
if (ContainsTask(stateData.at(s.second).taskId)) { if (ContainsTask(stateData.at(s.second).taskId)) {
return stateData.at(s.second).state == fTargetCurrentState return stateData.at(s.second).state == fTargetCurrentState &&
&& (stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined);
(stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Ok);
} else { } else {
return false; return false;
} }
@@ -768,8 +814,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
{ {
if (!fOp.IsCompleted() && ContainsTask(taskId)) { if (!fOp.IsCompleted() && ContainsTask(taskId)) {
if (currentState == fTargetCurrentState && if (currentState == fTargetCurrentState &&
(lastState == fTargetLastState || (lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined)) {
fTargetLastState == DeviceState::Ok)) {
++fCount; ++fCount;
} }
TryCompletion(); TryCompletion();
@@ -873,7 +918,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
template<typename CompletionToken> template<typename CompletionToken>
auto AsyncWaitForState(const DeviceState targetCurrentState, CompletionToken&& token) auto AsyncWaitForState(const DeviceState targetCurrentState, CompletionToken&& token)
{ {
return AsyncWaitForState(DeviceState::Ok, targetCurrentState, "", Duration(0), std::move(token)); return AsyncWaitForState(DeviceState::Undefined, targetCurrentState, "", Duration(0), std::move(token));
} }
/// @brief Wait for selected FairMQ devices to reach given last & current state in this topology /// @brief Wait for selected FairMQ devices to reach given last & current state in this topology
@@ -903,7 +948,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
auto WaitForState(const DeviceState targetCurrentState, const std::string& path = "", Duration timeout = Duration(0)) auto WaitForState(const DeviceState targetCurrentState, const std::string& path = "", Duration timeout = Duration(0))
-> std::error_code -> std::error_code
{ {
return WaitForState(DeviceState::Ok, targetCurrentState, path, timeout); return WaitForState(DeviceState::Undefined, targetCurrentState, path, timeout);
} }
using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult); using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult);
@@ -938,6 +983,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
} }
}); });
} }
if (expectedCount == 0) {
FAIR_LOG(warn) << "GetProperties initiated on an empty set of tasks, check the path argument.";
}
// FAIR_LOG(debug) << "GetProperties " << fId << " with expected count of " << fExpectedCount << " started."; // FAIR_LOG(debug) << "GetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
} }
GetPropertiesOp() = delete; GetPropertiesOp() = delete;
@@ -1093,6 +1141,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
} }
}); });
} }
if (expectedCount == 0) {
FAIR_LOG(warn) << "SetProperties initiated on an empty set of tasks, check the path argument.";
}
// FAIR_LOG(debug) << "SetProperties " << fId << " with expected count of " << fExpectedCount << " started."; // FAIR_LOG(debug) << "SetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
} }
SetPropertiesOp() = delete; SetPropertiesOp() = delete;
@@ -1242,7 +1293,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
int index = 0; int index = 0;
for (const auto& task : fDDSTopo.GetTasks()) { for (const auto& task : fDDSTopo.GetTasks()) {
fStateData.push_back(DeviceStatus{false, DeviceState::Ok, DeviceState::Ok, task.GetId(), task.GetCollectionId()}); fStateData.push_back(DeviceStatus{false, DeviceState::Undefined, DeviceState::Undefined, task.GetId(), task.GetCollectionId()});
fStateIndex.emplace(task.GetId(), index); fStateIndex.emplace(task.GetId(), index);
index++; index++;
} }

View File

@@ -70,9 +70,10 @@ array<string, 17> typeNames =
} }
}; };
array<fair::mq::State, 15> fbStateToMQState = array<fair::mq::State, 16> fbStateToMQState =
{ {
{ {
fair::mq::State::Undefined,
fair::mq::State::Ok, fair::mq::State::Ok,
fair::mq::State::Error, fair::mq::State::Error,
fair::mq::State::Idle, fair::mq::State::Idle,
@@ -91,9 +92,10 @@ array<fair::mq::State, 15> fbStateToMQState =
} }
}; };
array<sdk::cmd::FBState, 15> mqStateToFBState = array<sdk::cmd::FBState, 16> mqStateToFBState =
{ {
{ {
sdk::cmd::FBState_Undefined,
sdk::cmd::FBState_Ok, sdk::cmd::FBState_Ok,
sdk::cmd::FBState_Error, sdk::cmd::FBState_Error,
sdk::cmd::FBState_Idle, sdk::cmd::FBState_Idle,

View File

@@ -6,6 +6,7 @@ enum FBResult:byte {
} }
enum FBState:byte { enum FBState:byte {
Undefined,
Ok, Ok,
Error, Error,
Idle, Idle,

View File

@@ -8,9 +8,10 @@
#ifndef FAIR_MQ_SHMEM_COMMON_H_ #ifndef FAIR_MQ_SHMEM_COMMON_H_
#define FAIR_MQ_SHMEM_COMMON_H_ #define FAIR_MQ_SHMEM_COMMON_H_
#include <picosha2.h>
#include <atomic> #include <atomic>
#include <string> #include <string>
#include <unordered_map>
#include <boost/interprocess/managed_shared_memory.hpp> #include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/allocators/allocator.hpp> #include <boost/interprocess/allocators/allocator.hpp>
@@ -110,9 +111,12 @@ struct RegionBlock
// a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)). // a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)).
inline std::string buildShmIdFromSessionIdAndUserId(const std::string& sessionId) inline std::string buildShmIdFromSessionIdAndUserId(const std::string& sessionId)
{ {
boost::hash<std::string> stringHash; std::string seed((std::to_string(geteuid()) + sessionId));
std::string shmId(std::to_string(stringHash(std::string((std::to_string(geteuid()) + sessionId))))); // generate a 8-digit hex value out of sha256 hash
shmId.resize(8, '_'); std::vector<unsigned char> hash(4);
picosha2::hash256(seed.begin(), seed.end(), hash.begin(), hash.end());
std::string shmId = picosha2::bytes_to_hex_string(hash.begin(), hash.end());
return shmId; return shmId;
} }

View File

@@ -17,9 +17,11 @@
#include "Common.h" #include "Common.h"
#include "Region.h" #include "Region.h"
#include "Monitor.h"
#include <FairMQLogger.h> #include <FairMQLogger.h>
#include <FairMQMessage.h> #include <FairMQMessage.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/tools/CppSTL.h> #include <fairmq/tools/CppSTL.h>
#include <fairmq/tools/Strings.h> #include <fairmq/tools/Strings.h>
@@ -32,6 +34,8 @@
#include <boost/interprocess/ipc/message_queue.hpp> #include <boost/interprocess/ipc/message_queue.hpp>
#include <cstdlib> // getenv #include <cstdlib> // getenv
#include <condition_variable>
#include <mutex>
#include <set> #include <set>
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
@@ -40,6 +44,8 @@
#include <utility> // pair #include <utility> // pair
#include <vector> #include <vector>
#include <sys/mman.h> // mlock
namespace fair namespace fair
{ {
namespace mq namespace mq
@@ -52,13 +58,11 @@ struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtim
class Manager class Manager
{ {
public: public:
Manager(std::string id, std::string deviceId, size_t size, bool throwOnBadAlloc) Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config)
: fShmId(std::move(id)) : fShmId(std::move(shmId))
, fDeviceId(std::move(deviceId)) , fDeviceId(std::move(deviceId))
, fSegmentName("fmq_" + fShmId + "_main") , fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size)
, fManagementSegmentName("fmq_" + fShmId + "_mng") , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 655360)
, fSegment(boost::interprocess::open_or_create, fSegmentName.c_str(), size)
, fManagementSegment(boost::interprocess::open_or_create, fManagementSegmentName.c_str(), 655360)
, fShmVoidAlloc(fManagementSegment.get_segment_manager()) , fShmVoidAlloc(fManagementSegment.get_segment_manager())
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str()) , fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
, fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str()) , fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
@@ -69,10 +73,37 @@ class Manager
, fMsgCounter(0) , fMsgCounter(0)
, fHeartbeatThread() , fHeartbeatThread()
, fSendHeartbeats(true) , fSendHeartbeats(true)
, fThrowOnBadAlloc(throwOnBadAlloc) , fThrowOnBadAlloc(true)
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << size << " bytes. Available are " << fSegment.get_free_memory() << " bytes.";
bool mlockSegment = false;
bool zeroSegment = false;
bool autolaunchMonitor = false;
if (config) {
mlockSegment = config->GetProperty<bool>("shm-mlock-segment", mlockSegment);
zeroSegment = config->GetProperty<bool>("shm-zero-segment", zeroSegment);
autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
fThrowOnBadAlloc = config->GetProperty<bool>("shm-throw-bad-alloc", fThrowOnBadAlloc);
} else {
LOG(debug) << "ProgOptions not available! Using defaults.";
}
if (autolaunchMonitor) {
StartMonitor(fShmId);
}
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegment.get_size() << " bytes. Available are " << fSegment.get_free_memory() << " bytes.";
if (mlockSegment) {
LOG(debug) << "Locking the managed segment memory pages...";
mlock(fSegment.get_address(), fSegment.get_size());
LOG(debug) << "Successfully locked the managed segment memory pages.";
}
if (zeroSegment) {
LOG(debug) << "Zeroing the managed segment free memory...";
fSegment.zero_free_memory();
LOG(debug) << "Successfully zeroed the managed segment free memory.";
}
fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(unique_instance)(fShmVoidAlloc); fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(unique_instance)(fShmVoidAlloc);
// store info about the managed segment as region with id 0 // store info about the managed segment as region with id 0
@@ -100,39 +131,6 @@ class Manager
Manager(const Manager&) = delete; Manager(const Manager&) = delete;
Manager operator=(const Manager&) = delete; Manager operator=(const Manager&) = delete;
~Manager()
{
using namespace boost::interprocess;
bool lastRemoved = false;
UnsubscribeFromRegionEvents();
fSendHeartbeats = false;
fHeartbeatThread.join();
try {
boost::interprocess::scoped_lock<named_mutex> lock(fShmMtx);
(fDeviceCounter->fCount)--;
if (fDeviceCounter->fCount == 0) {
LOG(debug) << "last segment user, removing segment.";
RemoveSegments();
lastRemoved = true;
} else {
LOG(debug) << "other segment users present (" << fDeviceCounter->fCount << "), not removing it.";
}
} catch(interprocess_exception& e) {
LOG(error) << "error while acquiring lock in Manager destructor: " << e.what();
}
if (lastRemoved) {
named_mutex::remove(std::string("fmq_" + fShmId + "_mtx").c_str());
named_condition::remove(std::string("fmq_" + fShmId + "_cv").c_str());
}
}
boost::interprocess::managed_shared_memory& Segment() { return fSegment; } boost::interprocess::managed_shared_memory& Segment() { return fSegment; }
boost::interprocess::managed_shared_memory& ManagementSegment() { return fManagementSegment; } boost::interprocess::managed_shared_memory& ManagementSegment() { return fManagementSegment; }
@@ -265,8 +263,12 @@ class Manager
auto r = fRegions.emplace(id, tools::make_unique<Region>(fShmId, id, 0, true, nullptr, nullptr, path, flags)); auto r = fRegions.emplace(id, tools::make_unique<Region>(fShmId, id, 0, true, nullptr, nullptr, path, flags));
return r.first->second.get(); return r.first->second.get();
} catch (std::out_of_range& oor) {
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
LOG(error) << oor.what();
return nullptr;
} catch (boost::interprocess::interprocess_exception& e) { } catch (boost::interprocess::interprocess_exception& e) {
LOG(warn) << "Could not get remote region for id: " << id; LOG(warn) << "Could not get remote region for id '" << id << "'";
return nullptr; return nullptr;
} }
} }
@@ -274,9 +276,9 @@ class Manager
void RemoveRegion(const uint64_t id) void RemoveRegion(const uint64_t id)
{ {
fRegions.erase(id);
{ {
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fRegions.erase(id);
fRegionInfos->at(id).fDestroyed = true; fRegionInfos->at(id).fDestroyed = true;
} }
fRegionEventsCV.notify_all(); fRegionEventsCV.notify_all();
@@ -381,48 +383,67 @@ class Manager
void IncrementMsgCounter() { ++fMsgCounter; } void IncrementMsgCounter() { ++fMsgCounter; }
void DecrementMsgCounter() { --fMsgCounter; } void DecrementMsgCounter() { --fMsgCounter; }
void RemoveSegments()
{
using namespace boost::interprocess;
if (shared_memory_object::remove(fSegmentName.c_str())) {
LOG(debug) << "successfully removed '" << fSegmentName << "' segment after the device has stopped.";
} else {
LOG(debug) << "did not remove " << fSegmentName << " segment after the device stopped. Already removed?";
}
if (shared_memory_object::remove(fManagementSegmentName.c_str())) {
LOG(debug) << "successfully removed '" << fManagementSegmentName << "' segment after the device has stopped.";
} else {
LOG(debug) << "did not remove '" << fManagementSegmentName << "' segment after the device stopped. Already removed?";
}
}
void SendHeartbeats() void SendHeartbeats()
{ {
std::string controlQueueName("fmq_" + fShmId + "_cq"); std::string controlQueueName("fmq_" + fShmId + "_cq");
std::unique_lock<std::mutex> lock(fHeartbeatsMtx);
while (fSendHeartbeats) { while (fSendHeartbeats) {
try { try {
boost::interprocess::message_queue mq(boost::interprocess::open_only, controlQueueName.c_str()); boost::interprocess::message_queue mq(boost::interprocess::open_only, controlQueueName.c_str());
boost::posix_time::ptime sndTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100); boost::posix_time::ptime sndTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100);
if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill)) { if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill)) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(100), [&]() { return !fSendHeartbeats; });
} else { } else {
LOG(debug) << "control queue timeout"; LOG(debug) << "control queue timeout";
} }
} catch (boost::interprocess::interprocess_exception& ie) { } catch (boost::interprocess::interprocess_exception& ie) {
std::this_thread::sleep_for(std::chrono::milliseconds(500)); fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(500), [&]() { return !fSendHeartbeats; });
// LOG(warn) << "no " << controlQueueName << " found"; // LOG(debug) << "no " << controlQueueName << " found";
} }
} }
} }
bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; } bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; }
~Manager()
{
using namespace boost::interprocess;
bool lastRemoved = false;
UnsubscribeFromRegionEvents();
{
std::unique_lock<std::mutex> lock(fHeartbeatsMtx);
fSendHeartbeats = false;
}
fHeartbeatsCV.notify_one();
if (fHeartbeatThread.joinable()) {
fHeartbeatThread.join();
}
try {
boost::interprocess::scoped_lock<named_mutex> lock(fShmMtx);
(fDeviceCounter->fCount)--;
if (fDeviceCounter->fCount == 0) {
LOG(debug) << "Last segment user, removing segment.";
lastRemoved = true;
} else {
LOG(debug) << "Other segment users present (" << fDeviceCounter->fCount << "), skipping removal.";
}
} catch (interprocess_exception& e) {
LOG(error) << "Manager could not acquire lock: " << e.what();
}
if (lastRemoved) {
Monitor::Cleanup(ShmId{fShmId});
}
}
private: private:
std::string fShmId; std::string fShmId;
std::string fDeviceId; std::string fDeviceId;
std::string fSegmentName;
std::string fManagementSegmentName;
boost::interprocess::managed_shared_memory fSegment; boost::interprocess::managed_shared_memory fSegment;
boost::interprocess::managed_shared_memory fManagementSegment; boost::interprocess::managed_shared_memory fManagementSegment;
VoidAlloc fShmVoidAlloc; VoidAlloc fShmVoidAlloc;
@@ -442,7 +463,10 @@ class Manager
std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter
std::thread fHeartbeatThread; std::thread fHeartbeatThread;
std::atomic<bool> fSendHeartbeats; bool fSendHeartbeats;
std::mutex fHeartbeatsMtx;
std::condition_variable fHeartbeatsCV;
bool fThrowOnBadAlloc; bool fThrowOnBadAlloc;
}; };

View File

@@ -109,7 +109,7 @@ class Message final : public fair::mq::Message
, fRegionPtr(nullptr) , fRegionPtr(nullptr)
, fLocalPtr(static_cast<char*>(data)) , fLocalPtr(static_cast<char*>(data))
{ {
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) || if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) &&
reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) { reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) {
fMeta.fHandle = (boost::interprocess::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData())); fMeta.fHandle = (boost::interprocess::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
} else { } else {
@@ -251,6 +251,10 @@ class Message final : public fair::mq::Message
// boost::interprocess::managed_shared_memory::size_type actualSize = size; // boost::interprocess::managed_shared_memory::size_type actualSize = size;
// char* hint = 0; // unused for boost::interprocess::allocate_new // char* hint = 0; // unused for boost::interprocess::allocate_new
// fLocalPtr = fManager.Segment().allocation_command<char>(boost::interprocess::allocate_new, size, actualSize, hint); // fLocalPtr = fManager.Segment().allocation_command<char>(boost::interprocess::allocate_new, size, actualSize, hint);
size_t segmentSize = fManager.Segment().get_size();
if (size > segmentSize) {
throw MessageBadAlloc(tools::ToString("Requested message size (", size, ") exceeds segment size (", segmentSize, ")"));
}
if (alignment == 0) { if (alignment == 0) {
fLocalPtr = reinterpret_cast<char*>(fManager.Segment().allocate(size)); fLocalPtr = reinterpret_cast<char*>(fManager.Segment().allocate(size));
} else { } else {

View File

@@ -64,7 +64,6 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
, fHeartbeatTriggered(false) , fHeartbeatTriggered(false)
, fLastHeartbeat(chrono::high_resolution_clock::now()) , fLastHeartbeat(chrono::high_resolution_clock::now())
, fSignalThread() , fSignalThread()
, fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536)
, fDeviceHeartbeats() , fDeviceHeartbeats()
{ {
if (!fViewOnly) { if (!fViewOnly) {
@@ -203,7 +202,7 @@ void Monitor::Interactive()
case 'x': case 'x':
cout << "\n[x] --> closing shared memory:" << endl; cout << "\n[x] --> closing shared memory:" << endl;
if (!fViewOnly) { if (!fViewOnly) {
Cleanup(fShmId); Cleanup(ShmId{fShmId});
} else { } else {
cout << "cannot close because in view only mode" << endl; cout << "cannot close because in view only mode" << endl;
} }
@@ -288,7 +287,7 @@ void Monitor::CheckSegment()
if (fHeartbeatTriggered && duration > fTimeoutInMS) { if (fHeartbeatTriggered && duration > fTimeoutInMS) {
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl; cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
Cleanup(fShmId); Cleanup(ShmId{fShmId});
fHeartbeatTriggered = false; fHeartbeatTriggered = false;
if (fSelfDestruct) { if (fSelfDestruct) {
cout << "\nself destructing" << endl; cout << "\nself destructing" << endl;
@@ -321,7 +320,7 @@ void Monitor::CheckSegment()
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count(); unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
if (fIsDaemon && duration > fTimeoutInMS * 2) { if (fIsDaemon && duration > fTimeoutInMS * 2) {
Cleanup(fShmId); Cleanup(ShmId{fShmId});
fHeartbeatTriggered = false; fHeartbeatTriggered = false;
if (fSelfDestruct) { if (fSelfDestruct) {
cout << "\nself destructing" << endl; cout << "\nself destructing" << endl;
@@ -395,51 +394,52 @@ void Monitor::PrintHelp()
void Monitor::RemoveObject(const string& name) void Monitor::RemoveObject(const string& name)
{ {
if (bipc::shared_memory_object::remove(name.c_str())) { if (bipc::shared_memory_object::remove(name.c_str())) {
cout << "Successfully removed \"" << name << "\"." << endl; cout << "Successfully removed '" << name << "'." << endl;
} else { } else {
cout << "Did not remove \"" << name << "\". Already removed?" << endl; cout << "Did not remove '" << name << "'. Already removed?" << endl;
} }
} }
void Monitor::RemoveFileMapping(const string& name) void Monitor::RemoveFileMapping(const string& name)
{ {
if (bipc::file_mapping::remove(name.c_str())) { if (bipc::file_mapping::remove(name.c_str())) {
cout << "Successfully removed \"" << name << "\"." << endl; cout << "Successfully removed '" << name << "'." << endl;
} else { } else {
cout << "Did not remove \"" << name << "\". Already removed?" << endl; cout << "Did not remove '" << name << "'. Already removed?" << endl;
} }
} }
void Monitor::RemoveQueue(const string& name) void Monitor::RemoveQueue(const string& name)
{ {
if (bipc::message_queue::remove(name.c_str())) { if (bipc::message_queue::remove(name.c_str())) {
cout << "Successfully removed \"" << name << "\"." << endl; cout << "Successfully removed '" << name << "'." << endl;
} else { } else {
cout << "Did not remove \"" << name << "\". Already removed?" << endl; cout << "Did not remove '" << name << "'. Already removed?" << endl;
} }
} }
void Monitor::RemoveMutex(const string& name) void Monitor::RemoveMutex(const string& name)
{ {
if (bipc::named_mutex::remove(name.c_str())) { if (bipc::named_mutex::remove(name.c_str())) {
cout << "Successfully removed \"" << name << "\"." << endl; cout << "Successfully removed '" << name << "'." << endl;
} else { } else {
cout << "Did not remove \"" << name << "\". Already removed?" << endl; cout << "Did not remove '" << name << "'. Already removed?" << endl;
} }
} }
void Monitor::RemoveCondition(const string& name) void Monitor::RemoveCondition(const string& name)
{ {
if (bipc::named_condition::remove(name.c_str())) { if (bipc::named_condition::remove(name.c_str())) {
cout << "Successfully removed \"" << name << "\"." << endl; cout << "Successfully removed '" << name << "'." << endl;
} else { } else {
cout << "Did not remove \"" << name << "\". Already removed?" << endl; cout << "Did not remove '" << name << "'. Already removed?" << endl;
} }
} }
void Monitor::Cleanup(const string& shmId) void Monitor::Cleanup(const ShmId& shmId)
{ {
string managementSegmentName("fmq_" + shmId + "_mng"); cout << "Cleaning up for shared memory id '" << shmId.shmId << "'..." << endl;
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
try { try {
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first; RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
@@ -454,20 +454,20 @@ void Monitor::Cleanup(const string& shmId)
RegionInfo ri = m->at(i); RegionInfo ri = m->at(i);
string path = ri.fPath.c_str(); string path = ri.fPath.c_str();
int flags = ri.fFlags; int flags = ri.fFlags;
cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << "'." << endl; cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << "." << endl;
if (path != "") { if (path != "") {
RemoveFileMapping(tools::ToString(path, "fmq_" + shmId + "_rg_" + to_string(i))); RemoveFileMapping(tools::ToString(path, "fmq_" + shmId.shmId + "_rg_" + to_string(i)));
} else { } else {
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i)); RemoveObject("fmq_" + shmId.shmId + "_rg_" + to_string(i));
} }
} else { } else {
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i)); RemoveObject("fmq_" + shmId.shmId + "_rg_" + to_string(i));
} }
RemoveQueue(string("fmq_" + shmId + "_rgq_" + to_string(i))); RemoveQueue(string("fmq_" + shmId.shmId + "_rgq_" + to_string(i)));
} }
} else { } else {
cout << "No region counter found. no regions to cleanup." << endl; cout << "No region counter found. No regions to cleanup." << endl;
} }
RemoveObject(managementSegmentName.c_str()); RemoveObject(managementSegmentName.c_str());
@@ -477,20 +477,41 @@ void Monitor::Cleanup(const string& shmId)
cout << "Could not locate element in the region map, out of range: " << oor.what() << endl; cout << "Could not locate element in the region map, out of range: " << oor.what() << endl;
} }
RemoveObject("fmq_" + shmId + "_main"); RemoveObject("fmq_" + shmId.shmId + "_main");
RemoveMutex("fmq_" + shmId + "_mtx"); RemoveMutex("fmq_" + shmId.shmId + "_mtx");
RemoveCondition("fmq_" + shmId + "_cv"); RemoveCondition("fmq_" + shmId.shmId + "_cv");
cout << endl; cout << endl;
} }
void Monitor::Cleanup(const SessionId& sessionId)
{
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
Cleanup(shmId);
}
void Monitor::CleanupFull(const ShmId& shmId)
{
Cleanup(shmId);
RemoveMutex("fmq_" + shmId.shmId + "_ms");
RemoveQueue("fmq_" + shmId.shmId + "_cq");
}
void Monitor::CleanupFull(const SessionId& sessionId)
{
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
CleanupFull(shmId);
}
Monitor::~Monitor() Monitor::~Monitor()
{ {
if (fSignalThread.joinable()) { if (fSignalThread.joinable()) {
fSignalThread.join(); fSignalThread.join();
} }
if (fCleanOnExit) { if (fCleanOnExit) {
Cleanup(fShmId); Cleanup(ShmId{fShmId});
} }
if (!fViewOnly) { if (!fViewOnly) {
RemoveMutex("fmq_" + fShmId + "_ms"); RemoveMutex("fmq_" + fShmId + "_ms");

View File

@@ -8,8 +8,6 @@
#ifndef FAIR_MQ_SHMEM_MONITOR_H_ #ifndef FAIR_MQ_SHMEM_MONITOR_H_
#define FAIR_MQ_SHMEM_MONITOR_H_ #define FAIR_MQ_SHMEM_MONITOR_H_
#include <boost/interprocess/managed_shared_memory.hpp>
#include <thread> #include <thread>
#include <chrono> #include <chrono>
#include <atomic> #include <atomic>
@@ -24,6 +22,18 @@ namespace mq
namespace shmem namespace shmem
{ {
struct SessionId
{
std::string sessionId;
explicit operator std::string() const { return sessionId; }
};
struct ShmId
{
std::string shmId;
explicit operator std::string() const { return shmId; }
};
class Monitor class Monitor
{ {
public: public:
@@ -37,7 +47,19 @@ class Monitor
void CatchSignals(); void CatchSignals();
void Run(); void Run();
static void Cleanup(const std::string& shmId); /// @brief Cleanup all shared memory artifacts created by devices
/// @param shmId shared memory id
static void Cleanup(const ShmId& shmId);
/// @brief Cleanup all shared memory artifacts created by devices
/// @param sessionId session id
static void Cleanup(const SessionId& sessionId);
/// @brief Cleanup all shared memory artifacts created by devices and monitors
/// @param shmId shared memory id
static void CleanupFull(const ShmId& shmId);
/// @brief Cleanup all shared memory artifacts created by devices and monitors
/// @param sessionId session id
static void CleanupFull(const SessionId& sessionId);
static void RemoveObject(const std::string&); static void RemoveObject(const std::string&);
static void RemoveFileMapping(const std::string&); static void RemoveFileMapping(const std::string&);
static void RemoveQueue(const std::string&); static void RemoveQueue(const std::string&);
@@ -70,7 +92,6 @@ class Monitor
std::atomic<bool> fHeartbeatTriggered; std::atomic<bool> fHeartbeatTriggered;
std::chrono::high_resolution_clock::time_point fLastHeartbeat; std::chrono::high_resolution_clock::time_point fLastHeartbeat;
std::thread fSignalThread; std::thread fSignalThread;
boost::interprocess::managed_shared_memory fManagementSegment;
std::unordered_map<std::string, std::chrono::high_resolution_clock::time_point> fDeviceHeartbeats; std::unordered_map<std::string, std::chrono::high_resolution_clock::time_point> fDeviceHeartbeats;
}; };

View File

@@ -6,6 +6,23 @@ The transport manages shared memory via boost::interprocess library. The transfe
Devices track and cleanup shared memory on shutdown. For more information on the current shared memory segment and additional cleanup options, see following section. Devices track and cleanup shared memory on shutdown. For more information on the current shared memory segment and additional cleanup options, see following section.
# Shared Memory objects / files
FairMQ Shared Memory currently uses the following names to register shared memory on the system:
| name | info | created by | used by |
| ------------------------- | ---------------------------------------------- | ------------------ | ------------------------------ |
| `fmq_<shmId>_main` | main segment (user data) | one of the devices | devices |
| `fmq_<shmId>_mng` | management segment (management data) | one of the devices | devices |
| `fmq_<shmId>_mtx` | mutex | one of the devices | devices |
| `fmq_<shmId>_cv` | condition variable | one of the devices | devices with unmanaged regions |
| `fmq_<shmId>_rg_<index>` | unmanaged region(s) | one of the devices | devices with unmanaged regions |
| `fmq_<shmId>_rgq_<index>` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions |
| `fmq_<shmId>_ms` | shmmonitor status | shmmonitor | devices, shmmonitor |
| `fmq_<shmId>_cq` | message queue between transport and shmmonitor | shmmonitor | devices, shmmonitor |
The shmId is generated out of session id and user id.
## Shared memory monitor ## Shared memory monitor
The shared memory monitor tool, supplied with the shared memory transport can be used to monitor shared memory use and automatically cleanup shared memory in case of device crashes. The shared memory monitor tool, supplied with the shared memory transport can be used to monitor shared memory use and automatically cleanup shared memory in case of device crashes.
@@ -25,13 +42,3 @@ Without the `--self-destruct` option, the monitor will run continuously, moitori
Possible further implementation would be to run the monitor with `--self-destruct` with each topology. Possible further implementation would be to run the monitor with `--self-destruct` with each topology.
The Monitor class can also be used independently from the supplied executable (built from `runMonitor.cxx`), allowing integration on any level. For example invoking the monitor could be a functionality that a device offers. The Monitor class can also be used independently from the supplied executable (built from `runMonitor.cxx`), allowing integration on any level. For example invoking the monitor could be a functionality that a device offers.
FairMQ Shared Memory currently uses following names to register shared memory on the system:
`fmq_<shmId>_main` - main segment name, used for user data (the shmId is generated out of session id and user id).
`fmq_<shmId>_mng` - management segment name, used for storing management data.
`fmq_<shmId>_cq` - message queue for communicating between shm transport and shm monitor (exists independent of above segments).
`fmq_<shmId>_mtx` - boost::interprocess::named_mutex for management purposes (exists independent of above segments).
`fmq_<shmId>_ms` - shmmonitor status used to signal if it is active or not (exists independent of above segments).
`fmq_<shmId>_rg_<index>` - names of unmanaged regions.
`fmq_<shmId>_rgq_<index>` - names of queues for the unmanaged regions.

View File

@@ -30,6 +30,7 @@
#include <boost/interprocess/ipc/message_queue.hpp> #include <boost/interprocess/ipc/message_queue.hpp>
#include <algorithm> // min #include <algorithm> // min
#include <atomic>
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
@@ -49,6 +50,7 @@ struct Region
{ {
Region(const std::string& shmId, uint64_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags) Region(const std::string& shmId, uint64_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
: fRemote(remote) : fRemote(remote)
, fLinger(100)
, fStop(false) , fStop(false)
, fName("fmq_" + shmId + "_rg_" + std::to_string(id)) , fName("fmq_" + shmId + "_rg_" + std::to_string(id))
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id)) , fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id))
@@ -56,8 +58,8 @@ struct Region
, fFile(nullptr) , fFile(nullptr)
, fFileMapping() , fFileMapping()
, fQueue(nullptr) , fQueue(nullptr)
, fReceiveAcksWorker() , fAcksReceiver()
, fSendAcksWorker() , fAcksSender()
, fCallback(callback) , fCallback(callback)
, fBulkCallback(bulkCallback) , fBulkCallback(bulkCallback)
{ {
@@ -118,38 +120,35 @@ struct Region
LOG(debug) << "shmem: initialized region queue: " << fQueueName; LOG(debug) << "shmem: initialized region queue: " << fQueueName;
} }
void StartSendingAcks() void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); }
{
fSendAcksWorker = std::thread(&Region::SendAcks, this);
}
void SendAcks() void SendAcks()
{ {
std::unique_ptr<RegionBlock[]> blocks = tools::make_unique<RegionBlock[]>(fAckBunchSize); std::unique_ptr<RegionBlock[]> blocks = tools::make_unique<RegionBlock[]>(fAckBunchSize);
size_t blocksToSend = 0;
while (true) { // we'll try to send all acks before stopping while (true) {
size_t blocksToSend = 0; blocksToSend = 0;
{
{ // mutex locking block
std::unique_lock<std::mutex> lock(fBlockMtx); std::unique_lock<std::mutex> lock(fBlockMtx);
// try to get more blocks without waiting (we can miss a notify from CloseMessage()) // try to get <fAckBunchSize> blocks
if (!fStop && (fBlocksToFree.size() < fAckBunchSize)) { if (fBlocksToFree.size() < fAckBunchSize) {
// cv.wait() timeout: send whatever blocks we have
fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500)); fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500));
} }
// send whatever blocks we have
blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize); blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize);
copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get()); copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get());
fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend); fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend);
} // unlock the block mutex here while sending over IPC }
if (blocksToSend > 0) { if (blocksToSend > 0) {
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) { while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) {
// receiver slow? yield and try again... // receiver slow? yield and try again...
std::this_thread::yield(); std::this_thread::yield();
} }
// LOG(debug) << "Sent " << blocksToSend << " blocks.";
} else { // blocksToSend == 0 } else { // blocksToSend == 0
if (fStop) { if (fStop) {
break; break;
@@ -157,14 +156,11 @@ struct Region
} }
} }
LOG(debug) << "send ack worker for " << fName << " leaving."; LOG(debug) << "AcksSender for " << fName << " leaving " << "(blocks left to free: " << fBlocksToFree.size() << ", "
} << " blocks left to send: " << blocksToSend << ").";
void StartReceivingAcks()
{
fReceiveAcksWorker = std::thread(&Region::ReceiveAcks, this);
} }
void StartReceivingAcks() { fAcksReceiver = std::thread(&Region::ReceiveAcks, this); }
void ReceiveAcks() void ReceiveAcks()
{ {
unsigned int priority; unsigned int priority;
@@ -173,12 +169,18 @@ struct Region
std::vector<fair::mq::RegionBlock> result; std::vector<fair::mq::RegionBlock> result;
result.reserve(fAckBunchSize); result.reserve(fAckBunchSize);
while (!fStop) { // end thread condition (should exist until region is destroyed) while (true) {
auto rcvTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(500); uint32_t timeout = 100;
bool leave = false;
if (fStop) {
timeout = fLinger;
leave = true;
}
auto rcvTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(timeout);
while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill)) { while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill)) {
// LOG(debug) << "received: " << block.fHandle << " " << block.fSize << " " << block.fMessageId;
const auto numBlocks = recvdSize / sizeof(RegionBlock); const auto numBlocks = recvdSize / sizeof(RegionBlock);
// LOG(debug) << "Received " << numBlocks << " blocks (recvdSize: " << recvdSize << "). (remaining queue size: " << fQueue->get_num_msg() << ").";
if (fBulkCallback) { if (fBulkCallback) {
result.clear(); result.clear();
for (size_t i = 0; i < numBlocks; i++) { for (size_t i = 0; i < numBlocks; i++) {
@@ -191,9 +193,13 @@ struct Region
} }
} }
} }
} // while !fStop
LOG(debug) << "ReceiveAcks() worker for " << fName << " leaving."; if (leave) {
break;
}
}
LOG(debug) << "AcksReceiver for " << fName << " leaving (remaining queue size: " << fQueue->get_num_msg() << ").";
} }
void ReleaseBlock(const RegionBlock& block) void ReleaseBlock(const RegionBlock& block)
@@ -203,31 +209,34 @@ struct Region
fBlocksToFree.emplace_back(block); fBlocksToFree.emplace_back(block);
if (fBlocksToFree.size() >= fAckBunchSize) { if (fBlocksToFree.size() >= fAckBunchSize) {
lock.unlock(); // reduces contention on fBlockMtx lock.unlock();
fBlockSendCV.notify_one(); fBlockSendCV.notify_one();
} }
} }
void SetLinger(uint32_t linger) { fLinger = linger; }
uint32_t GetLinger() const { return fLinger; }
~Region() ~Region()
{ {
fStop = true; fStop = true;
if (fSendAcksWorker.joinable()) { if (fAcksSender.joinable()) {
fBlockSendCV.notify_one(); fBlockSendCV.notify_one();
fSendAcksWorker.join(); fAcksSender.join();
} }
if (!fRemote) { if (!fRemote) {
if (fReceiveAcksWorker.joinable()) { if (fAcksReceiver.joinable()) {
fReceiveAcksWorker.join(); fAcksReceiver.join();
} }
if (boost::interprocess::shared_memory_object::remove(fName.c_str())) { if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
LOG(debug) << "shmem: destroyed region " << fName; LOG(debug) << "Region '" << fName << "' destroyed.";
} }
if (boost::interprocess::file_mapping::remove(fName.c_str())) { if (boost::interprocess::file_mapping::remove(fName.c_str())) {
LOG(debug) << "shmem: destroyed file mapping " << fName; LOG(debug) << "File mapping '" << fName << "' destroyed.";
} }
if (fFile) { if (fFile) {
@@ -235,16 +244,19 @@ struct Region
} }
if (boost::interprocess::message_queue::remove(fQueueName.c_str())) { if (boost::interprocess::message_queue::remove(fQueueName.c_str())) {
LOG(debug) << "shmem: removed region queue " << fQueueName; LOG(debug) << "Region queue '" << fQueueName << "' destroyed.";
} }
} else { } else {
// LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary."; // LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary.";
LOG(debug) << "shmem: region queue '" << fQueueName << "' is remote, no cleanup necessary"; LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
} }
LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
} }
bool fRemote; bool fRemote;
bool fStop; uint32_t fLinger;
std::atomic<bool> fStop;
std::string fName; std::string fName;
std::string fQueueName; std::string fQueueName;
boost::interprocess::shared_memory_object fShmemObject; boost::interprocess::shared_memory_object fShmemObject;
@@ -258,8 +270,8 @@ struct Region
const std::size_t fAckBunchSize = 256; const std::size_t fAckBunchSize = 256;
std::unique_ptr<boost::interprocess::message_queue> fQueue; std::unique_ptr<boost::interprocess::message_queue> fQueue;
std::thread fReceiveAcksWorker; std::thread fAcksReceiver;
std::thread fSendAcksWorker; std::thread fAcksSender;
RegionCallback fCallback; RegionCallback fCallback;
RegionBulkCallback fBulkCallback; RegionBulkCallback fBulkCallback;
}; };

View File

@@ -55,8 +55,7 @@ class Socket final : public fair::mq::Socket
, fBytesRx(0) , fBytesRx(0)
, fMessagesTx(0) , fMessagesTx(0)
, fMessagesRx(0) , fMessagesRx(0)
, fSndTimeout(100) , fTimeout(100)
, fRcvTimeout(100)
{ {
assert(context); assert(context);
fSocket = zmq_socket(context, GetConstant(type)); fSocket = zmq_socket(context, GetConstant(type));
@@ -77,11 +76,11 @@ class Socket final : public fair::mq::Socket
LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno);
} }
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fTimeout, sizeof(fTimeout)) != 0) {
LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno);
} }
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fTimeout, sizeof(fTimeout)) != 0) {
LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno);
} }
@@ -129,6 +128,35 @@ class Socket final : public fair::mq::Socket
return true; return true;
} }
bool ShouldRetry(int flags, int timeout, int& elapsed) const
{
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if (timeout > 0) {
elapsed += fTimeout;
if (elapsed >= timeout) {
return false;
}
}
return true;
} else {
return false;
}
}
int HandleErrors() const
{
if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId;
return -1;
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Transfer interrupted by system call";
return -1;
} else {
LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno);
return -1;
}
}
int Send(MessagePtr& msg, const int timeout = -1) override int Send(MessagePtr& msg, const int timeout = -1) override
{ {
int flags = 0; int flags = 0;
@@ -150,26 +178,13 @@ class Socket final : public fair::mq::Socket
fBytesTx += size; fBytesTx += size;
return size; return size;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN) {
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (ShouldRetry(flags, timeout, elapsed)) {
if (timeout > 0) {
elapsed += fSndTimeout;
if (elapsed >= timeout) {
return -2;
}
}
continue; continue;
} else { } else {
return -2; return -2;
} }
} else if (zmq_errno() == ETERM) { } else {
LOG(info) << "terminating socket " << fId; return HandleErrors();
return -1;
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Send interrupted by system call";
return nbytes;
}else {
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
return nbytes;
} }
} }
@@ -206,26 +221,13 @@ class Socket final : public fair::mq::Socket
++fMessagesRx; ++fMessagesRx;
return size; return size;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN) {
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (ShouldRetry(flags, timeout, elapsed)) {
if (timeout > 0) {
elapsed += fRcvTimeout;
if (elapsed >= timeout) {
return -2;
}
}
continue; continue;
} else { } else {
return -2; return -2;
} }
} else if (zmq_errno() == ETERM) { } else {
LOG(info) << "terminating socket " << fId; return HandleErrors();
return -1;
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Receive interrupted by system call";
return nbytes;
}else {
LOG(error) << "Failed receiving on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
return nbytes;
} }
} }
} }
@@ -268,26 +270,13 @@ class Socket final : public fair::mq::Socket
return totalSize; return totalSize;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN) {
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (ShouldRetry(flags, timeout, elapsed)) {
if (timeout > 0) {
elapsed += fSndTimeout;
if (elapsed >= timeout) {
return -2;
}
}
continue; continue;
} else { } else {
return -2; return -2;
} }
} else if (zmq_errno() == ETERM) { } else {
LOG(info) << "terminating socket " << fId; return HandleErrors();
return -1;
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Send interrupted by system call";
return nbytes;
}else {
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
return nbytes;
} }
} }
@@ -335,23 +324,13 @@ class Socket final : public fair::mq::Socket
return totalSize; return totalSize;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN) {
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (ShouldRetry(flags, timeout, elapsed)) {
if (timeout > 0) {
elapsed += fRcvTimeout;
if (elapsed >= timeout) {
return -2;
}
}
continue; continue;
} else { } else {
return -2; return -2;
} }
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Receive interrupted by system call";
return nbytes;
} else { } else {
LOG(error) << "Failed receiving on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; return HandleErrors();
return nbytes;
} }
} }
@@ -396,6 +375,15 @@ class Socket final : public fair::mq::Socket
} }
} }
void Events(uint32_t* events) override
{
size_t eventsSize = sizeof(uint32_t);
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
throw SocketError(
tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
}
}
int GetLinger() const override int GetLinger() const override
{ {
int value = 0; int value = 0;
@@ -505,6 +493,14 @@ class Socket final : public fair::mq::Socket
if (constant == "no-block") return ZMQ_DONTWAIT; if (constant == "no-block") return ZMQ_DONTWAIT;
if (constant == "snd-more no-block") return ZMQ_DONTWAIT|ZMQ_SNDMORE; if (constant == "snd-more no-block") return ZMQ_DONTWAIT|ZMQ_SNDMORE;
if (constant == "fd") return ZMQ_FD;
if (constant == "events")
return ZMQ_EVENTS;
if (constant == "pollin")
return ZMQ_POLLIN;
if (constant == "pollout")
return ZMQ_POLLOUT;
return -1; return -1;
} }
@@ -519,8 +515,7 @@ class Socket final : public fair::mq::Socket
std::atomic<unsigned long> fMessagesTx; std::atomic<unsigned long> fMessagesTx;
std::atomic<unsigned long> fMessagesRx; std::atomic<unsigned long> fMessagesRx;
int fSndTimeout; int fTimeout;
int fRcvTimeout;
}; };
} }

View File

@@ -57,20 +57,17 @@ class TransportFactory final : public fair::mq::TransportFactory
int numIoThreads = 1; int numIoThreads = 1;
std::string sessionName = "default"; std::string sessionName = "default";
size_t segmentSize = 2000000000; size_t segmentSize = 2ULL << 30;
bool autolaunchMonitor = false;
bool throwOnBadAlloc = true;
if (config) { if (config) {
numIoThreads = config->GetProperty<int>("io-threads", numIoThreads); numIoThreads = config->GetProperty<int>("io-threads", numIoThreads);
sessionName = config->GetProperty<std::string>("session", sessionName); sessionName = config->GetProperty<std::string>("session", sessionName);
segmentSize = config->GetProperty<size_t>("shm-segment-size", segmentSize); segmentSize = config->GetProperty<size_t>("shm-segment-size", segmentSize);
autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
throwOnBadAlloc = config->GetProperty<bool>("shm-throw-bad-alloc", throwOnBadAlloc);
} else { } else {
LOG(debug) << "ProgOptions not available! Using defaults."; LOG(debug) << "ProgOptions not available! Using defaults.";
} }
fShmId = buildShmIdFromSessionIdAndUserId(sessionName); fShmId = buildShmIdFromSessionIdAndUserId(sessionName);
LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";
try { try {
if (zmq_ctx_set(fZMQContext, ZMQ_IO_THREADS, numIoThreads) != 0) { if (zmq_ctx_set(fZMQContext, ZMQ_IO_THREADS, numIoThreads) != 0) {
@@ -82,11 +79,7 @@ class TransportFactory final : public fair::mq::TransportFactory
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
} }
if (autolaunchMonitor) { fManager = tools::make_unique<Manager>(fShmId, fDeviceId, segmentSize, config);
Manager::StartMonitor(fShmId);
}
fManager = tools::make_unique<Manager>(fShmId, fDeviceId, segmentSize, throwOnBadAlloc);
} catch (boost::interprocess::interprocess_exception& e) { } catch (boost::interprocess::interprocess_exception& e) {
LOG(error) << "Could not initialize shared memory transport: " << e.what(); LOG(error) << "Could not initialize shared memory transport: " << e.what();
throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what())); throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));

View File

@@ -57,6 +57,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
void* GetData() const override { return fRegion->get_address(); } void* GetData() const override { return fRegion->get_address(); }
size_t GetSize() const override { return fRegion->get_size(); } size_t GetSize() const override { return fRegion->get_size(); }
uint64_t GetId() const override { return fRegionId; } uint64_t GetId() const override { return fRegionId; }
void SetLinger(uint32_t linger) override { fManager.GetRegion(fRegionId)->SetLinger(linger); }
uint32_t GetLinger() const override { return fManager.GetRegion(fRegionId)->GetLinger(); }
~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); } ~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); }

View File

@@ -111,9 +111,7 @@ int main(int argc, char** argv)
} }
if (cleanup) { if (cleanup) {
cout << "Cleaning up \"" << shmId << "\"..." << endl; Monitor::CleanupFull(ShmId{shmId});
Monitor::Cleanup(shmId);
Monitor::RemoveQueue("fmq_" + shmId + "_cq");
return 0; return 0;
} }

View File

@@ -155,9 +155,17 @@ class Message final : public fair::mq::Message
void* GetData() const override void* GetData() const override
{ {
if (!fViewMsg) { if (!fViewMsg) {
return zmq_msg_data(fMsg.get()); if (zmq_msg_size(fMsg.get()) > 0) {
return zmq_msg_data(fMsg.get());
} else {
return nullptr;
}
} else { } else {
return zmq_msg_data(fViewMsg.get()); if (zmq_msg_size(fViewMsg.get()) > 0) {
return zmq_msg_data(fViewMsg.get());
} else {
return nullptr;
}
} }
} }

View File

@@ -12,13 +12,15 @@
#include <FairMQLogger.h> #include <FairMQLogger.h>
#include <FairMQMessage.h> #include <FairMQMessage.h>
#include <FairMQSocket.h> #include <FairMQSocket.h>
#include <atomic>
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <fairmq/zeromq/Context.h> #include <fairmq/zeromq/Context.h>
#include <fairmq/zeromq/Message.h> #include <fairmq/zeromq/Message.h>
#include <memory> // unique_ptr
#include <zmq.h> #include <zmq.h>
#include <atomic>
#include <memory> // unique_ptr
namespace fair { namespace fair {
namespace mq { namespace mq {
namespace zmq { namespace zmq {
@@ -35,8 +37,7 @@ class Socket final : public fair::mq::Socket
, fBytesRx(0) , fBytesRx(0)
, fMessagesTx(0) , fMessagesTx(0)
, fMessagesRx(0) , fMessagesRx(0)
, fSndTimeout(100) , fTimeout(100)
, fRcvTimeout(100)
{ {
if (fSocket == nullptr) { if (fSocket == nullptr) {
LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno);
@@ -54,11 +55,11 @@ class Socket final : public fair::mq::Socket
LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno);
} }
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fTimeout, sizeof(fTimeout)) != 0) {
LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno);
} }
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fTimeout, sizeof(fTimeout)) != 0) {
LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno);
} }
@@ -78,13 +79,12 @@ class Socket final : public fair::mq::Socket
bool Bind(const std::string& address) override bool Bind(const std::string& address) override
{ {
// LOG(info) << "bind socket " << fId << " on " << address; // LOG(debug) << "Binding socket " << fId << " on " << address;
if (zmq_bind(fSocket, address.c_str()) != 0) { if (zmq_bind(fSocket, address.c_str()) != 0) {
if (errno == EADDRINUSE) { if (errno == EADDRINUSE) {
// do not print error in this case, this is handled by FairMQDevice in case no // do not print error in this case, this is handled by FairMQDevice in case no
// connection could be established after trying a number of random ports from a // connection could be established after trying a number of random ports from a range.
// range.
return false; return false;
} }
LOG(error) << "Failed binding socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed binding socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno);
@@ -96,7 +96,7 @@ class Socket final : public fair::mq::Socket
bool Connect(const std::string& address) override bool Connect(const std::string& address) override
{ {
// LOG(info) << "connect socket " << fId << " on " << address; // LOG(debug) << "Connecting socket " << fId << " on " << address;
if (zmq_connect(fSocket, address.c_str()) != 0) { if (zmq_connect(fSocket, address.c_str()) != 0) {
LOG(error) << "Failed connecting socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed connecting socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno);
@@ -106,6 +106,35 @@ class Socket final : public fair::mq::Socket
return true; return true;
} }
bool ShouldRetry(int flags, int timeout, int& elapsed) const
{
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if (timeout > 0) {
elapsed += fTimeout;
if (elapsed >= timeout) {
return false;
}
}
return true;
} else {
return false;
}
}
int HandleErrors() const
{
if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId;
return -1;
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Transfer interrupted by system call";
return -1;
} else {
LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno);
return -1;
}
}
int Send(MessagePtr& msg, const int timeout = -1) override int Send(MessagePtr& msg, const int timeout = -1) override
{ {
int flags = 0; int flags = 0;
@@ -121,29 +150,15 @@ class Socket final : public fair::mq::Socket
if (nbytes >= 0) { if (nbytes >= 0) {
fBytesTx += nbytes; fBytesTx += nbytes;
++fMessagesTx; ++fMessagesTx;
return nbytes; return nbytes;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN) {
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (ShouldRetry(flags, timeout, elapsed)) {
if (timeout > 0) {
elapsed += fSndTimeout;
if (elapsed >= timeout) {
return -2;
}
}
continue; continue;
} else { } else {
return -2; return -2;
} }
} else if (zmq_errno() == ETERM) {
LOG(info) << "terminating socket " << fId;
return -1;
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Send interrupted by system call";
return nbytes;
} else { } else {
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); return HandleErrors();
return nbytes;
} }
} }
} }
@@ -163,26 +178,13 @@ class Socket final : public fair::mq::Socket
++fMessagesRx; ++fMessagesRx;
return nbytes; return nbytes;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN) {
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (ShouldRetry(flags, timeout, elapsed)) {
if (timeout > 0) {
elapsed += fRcvTimeout;
if (elapsed >= timeout) {
return -2;
}
}
continue; continue;
} else { } else {
return -2; return -2;
} }
} else if (zmq_errno() == ETERM) {
LOG(info) << "terminating socket " << fId;
return -1;
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Receive interrupted by system call";
return nbytes;
} else { } else {
LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); return HandleErrors();
return nbytes;
} }
} }
} }
@@ -210,32 +212,15 @@ class Socket final : public fair::mq::Socket
int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags); int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags);
if (nbytes >= 0) { if (nbytes >= 0) {
totalSize += nbytes; totalSize += nbytes;
} else { } else if (zmq_errno() == EAGAIN) {
// according to ZMQ docs, this can only occur for the first part if (ShouldRetry(flags, timeout, elapsed)) {
if (zmq_errno() == EAGAIN) { repeat = true;
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { break;
if (timeout > 0) {
elapsed += fSndTimeout;
if (elapsed >= timeout) {
return -2;
}
}
repeat = true;
break;
} else {
return -2;
}
}
if (zmq_errno() == ETERM) {
LOG(info) << "terminating socket " << fId;
return -1;
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Receive interrupted by system call";
return nbytes;
} else { } else {
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); return -2;
return nbytes;
} }
} else {
return HandleErrors();
} }
} }
@@ -243,16 +228,14 @@ class Socket final : public fair::mq::Socket
continue; continue;
} }
// store statistics on how many messages have been sent (handle all parts as a // store statistics on how many messages have been sent (handle all parts as a single message)
// single message)
++fMessagesTx; ++fMessagesTx;
fBytesTx += totalSize; fBytesTx += totalSize;
return totalSize; return totalSize;
} }
} // If there's only one part, send it as a regular message } else if (vecSize == 1) { // If there's only one part, send it as a regular message
else if (vecSize == 1) {
return Send(msgVec.back(), timeout); return Send(msgVec.back(), timeout);
} else { // if the vector is empty, something might be wrong } else { // if the vector is empty, something might be wrong
LOG(warn) << "Will not send empty vector"; LOG(warn) << "Will not send empty vector";
return -1; return -1;
} }
@@ -279,23 +262,14 @@ class Socket final : public fair::mq::Socket
msgVec.push_back(move(part)); msgVec.push_back(move(part));
totalSize += nbytes; totalSize += nbytes;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN) {
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (ShouldRetry(flags, timeout, elapsed)) {
if (timeout > 0) {
elapsed += fRcvTimeout;
if (elapsed >= timeout) {
return -2;
}
}
repeat = true; repeat = true;
break; break;
} else { } else {
return -2; return -2;
} }
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Receive interrupted by system call";
return nbytes;
} else { } else {
return nbytes; return HandleErrors();
} }
size_t moreSize = sizeof(more); size_t moreSize = sizeof(more);
@@ -306,8 +280,7 @@ class Socket final : public fair::mq::Socket
continue; continue;
} }
// store statistics on how many messages have been received (handle all parts as a // store statistics on how many messages have been received (handle all parts as a single message)
// single message)
++fMessagesRx; ++fMessagesRx;
fBytesRx += totalSize; fBytesRx += totalSize;
return totalSize; return totalSize;
@@ -345,6 +318,15 @@ class Socket final : public fair::mq::Socket
} }
} }
void Events(uint32_t* events) override
{
size_t eventsSize = sizeof(uint32_t);
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
throw SocketError(
tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
}
}
void SetLinger(const int value) override void SetLinger(const int value) override
{ {
if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) { if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) {
@@ -459,6 +441,14 @@ class Socket final : public fair::mq::Socket
if (constant == "linger") return ZMQ_LINGER; if (constant == "linger") return ZMQ_LINGER;
if (constant == "fd") return ZMQ_FD;
if (constant == "events")
return ZMQ_EVENTS;
if (constant == "pollin")
return ZMQ_POLLIN;
if (constant == "pollout")
return ZMQ_POLLOUT;
return -1; return -1;
} }
@@ -473,8 +463,7 @@ class Socket final : public fair::mq::Socket
std::atomic<unsigned long> fMessagesTx; std::atomic<unsigned long> fMessagesTx;
std::atomic<unsigned long> fMessagesRx; std::atomic<unsigned long> fMessagesRx;
int fSndTimeout; int fTimeout;
int fRcvTimeout;
}; };
} // namespace zmq } // namespace zmq

View File

@@ -55,7 +55,7 @@ class TransportFactory final : public FairMQTransportFactory
{ {
return tools::make_unique<Message>(this); return tools::make_unique<Message>(this);
} }
MessagePtr CreateMessage(Alignment alignment) override MessagePtr CreateMessage(Alignment alignment) override
{ {
return tools::make_unique<Message>(alignment, this); return tools::make_unique<Message>(alignment, this);

View File

@@ -52,6 +52,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
virtual size_t GetSize() const override { return fSize; } virtual size_t GetSize() const override { return fSize; }
uint64_t GetId() const override { return fId; } uint64_t GetId() const override { return fId; }
int64_t GetUserFlags() const { return fUserFlags; } int64_t GetUserFlags() const { return fUserFlags; }
void SetLinger(uint32_t /* linger */) override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; }
uint32_t GetLinger() const override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; return 0; }
virtual ~UnmanagedRegion() virtual ~UnmanagedRegion()
{ {

View File

@@ -91,6 +91,31 @@ void Alignment(const string& transport)
ASSERT_EQ(reinterpret_cast<uintptr_t>(msg->GetData()) % 64, 0); ASSERT_EQ(reinterpret_cast<uintptr_t>(msg->GetData()) % 64, 0);
} }
void EmptyMessage(const string& transport, const string& _address)
{
size_t session{fair::mq::tools::UuidHash()};
std::string address(fair::mq::tools::ToString(_address, "_", transport));
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQChannel push{"Push", "push", factory};
push.Bind(address);
FairMQChannel pull{"Pull", "pull", factory};
pull.Connect(address);
FairMQMessagePtr outMsg(push.NewMessage());
ASSERT_EQ(outMsg->GetData(), nullptr);
ASSERT_EQ(push.Send(outMsg), 0);
FairMQMessagePtr inMsg(pull.NewMessage());
ASSERT_EQ(pull.Receive(inMsg), 0);
ASSERT_EQ(inMsg->GetData(), nullptr);
}
TEST(Resize, zeromq) TEST(Resize, zeromq)
{ {
RunPushPullWithMsgResize("zeromq", "ipc://test_message_resize"); RunPushPullWithMsgResize("zeromq", "ipc://test_message_resize");
@@ -116,4 +141,14 @@ TEST(Alignment, shmem) // TODO: add test for ZeroMQ once it is implemented
Alignment("shmem"); Alignment("shmem");
} }
TEST(EmptyMessage, zeromq)
{
EmptyMessage("zeromq", "ipc://test_empty_message");
}
TEST(EmptyMessage, shmem)
{
EmptyMessage("shmem", "ipc://test_empty_message");
}
} // namespace } // namespace

View File

@@ -70,6 +70,9 @@ TEST(ProgOptions, SetAndGet)
set_and_get<vector<double>>(o, "_vector<double>", { 33.22, 33.23, 33.24 }); set_and_get<vector<double>>(o, "_vector<double>", { 33.22, 33.23, 33.24 });
set_and_get<vector<long double>>(o, "_vector<long double>", { 333.222, 333.223, 333.224 }); set_and_get<vector<long double>>(o, "_vector<long double>", { 333.222, 333.223, 333.224 });
set_and_get<vector<boost::filesystem::path>>(o, "_vector<boost::filesystem::path>", { boost::filesystem::path("C:\\Windows"), boost::filesystem::path("C:\\Windows\\System32") }); set_and_get<vector<boost::filesystem::path>>(o, "_vector<boost::filesystem::path>", { boost::filesystem::path("C:\\Windows"), boost::filesystem::path("C:\\Windows\\System32") });
ASSERT_THROW(o.ParseAll({"cmd", "--unregistered", "option"}, false), boost::program_options::unknown_option);
ASSERT_NO_THROW(o.ParseAll({"cmd", "--unregistered", "option"}, true));
} }
template<typename T> template<typename T>

View File

@@ -163,9 +163,9 @@ TEST(PushPull, Multipart_ST_ipc_zeromq)
RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_ST_ipc_zeromq_1", "ipc://test_Multipart_ST_ipc_zeromq_2"); RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_ST_ipc_zeromq_1", "ipc://test_Multipart_ST_ipc_zeromq_2");
} }
TEST(PushPull, Multipart_ST_ipc_shmen) TEST(PushPull, Multipart_ST_ipc_shmem)
{ {
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_ST_ipc_shmen_1", "ipc://test_Multipart_ST_ipc_shmen_2"); RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_ST_ipc_shmem_1", "ipc://test_Multipart_ST_ipc_shmem_2");
} }
TEST(PushPull, Multipart_MT_inproc_zeromq) TEST(PushPull, Multipart_MT_inproc_zeromq)

View File

@@ -139,6 +139,32 @@ TEST_F(Topology, ChangeState)
EXPECT_EQ(sdk::StateEqualsTo(currentState, sdk::DeviceState::InitializingDevice), true); EXPECT_EQ(sdk::StateEqualsTo(currentState, sdk::DeviceState::InitializingDevice), true);
} }
TEST_F(Topology, MixedState)
{
using namespace fair::mq;
sdk::Topology topo(mDDSTopo, mDDSSession);
auto result1(topo.ChangeState(sdk::TopologyTransition::InitDevice, "main/Sampler.*"));
LOG(info) << result1.first;
EXPECT_EQ(result1.first, std::error_code());
EXPECT_EQ(sdk::AggregateState(result1.second), sdk::AggregatedTopologyState::Mixed);
EXPECT_EQ(sdk::StateEqualsTo(result1.second, sdk::DeviceState::InitializingDevice), false);
auto const currentState1 = topo.GetCurrentState();
EXPECT_EQ(sdk::AggregateState(currentState1), sdk::AggregatedTopologyState::Mixed);
EXPECT_EQ(sdk::StateEqualsTo(currentState1, sdk::DeviceState::InitializingDevice), false);
auto result2(topo.ChangeState(sdk::TopologyTransition::InitDevice, "main/SinkGroup/.*"));
LOG(info) << result2.first;
EXPECT_EQ(result2.first, std::error_code());
EXPECT_EQ(sdk::AggregateState(result2.second), sdk::AggregatedTopologyState::InitializingDevice);
EXPECT_EQ(sdk::StateEqualsTo(result2.second, sdk::DeviceState::InitializingDevice), true);
auto const currentState2 = topo.GetCurrentState();
EXPECT_EQ(sdk::AggregateState(currentState2), sdk::AggregatedTopologyState::InitializingDevice);
EXPECT_EQ(sdk::StateEqualsTo(currentState2, sdk::DeviceState::InitializingDevice), true);
}
TEST_F(Topology, AsyncChangeStateConcurrent) TEST_F(Topology, AsyncChangeStateConcurrent)
{ {
using namespace fair::mq; using namespace fair::mq;
@@ -191,9 +217,9 @@ TEST_F(Topology, AsyncChangeStateCollectionView)
ASSERT_EQ(cstate.size(), 5); ASSERT_EQ(cstate.size(), 5);
for (const auto& c : cstate) { for (const auto& c : cstate) {
LOG(debug) << "\t" << c.first; LOG(debug) << "\t" << c.first;
State s; sdk::AggregatedTopologyState s;
ASSERT_NO_THROW(s = sdk::AggregateState(c.second)); ASSERT_NO_THROW(s = sdk::AggregateState(c.second));
ASSERT_EQ(s, State::InitializingDevice); ASSERT_EQ(s, static_cast<sdk::AggregatedTopologyState>(State::InitializingDevice));
LOG(debug) << "\tAggregated state: " << s; LOG(debug) << "\tAggregated state: " << s;
for (const auto& ds : c.second) { for (const auto& ds : c.second) {
LOG(debug) << "\t\t" << ds.state; LOG(debug) << "\t\t" << ds.state;
@@ -426,4 +452,26 @@ TEST_F(Topology, SetAndGetProperties)
ASSERT_EQ(topo.ChangeState(TopologyTransition::ResetDevice).first, std::error_code()); ASSERT_EQ(topo.ChangeState(TopologyTransition::ResetDevice).first, std::error_code());
} }
TEST(Topology2, AggregatedTopologyStateComparison)
{
using namespace fair::mq::sdk;
ASSERT_TRUE(DeviceState::Undefined == AggregatedTopologyState::Undefined);
ASSERT_TRUE(AggregatedTopologyState::Undefined == DeviceState::Undefined);
ASSERT_TRUE(DeviceState::Ok == AggregatedTopologyState::Ok);
ASSERT_TRUE(DeviceState::Error == AggregatedTopologyState::Error);
ASSERT_TRUE(DeviceState::Idle == AggregatedTopologyState::Idle);
ASSERT_TRUE(DeviceState::InitializingDevice == AggregatedTopologyState::InitializingDevice);
ASSERT_TRUE(DeviceState::Initialized == AggregatedTopologyState::Initialized);
ASSERT_TRUE(DeviceState::Binding == AggregatedTopologyState::Binding);
ASSERT_TRUE(DeviceState::Bound == AggregatedTopologyState::Bound);
ASSERT_TRUE(DeviceState::Connecting == AggregatedTopologyState::Connecting);
ASSERT_TRUE(DeviceState::DeviceReady == AggregatedTopologyState::DeviceReady);
ASSERT_TRUE(DeviceState::InitializingTask == AggregatedTopologyState::InitializingTask);
ASSERT_TRUE(DeviceState::Ready == AggregatedTopologyState::Ready);
ASSERT_TRUE(DeviceState::Running == AggregatedTopologyState::Running);
ASSERT_TRUE(DeviceState::ResettingTask == AggregatedTopologyState::ResettingTask);
ASSERT_TRUE(DeviceState::ResettingDevice == AggregatedTopologyState::ResettingDevice);
ASSERT_TRUE(DeviceState::Exiting == AggregatedTopologyState::Exiting);
}
} // namespace } // namespace

View File

@@ -66,6 +66,24 @@ void RunOptionsTest(const string& transport)
ASSERT_EQ(channel.GetSocket().GetRcvKernelSize(), 8000); ASSERT_EQ(channel.GetSocket().GetRcvKernelSize(), 8000);
} }
void ZeroingAndMlock(const string& transport)
{
size_t session{fair::mq::tools::UuidHash()};
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
config.SetProperty<size_t>("shm-segment-size", 16384);
config.SetProperty<bool>("shm-zero-segment", true);
config.SetProperty<bool>("shm-mlock-segment", true);
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQMessagePtr outMsg(factory->CreateMessage(10000));
char test[10000];
memset(test, 0, sizeof(test));
ASSERT_EQ(memcmp(test, outMsg->GetData(), outMsg->GetSize()), 0);
}
TEST(Options, zeromq) TEST(Options, zeromq)
{ {
RunOptionsTest("zeromq"); RunOptionsTest("zeromq");
@@ -76,4 +94,9 @@ TEST(Options, shmem)
RunOptionsTest("shmem"); RunOptionsTest("shmem");
} }
TEST(ZeroingAndMlock, shmem)
{
ZeroingAndMlock("shmem");
}
} // namespace } // namespace