mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-17 02:21:47 +00:00
Compare commits
21 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
e6f67b3658 | ||
|
091d0824d1 | ||
|
857aa84fa3 | ||
|
c85d6e079c | ||
|
4e466514d2 | ||
|
8b4056e408 | ||
|
b67b80e0ad | ||
|
2c89b24857 | ||
|
c6a6a5f21b | ||
|
9defa71622 | ||
|
ed2dcedf03 | ||
|
a3d56b9aeb | ||
|
8a2641d842 | ||
|
2ca62d06db | ||
|
87e0ca5450 | ||
|
ef5b3c782e | ||
|
f7ba3052aa | ||
|
a90dbf64de | ||
|
9724f184f4 | ||
|
057ba03776 | ||
|
6dfea32aee |
@@ -205,7 +205,7 @@ macro(set_fairmq_defaults)
|
|||||||
endif()
|
endif()
|
||||||
|
|
||||||
if( CMAKE_CXX_COMPILER_ID STREQUAL "GNU"
|
if( CMAKE_CXX_COMPILER_ID STREQUAL "GNU"
|
||||||
AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8)
|
AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 9)
|
||||||
set(FAIRMQ_HAS_STD_FILESYSTEM 0)
|
set(FAIRMQ_HAS_STD_FILESYSTEM 0)
|
||||||
else()
|
else()
|
||||||
set(FAIRMQ_HAS_STD_FILESYSTEM 1)
|
set(FAIRMQ_HAS_STD_FILESYSTEM 1)
|
||||||
|
@@ -5,12 +5,6 @@
|
|||||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
* copied verbatim in the file "LICENSE" *
|
* copied verbatim in the file "LICENSE" *
|
||||||
********************************************************************************/
|
********************************************************************************/
|
||||||
/**
|
|
||||||
* Sampler.cpp
|
|
||||||
*
|
|
||||||
* @since 2014-10-10
|
|
||||||
* @author A. Rybalchenko
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "Sampler.h"
|
#include "Sampler.h"
|
||||||
|
|
||||||
@@ -37,17 +31,17 @@ void Sampler::InitTask()
|
|||||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||||
|
|
||||||
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
|
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
|
||||||
LOG(info) << "Region event: " << info.event
|
LOG(info) << "Region event: " << info.event << ": "
|
||||||
<< ", managed: " << info.managed
|
<< (info.managed ? "managed" : "unmanaged")
|
||||||
<< ", id: " << info.id
|
<< ", id: " << info.id
|
||||||
<< ", ptr: " << info.ptr
|
<< ", ptr: " << info.ptr
|
||||||
<< ", size: " << info.size
|
<< ", size: " << info.size
|
||||||
<< ", flags: " << info.flags;
|
<< ", flags: " << info.flags;
|
||||||
});
|
});
|
||||||
|
|
||||||
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
|
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
|
||||||
0,
|
0, // ... and this sub-channel
|
||||||
10000000,
|
10000000, // region size
|
||||||
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
|
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
|
||||||
lock_guard<mutex> lock(fMtx);
|
lock_guard<mutex> lock(fMtx);
|
||||||
fNumUnackedMsgs -= blocks.size();
|
fNumUnackedMsgs -= blocks.size();
|
||||||
@@ -55,7 +49,10 @@ void Sampler::InitTask()
|
|||||||
if (fMaxIterations > 0) {
|
if (fMaxIterations > 0) {
|
||||||
LOG(info) << "Received " << blocks.size() << " acks";
|
LOG(info) << "Received " << blocks.size() << " acks";
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"", // path, if a region is backed by a file
|
||||||
|
0, // flags that are passed for region creation
|
||||||
|
fair::mq::RegionConfig{true, true} // additional config: { call mlock on the region, zero the region memory }
|
||||||
));
|
));
|
||||||
fRegion->SetLinger(fLinger);
|
fRegion->SetLinger(fLinger);
|
||||||
}
|
}
|
||||||
@@ -75,20 +72,19 @@ bool Sampler::ConditionalRun()
|
|||||||
// std::this_thread::sleep_for(std::chrono::seconds(1));
|
// std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||||
|
|
||||||
lock_guard<mutex> lock(fMtx);
|
lock_guard<mutex> lock(fMtx);
|
||||||
|
++fNumUnackedMsgs;
|
||||||
if (Send(msg, "data", 0) > 0) {
|
if (Send(msg, "data", 0) > 0) {
|
||||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||||
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
++fNumUnackedMsgs;
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Sampler::ResetTask()
|
void Sampler::ResetTask()
|
||||||
{
|
{
|
||||||
// On destruction UnmanagedRegion will try to TODO
|
|
||||||
fRegion.reset();
|
fRegion.reset();
|
||||||
{
|
{
|
||||||
lock_guard<mutex> lock(fMtx);
|
lock_guard<mutex> lock(fMtx);
|
||||||
|
@@ -30,8 +30,8 @@ void Sink::InitTask()
|
|||||||
// Get the fMaxIterations value from the command line options (via fConfig)
|
// Get the fMaxIterations value from the command line options (via fConfig)
|
||||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||||
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
|
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
|
||||||
LOG(info) << "Region event: " << info.event
|
LOG(info) << "Region event: " << info.event << ": "
|
||||||
<< ", managed: " << info.managed
|
<< (info.managed ? "managed" : "unmanaged")
|
||||||
<< ", id: " << info.id
|
<< ", id: " << info.id
|
||||||
<< ", ptr: " << info.ptr
|
<< ", ptr: " << info.ptr
|
||||||
<< ", size: " << info.size
|
<< ", size: " << info.size
|
||||||
|
@@ -2,9 +2,14 @@
|
|||||||
|
|
||||||
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
||||||
|
|
||||||
|
transport="shmem"
|
||||||
msgSize="1000000"
|
msgSize="1000000"
|
||||||
|
|
||||||
if [[ $1 =~ ^[0-9]+$ ]]; then
|
if [[ $1 =~ ^[a-z]+$ ]]; then
|
||||||
|
transport=$1
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [[ $2 =~ ^[0-9]+$ ]]; then
|
||||||
msgSize=$1
|
msgSize=$1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
@@ -13,13 +18,13 @@ SAMPLER+=" --id sampler1"
|
|||||||
SAMPLER+=" --severity debug"
|
SAMPLER+=" --severity debug"
|
||||||
SAMPLER+=" --msg-size $msgSize"
|
SAMPLER+=" --msg-size $msgSize"
|
||||||
# SAMPLER+=" --rate 10"
|
# SAMPLER+=" --rate 10"
|
||||||
SAMPLER+=" --transport shmem"
|
SAMPLER+=" --transport $transport"
|
||||||
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
|
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
|
||||||
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
|
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
|
||||||
|
|
||||||
SINK="fairmq-ex-region-sink"
|
SINK="fairmq-ex-region-sink"
|
||||||
SINK+=" --id sink1"
|
SINK+=" --id sink1"
|
||||||
SINK+=" --severity debug"
|
SINK+=" --severity debug"
|
||||||
SINK+=" --transport shmem"
|
SINK+=" --transport $transport"
|
||||||
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
|
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
|
||||||
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK &
|
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK &
|
||||||
|
@@ -31,6 +31,7 @@ SAMPLER_PID=$!
|
|||||||
SINK="fairmq-ex-region-sink"
|
SINK="fairmq-ex-region-sink"
|
||||||
SINK+=" --id sink1"
|
SINK+=" --id sink1"
|
||||||
SINK+=" --transport $transport"
|
SINK+=" --transport $transport"
|
||||||
|
SINK+=" --severity debug"
|
||||||
SINK+=" --session $SESSION"
|
SINK+=" --session $SESSION"
|
||||||
SINK+=" --verbosity veryhigh"
|
SINK+=" --verbosity veryhigh"
|
||||||
SINK+=" --control static --color false"
|
SINK+=" --control static --color false"
|
||||||
|
@@ -56,7 +56,7 @@ class FairMQSocket
|
|||||||
/// If the backend supports it, fills the unsigned integer @a events with the ZMQ_EVENTS value
|
/// If the backend supports it, fills the unsigned integer @a events with the ZMQ_EVENTS value
|
||||||
/// DISCLAIMER: this API is experimental and unsupported and might be dropped / refactored in
|
/// DISCLAIMER: this API is experimental and unsupported and might be dropped / refactored in
|
||||||
/// the future.
|
/// the future.
|
||||||
virtual void Events(uint32_t* events) = 0;
|
virtual int Events(uint32_t* events) = 0;
|
||||||
virtual void SetLinger(const int value) = 0;
|
virtual void SetLinger(const int value) = 0;
|
||||||
virtual int GetLinger() const = 0;
|
virtual int GetLinger() const = 0;
|
||||||
virtual void SetSndBufSize(const int value) = 0;
|
virtual void SetSndBufSize(const int value) = 0;
|
||||||
|
@@ -92,8 +92,8 @@ class FairMQTransportFactory
|
|||||||
/// @param path optional parameter to pass to the underlying transport
|
/// @param path optional parameter to pass to the underlying transport
|
||||||
/// @param flags optional parameter to pass to the underlying transport
|
/// @param flags optional parameter to pass to the underlying transport
|
||||||
/// @return pointer to UnmanagedRegion
|
/// @return pointer to UnmanagedRegion
|
||||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
|
||||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
|
||||||
/// @brief Create new UnmanagedRegion
|
/// @brief Create new UnmanagedRegion
|
||||||
/// @param size size of the region
|
/// @param size size of the region
|
||||||
/// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user
|
/// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user
|
||||||
@@ -101,8 +101,8 @@ class FairMQTransportFactory
|
|||||||
/// @param path optional parameter to pass to the underlying transport
|
/// @param path optional parameter to pass to the underlying transport
|
||||||
/// @param flags optional parameter to pass to the underlying transport
|
/// @param flags optional parameter to pass to the underlying transport
|
||||||
/// @return pointer to UnmanagedRegion
|
/// @return pointer to UnmanagedRegion
|
||||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
|
||||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
|
||||||
|
|
||||||
/// @brief Subscribe to region events (creation, destruction, ...)
|
/// @brief Subscribe to region events (creation, destruction, ...)
|
||||||
/// @param callback the callback that is called when a region event occurs
|
/// @param callback the callback that is called when a region event occurs
|
||||||
|
@@ -79,6 +79,7 @@ class FairMQUnmanagedRegion
|
|||||||
virtual void SetLinger(uint32_t linger) = 0;
|
virtual void SetLinger(uint32_t linger) = 0;
|
||||||
virtual uint32_t GetLinger() const = 0;
|
virtual uint32_t GetLinger() const = 0;
|
||||||
|
|
||||||
|
virtual fair::mq::Transport GetType() const = 0;
|
||||||
FairMQTransportFactory* GetTransport() { return fTransport; }
|
FairMQTransportFactory* GetTransport() { return fTransport; }
|
||||||
void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; }
|
void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; }
|
||||||
|
|
||||||
@@ -107,6 +108,19 @@ inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event
|
|||||||
namespace fair::mq
|
namespace fair::mq
|
||||||
{
|
{
|
||||||
|
|
||||||
|
struct RegionConfig {
|
||||||
|
bool lock;
|
||||||
|
bool zero;
|
||||||
|
|
||||||
|
RegionConfig()
|
||||||
|
: lock(false), zero(false)
|
||||||
|
{}
|
||||||
|
|
||||||
|
RegionConfig(bool l, bool z)
|
||||||
|
: lock(l), zero(z)
|
||||||
|
{}
|
||||||
|
};
|
||||||
|
|
||||||
using RegionCallback = FairMQRegionCallback;
|
using RegionCallback = FairMQRegionCallback;
|
||||||
using RegionBulkCallback = FairMQRegionBulkCallback;
|
using RegionBulkCallback = FairMQRegionBulkCallback;
|
||||||
using RegionEventCallback = FairMQRegionEventCallback;
|
using RegionEventCallback = FairMQRegionEventCallback;
|
||||||
|
@@ -200,7 +200,19 @@ auto fair::mq::PluginManager::LoadPluginStatic(const string& pluginName) -> void
|
|||||||
// Load symbol
|
// Load symbol
|
||||||
if (fPluginFactories.find(pluginName) == fPluginFactories.end()) {
|
if (fPluginFactories.find(pluginName) == fPluginFactories.end()) {
|
||||||
try {
|
try {
|
||||||
LoadSymbols(pluginName, dll::program_location());
|
if ("control" == pluginName) {
|
||||||
|
try {
|
||||||
|
fPluginProgOptions.insert({pluginName, plugins::ControlPluginProgramOptions().value()});
|
||||||
|
}
|
||||||
|
catch (const boost::bad_optional_access& e) { /* just ignore, if no prog options are declared */ }
|
||||||
|
} else if ("config" == pluginName) {
|
||||||
|
try {
|
||||||
|
fPluginProgOptions.insert({pluginName, plugins::ConfigPluginProgramOptions().value()});
|
||||||
|
}
|
||||||
|
catch (const boost::bad_optional_access& e) { /* just ignore, if no prog options are declared */ }
|
||||||
|
} else {
|
||||||
|
LoadSymbols(pluginName, dll::program_location());
|
||||||
|
}
|
||||||
fPluginOrder.push_back(pluginName);
|
fPluginOrder.push_back(pluginName);
|
||||||
} catch (boost::system::system_error& e) {
|
} catch (boost::system::system_error& e) {
|
||||||
throw PluginLoadError(ToString("An error occurred while loading static plugin ", pluginName, ": ", e.what()));
|
throw PluginLoadError(ToString("An error occurred while loading static plugin ", pluginName, ": ", e.what()));
|
||||||
@@ -211,7 +223,13 @@ auto fair::mq::PluginManager::LoadPluginStatic(const string& pluginName) -> void
|
|||||||
auto fair::mq::PluginManager::InstantiatePlugin(const string& pluginName) -> void
|
auto fair::mq::PluginManager::InstantiatePlugin(const string& pluginName) -> void
|
||||||
{
|
{
|
||||||
if (fPlugins.find(pluginName) == fPlugins.end()) {
|
if (fPlugins.find(pluginName) == fPlugins.end()) {
|
||||||
fPlugins[pluginName] = fPluginFactories[pluginName](*fPluginServices);
|
if ("control" == pluginName) {
|
||||||
|
fPlugins[pluginName] = plugins::Make_control_Plugin(fPluginServices.get());
|
||||||
|
} else if ("config" == pluginName) {
|
||||||
|
fPlugins[pluginName] = plugins::Make_config_Plugin(fPluginServices.get());
|
||||||
|
} else {
|
||||||
|
fPlugins[pluginName] = fPluginFactories[pluginName](*fPluginServices);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -12,6 +12,7 @@
|
|||||||
#include <fairmq/tools/Strings.h>
|
#include <fairmq/tools/Strings.h>
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <ostream>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
@@ -60,6 +61,11 @@ try {
|
|||||||
throw TransportError(tools::ToString("Unknown transport provided: ", transport));
|
throw TransportError(tools::ToString("Unknown transport provided: ", transport));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline std::ostream& operator<<(std::ostream& os, const Transport& transport)
|
||||||
|
{
|
||||||
|
return os << TransportName(transport);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace fair::mq
|
} // namespace fair::mq
|
||||||
|
|
||||||
#endif /* FAIR_MQ_TRANSPORTS_H */
|
#endif /* FAIR_MQ_TRANSPORTS_H */
|
||||||
|
@@ -41,7 +41,7 @@ class Socket final : public fair::mq::Socket
|
|||||||
|
|
||||||
auto GetId() const -> std::string override { return fId; }
|
auto GetId() const -> std::string override { return fId; }
|
||||||
|
|
||||||
auto Events(uint32_t *events) -> void override { *events = 0; }
|
auto Events(uint32_t *events) -> int override { *events = 0; return -1; }
|
||||||
auto Bind(const std::string& address) -> bool override;
|
auto Bind(const std::string& address) -> bool override;
|
||||||
auto Connect(const std::string& address) -> bool override;
|
auto Connect(const std::string& address) -> bool override;
|
||||||
|
|
||||||
|
@@ -92,22 +92,22 @@ auto TransportFactory::CreatePoller(const unordered_map<string, vector<FairMQCha
|
|||||||
// return PollerPtr{new Poller(channelsMap, channelList)};
|
// return PollerPtr{new Poller(channelsMap, channelList)};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
|
||||||
{
|
{
|
||||||
throw runtime_error{"Not yet implemented UMR."};
|
throw runtime_error{"Not yet implemented UMR."};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
|
||||||
{
|
{
|
||||||
throw runtime_error{"Not yet implemented UMR."};
|
throw runtime_error{"Not yet implemented UMR."};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
|
||||||
{
|
{
|
||||||
throw runtime_error{"Not yet implemented UMR."};
|
throw runtime_error{"Not yet implemented UMR."};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
|
||||||
{
|
{
|
||||||
throw runtime_error{"Not yet implemented UMR."};
|
throw runtime_error{"Not yet implemented UMR."};
|
||||||
}
|
}
|
||||||
|
@@ -44,10 +44,10 @@ class TransportFactory final : public FairMQTransportFactory
|
|||||||
auto CreatePoller(const std::vector<FairMQChannel*>& channels) const -> PollerPtr override;
|
auto CreatePoller(const std::vector<FairMQChannel*>& channels) const -> PollerPtr override;
|
||||||
auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override;
|
auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override;
|
||||||
|
|
||||||
auto CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override;
|
auto CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override;
|
||||||
auto CreateUnmanagedRegion(const size_t size, RegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override;
|
auto CreateUnmanagedRegion(const size_t size, RegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override;
|
||||||
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override;
|
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override;
|
||||||
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override;
|
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override;
|
||||||
|
|
||||||
void SubscribeToRegionEvents(RegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for OFI"; }
|
void SubscribeToRegionEvents(RegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for OFI"; }
|
||||||
bool SubscribedToRegionEvents() override { LOG(error) << "Region event subscriptions not yet implemented for OFI"; return false; }
|
bool SubscribedToRegionEvents() override { LOG(error) << "Region event subscriptions not yet implemented for OFI"; return false; }
|
||||||
|
@@ -65,7 +65,10 @@ struct DDSEnvironment::Impl
|
|||||||
" mkdir -p \"$HOME/.DDS\"\n"
|
" mkdir -p \"$HOME/.DDS\"\n"
|
||||||
" dds-user-defaults --ignore-default-sid -d -c \"$HOME/.DDS/DDS.cfg\"\n"
|
" dds-user-defaults --ignore-default-sid -d -c \"$HOME/.DDS/DDS.cfg\"\n"
|
||||||
"fi\n";
|
"fi\n";
|
||||||
std::system(cmd.str().c_str());
|
auto rc(std::system(cmd.str().c_str()));
|
||||||
|
if (rc != 0) {
|
||||||
|
LOG(warn) << "DDSEnvironment::SetupConfigHome failed";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto SetupPath() -> void
|
auto SetupPath() -> void
|
||||||
|
@@ -44,6 +44,7 @@
|
|||||||
#include <thread>
|
#include <thread>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <utility> // pair
|
#include <utility> // pair
|
||||||
|
#include <tuple>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#include <unistd.h> // getuid
|
#include <unistd.h> // getuid
|
||||||
@@ -58,14 +59,15 @@ class Manager
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
Manager(const std::string& sessionName, std::string deviceId, size_t size, const ProgOptions* config)
|
Manager(const std::string& sessionName, std::string deviceId, size_t size, const ProgOptions* config)
|
||||||
: fShmId(makeShmIdStr(sessionName))
|
: fShmId64(makeShmIdUint64(sessionName))
|
||||||
|
, fShmId(makeShmIdStr(sessionName))
|
||||||
, fSegmentId(config ? config->GetProperty<uint16_t>("shm-segment-id", 0) : 0)
|
, fSegmentId(config ? config->GetProperty<uint16_t>("shm-segment-id", 0) : 0)
|
||||||
, fDeviceId(std::move(deviceId))
|
, fDeviceId(std::move(deviceId))
|
||||||
, fSegments()
|
, fSegments()
|
||||||
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
|
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
|
||||||
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
|
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
|
||||||
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
|
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
|
||||||
, fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
|
, fRegionEventsShmCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
|
||||||
, fRegionEventsSubscriptionActive(false)
|
, fRegionEventsSubscriptionActive(false)
|
||||||
, fNumObservedEvents(0)
|
, fNumObservedEvents(0)
|
||||||
, fDeviceCounter(nullptr)
|
, fDeviceCounter(nullptr)
|
||||||
@@ -73,10 +75,11 @@ class Manager
|
|||||||
, fShmSegments(nullptr)
|
, fShmSegments(nullptr)
|
||||||
, fShmRegions(nullptr)
|
, fShmRegions(nullptr)
|
||||||
, fInterrupted(false)
|
, fInterrupted(false)
|
||||||
, fMsgCounter(0)
|
|
||||||
#ifdef FAIRMQ_DEBUG_MODE
|
#ifdef FAIRMQ_DEBUG_MODE
|
||||||
, fMsgDebug(nullptr)
|
, fMsgDebug(nullptr)
|
||||||
, fShmMsgCounters(nullptr)
|
, fShmMsgCounters(nullptr)
|
||||||
|
, fMsgCounterNew(0)
|
||||||
|
, fMsgCounterDelete(0)
|
||||||
#endif
|
#endif
|
||||||
, fHeartbeatThread()
|
, fHeartbeatThread()
|
||||||
, fSendHeartbeats(true)
|
, fSendHeartbeats(true)
|
||||||
@@ -235,7 +238,7 @@ class Manager
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!p.empty()) {
|
if (!p.empty()) {
|
||||||
boost::process::spawn(p, "-x", "-m", "--shmid", id, "-d", "-t", "2000", verbose ? "--verbose" : "", env);
|
boost::process::spawn(p, "-x", "-m", "--shmid", id, "-d", "-t", "2000", (verbose ? "--verbose" : ""), env);
|
||||||
int numTries = 0;
|
int numTries = 0;
|
||||||
do {
|
do {
|
||||||
try {
|
try {
|
||||||
@@ -260,10 +263,13 @@ class Manager
|
|||||||
void Resume() { fInterrupted.store(false); }
|
void Resume() { fInterrupted.store(false); }
|
||||||
void Reset()
|
void Reset()
|
||||||
{
|
{
|
||||||
if (fMsgCounter.load() != 0) {
|
#ifdef FAIRMQ_DEBUG_MODE
|
||||||
LOG(error) << "Message counter during Reset expected to be 0, found: " << fMsgCounter.load();
|
auto diff = fMsgCounterNew.load() - fMsgCounterDelete.load();
|
||||||
throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", fMsgCounter.load()));
|
if (diff != 0) {
|
||||||
|
LOG(error) << "Message counter during Reset expected to be 0, found: " << diff;
|
||||||
|
throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", diff));
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
bool Interrupted() { return fInterrupted.load(); }
|
bool Interrupted() { return fInterrupted.load(); }
|
||||||
|
|
||||||
@@ -271,8 +277,9 @@ class Manager
|
|||||||
const int64_t userFlags,
|
const int64_t userFlags,
|
||||||
RegionCallback callback,
|
RegionCallback callback,
|
||||||
RegionBulkCallback bulkCallback,
|
RegionBulkCallback bulkCallback,
|
||||||
const std::string& path = "",
|
const std::string& path,
|
||||||
int flags = 0)
|
int flags,
|
||||||
|
fair::mq::RegionConfig cfg)
|
||||||
{
|
{
|
||||||
using namespace boost::interprocess;
|
using namespace boost::interprocess;
|
||||||
try {
|
try {
|
||||||
@@ -302,22 +309,35 @@ class Manager
|
|||||||
return {nullptr, id};
|
return {nullptr, id};
|
||||||
}
|
}
|
||||||
|
|
||||||
// create region info
|
|
||||||
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
|
|
||||||
|
|
||||||
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags));
|
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags));
|
||||||
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
|
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
|
||||||
|
|
||||||
|
if (cfg.lock) {
|
||||||
|
LOG(debug) << "Locking region " << id << "...";
|
||||||
|
if (mlock(r.first->second->fRegion.get_address(), r.first->second->fRegion.get_size()) == -1) {
|
||||||
|
LOG(error) << "Could not lock region " << id << ". Code: " << errno << ", reason: " << strerror(errno);
|
||||||
|
}
|
||||||
|
LOG(debug) << "Successfully locked region " << id << ".";
|
||||||
|
}
|
||||||
|
if (cfg.zero) {
|
||||||
|
LOG(debug) << "Zeroing free memory of region " << id << "...";
|
||||||
|
memset(r.first->second->fRegion.get_address(), 0x00, r.first->second->fRegion.get_size());
|
||||||
|
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
|
||||||
|
}
|
||||||
|
|
||||||
|
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
|
||||||
|
|
||||||
|
r.first->second->InitializeQueues();
|
||||||
r.first->second->StartReceivingAcks();
|
r.first->second->StartReceivingAcks();
|
||||||
result.first = &(r.first->second->fRegion);
|
result.first = &(r.first->second->fRegion);
|
||||||
result.second = id;
|
result.second = id;
|
||||||
|
|
||||||
(fEventCounter->fCount)++;
|
(fEventCounter->fCount)++;
|
||||||
}
|
}
|
||||||
fRegionEventsCV.notify_all();
|
fRegionsGen += 1; // signal TL cache invalidation
|
||||||
|
fRegionEventsShmCV.notify_all();
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
|
||||||
} catch (interprocess_exception& e) {
|
} catch (interprocess_exception& e) {
|
||||||
LOG(error) << "cannot create region. Already created/not cleaned up?";
|
LOG(error) << "cannot create region. Already created/not cleaned up?";
|
||||||
LOG(error) << e.what();
|
LOG(error) << e.what();
|
||||||
@@ -327,8 +347,28 @@ class Manager
|
|||||||
|
|
||||||
Region* GetRegion(const uint16_t id)
|
Region* GetRegion(const uint16_t id)
|
||||||
{
|
{
|
||||||
|
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
|
||||||
|
const auto &lTlCache = fTlRegionCache;
|
||||||
|
const auto &lTlCacheVec = lTlCache.fRegionsTLCache;
|
||||||
|
const auto lTlCacheGen = lTlCache.fRegionsTLCacheGen;
|
||||||
|
|
||||||
|
// fast path
|
||||||
|
for (const auto &lRegion : lTlCacheVec) {
|
||||||
|
if ((std::get<1>(lRegion) == id) && (lTlCacheGen == fRegionsGen) && (std::get<2>(lRegion) == fShmId64)) {
|
||||||
|
return std::get<0>(lRegion);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||||
return GetRegionUnsafe(id);
|
// slow path: check invalidation
|
||||||
|
if (lTlCacheGen != fRegionsGen) {
|
||||||
|
fTlRegionCache.fRegionsTLCache.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
auto *lRegion = GetRegionUnsafe(id);
|
||||||
|
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
|
||||||
|
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
|
||||||
|
return lRegion;
|
||||||
}
|
}
|
||||||
|
|
||||||
Region* GetRegionUnsafe(const uint16_t id)
|
Region* GetRegionUnsafe(const uint16_t id)
|
||||||
@@ -352,7 +392,7 @@ class Manager
|
|||||||
LOG(error) << oor.what();
|
LOG(error) << oor.what();
|
||||||
return nullptr;
|
return nullptr;
|
||||||
} catch (boost::interprocess::interprocess_exception& e) {
|
} catch (boost::interprocess::interprocess_exception& e) {
|
||||||
LOG(warn) << "Could not get remote region for id '" << id << "'";
|
LOG(error) << "Could not get remote region for id '" << id << "': " << e.what();
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -360,13 +400,19 @@ class Manager
|
|||||||
|
|
||||||
void RemoveRegion(const uint16_t id)
|
void RemoveRegion(const uint16_t id)
|
||||||
{
|
{
|
||||||
fRegions.erase(id);
|
try {
|
||||||
{
|
fRegions.at(id)->StopAcks();
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
{
|
||||||
fShmRegions->at(id).fDestroyed = true;
|
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||||
(fEventCounter->fCount)++;
|
fShmRegions->at(id).fDestroyed = true;
|
||||||
|
fRegions.erase(id);
|
||||||
|
(fEventCounter->fCount)++;
|
||||||
|
}
|
||||||
|
fRegionEventsShmCV.notify_all();
|
||||||
|
} catch(std::out_of_range& oor) {
|
||||||
|
LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'";
|
||||||
}
|
}
|
||||||
fRegionEventsCV.notify_all();
|
fRegionsGen += 1; // signal TL cache invalidation
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<fair::mq::RegionInfo> GetRegionInfo()
|
std::vector<fair::mq::RegionInfo> GetRegionInfo()
|
||||||
@@ -387,8 +433,12 @@ class Manager
|
|||||||
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
|
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
|
||||||
if (!e.second.fDestroyed) {
|
if (!e.second.fDestroyed) {
|
||||||
auto region = GetRegionUnsafe(info.id);
|
auto region = GetRegionUnsafe(info.id);
|
||||||
info.ptr = region->fRegion.get_address();
|
if (region) {
|
||||||
info.size = region->fRegion.get_size();
|
info.ptr = region->fRegion.get_address();
|
||||||
|
info.size = region->fRegion.get_size();
|
||||||
|
} else {
|
||||||
|
throw std::runtime_error(tools::ToString("GetRegionInfoUnsafe() could not get region with id '", info.id, "'"));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
info.ptr = nullptr;
|
info.ptr = nullptr;
|
||||||
info.size = 0;
|
info.size = 0;
|
||||||
@@ -423,7 +473,7 @@ class Manager
|
|||||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||||
fRegionEventsSubscriptionActive = false;
|
fRegionEventsSubscriptionActive = false;
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
fRegionEventsCV.notify_all();
|
fRegionEventsShmCV.notify_all();
|
||||||
fRegionEventThread.join();
|
fRegionEventThread.join();
|
||||||
}
|
}
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||||
@@ -440,7 +490,7 @@ class Manager
|
|||||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||||
fRegionEventsSubscriptionActive = false;
|
fRegionEventsSubscriptionActive = false;
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
fRegionEventsCV.notify_all();
|
fRegionEventsShmCV.notify_all();
|
||||||
fRegionEventThread.join();
|
fRegionEventThread.join();
|
||||||
lock.lock();
|
lock.lock();
|
||||||
fRegionEventCallback = nullptr;
|
fRegionEventCallback = nullptr;
|
||||||
@@ -454,36 +504,49 @@ class Manager
|
|||||||
auto infos = GetRegionInfoUnsafe();
|
auto infos = GetRegionInfoUnsafe();
|
||||||
for (const auto& i : infos) {
|
for (const auto& i : infos) {
|
||||||
auto el = fObservedRegionEvents.find({i.id, i.managed});
|
auto el = fObservedRegionEvents.find({i.id, i.managed});
|
||||||
if (el == fObservedRegionEvents.end()) {
|
if (el == fObservedRegionEvents.end()) { // if event id has not been observed
|
||||||
fRegionEventCallback(i);
|
|
||||||
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
|
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
|
||||||
++fNumObservedEvents;
|
// if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created'
|
||||||
} else {
|
// TODO: do we care to show 'created' events if we know region is already destroyed?
|
||||||
|
if (i.event == RegionEvent::created) {
|
||||||
|
fRegionEventCallback(i);
|
||||||
|
++fNumObservedEvents;
|
||||||
|
} else {
|
||||||
|
fNumObservedEvents += 2;
|
||||||
|
}
|
||||||
|
} else { // if event id has been observed (expected - there are two events per id - created & destroyed)
|
||||||
|
// fire a callback if we have observed 'created' event and incoming is 'destroyed'
|
||||||
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
|
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
|
||||||
fRegionEventCallback(i);
|
fRegionEventCallback(i);
|
||||||
el->second = i.event;
|
el->second = i.event;
|
||||||
++fNumObservedEvents;
|
++fNumObservedEvents;
|
||||||
} else {
|
} else {
|
||||||
// LOG(debug) << "ignoring event for id" << i.id << ":";
|
// LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second;
|
||||||
// LOG(debug) << "incoming event: " << i.event;
|
|
||||||
// LOG(debug) << "stored event: " << el->second;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fRegionEventsCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
|
fRegionEventsShmCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void IncrementMsgCounter() { fMsgCounter.fetch_add(1, std::memory_order_relaxed); }
|
void IncrementMsgCounter()
|
||||||
void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); }
|
{
|
||||||
|
#ifdef FAIRMQ_DEBUG_MODE
|
||||||
|
fMsgCounterNew.fetch_add(1, std::memory_order_relaxed);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
void DecrementMsgCounter()
|
||||||
|
{
|
||||||
|
#ifdef FAIRMQ_DEBUG_MODE
|
||||||
|
fMsgCounterDelete.fetch_add(1, std::memory_order_relaxed);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef FAIRMQ_DEBUG_MODE
|
#ifdef FAIRMQ_DEBUG_MODE
|
||||||
void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); }
|
void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); }
|
||||||
void DecrementShmMsgCounter(uint16_t segmentId) { --((*fShmMsgCounters)[segmentId].fCount); }
|
void DecrementShmMsgCounter(uint16_t segmentId) { --((*fShmMsgCounters)[segmentId].fCount); }
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
boost::interprocess::named_mutex& GetMtx() { return fShmMtx; }
|
|
||||||
|
|
||||||
void SendHeartbeats()
|
void SendHeartbeats()
|
||||||
{
|
{
|
||||||
std::string controlQueueName("fmq_" + fShmId + "_cq");
|
std::string controlQueueName("fmq_" + fShmId + "_cq");
|
||||||
@@ -511,7 +574,7 @@ class Manager
|
|||||||
auto it = fSegments.find(id);
|
auto it = fSegments.find(id);
|
||||||
if (it == fSegments.end()) {
|
if (it == fSegments.end()) {
|
||||||
try {
|
try {
|
||||||
// get region info
|
// get segment info
|
||||||
SegmentInfo segmentInfo = fShmSegments->at(id);
|
SegmentInfo segmentInfo = fShmSegments->at(id);
|
||||||
LOG(debug) << "Located segment with id '" << id << "'";
|
LOG(debug) << "Located segment with id '" << id << "'";
|
||||||
|
|
||||||
@@ -613,6 +676,7 @@ class Manager
|
|||||||
using namespace boost::interprocess;
|
using namespace boost::interprocess;
|
||||||
bool lastRemoved = false;
|
bool lastRemoved = false;
|
||||||
|
|
||||||
|
fRegionsGen += 1; // signal TL cache invalidation
|
||||||
UnsubscribeFromRegionEvents();
|
UnsubscribeFromRegionEvents();
|
||||||
|
|
||||||
{
|
{
|
||||||
@@ -645,6 +709,7 @@ class Manager
|
|||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
uint64_t fShmId64;
|
||||||
std::string fShmId;
|
std::string fShmId;
|
||||||
uint16_t fSegmentId;
|
uint16_t fSegmentId;
|
||||||
std::string fDeviceId;
|
std::string fDeviceId;
|
||||||
@@ -653,11 +718,11 @@ class Manager
|
|||||||
VoidAlloc fShmVoidAlloc;
|
VoidAlloc fShmVoidAlloc;
|
||||||
boost::interprocess::named_mutex fShmMtx;
|
boost::interprocess::named_mutex fShmMtx;
|
||||||
|
|
||||||
boost::interprocess::named_condition fRegionEventsCV;
|
boost::interprocess::named_condition fRegionEventsShmCV;
|
||||||
std::thread fRegionEventThread;
|
std::thread fRegionEventThread;
|
||||||
bool fRegionEventsSubscriptionActive;
|
bool fRegionEventsSubscriptionActive;
|
||||||
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
|
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
|
||||||
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents;
|
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; // pair: <region id, managed>
|
||||||
uint64_t fNumObservedEvents;
|
uint64_t fNumObservedEvents;
|
||||||
|
|
||||||
DeviceCounter* fDeviceCounter;
|
DeviceCounter* fDeviceCounter;
|
||||||
@@ -665,12 +730,20 @@ class Manager
|
|||||||
Uint16SegmentInfoHashMap* fShmSegments;
|
Uint16SegmentInfoHashMap* fShmSegments;
|
||||||
Uint16RegionInfoHashMap* fShmRegions;
|
Uint16RegionInfoHashMap* fShmRegions;
|
||||||
std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions;
|
std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions;
|
||||||
|
// make sure this is alone in the cache line: mostly read
|
||||||
|
alignas(128) inline static std::atomic<unsigned long> fRegionsGen = 0ul;
|
||||||
|
inline static thread_local struct ManagerTLCache {
|
||||||
|
unsigned long fRegionsTLCacheGen;
|
||||||
|
std::vector<std::tuple<Region*, uint16_t, uint64_t>> fRegionsTLCache;
|
||||||
|
} fTlRegionCache;
|
||||||
|
|
||||||
std::atomic<bool> fInterrupted;
|
std::atomic<bool> fInterrupted;
|
||||||
std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter
|
|
||||||
#ifdef FAIRMQ_DEBUG_MODE
|
#ifdef FAIRMQ_DEBUG_MODE
|
||||||
|
// make sure the counters are not thrashing the cache line between threads doing creation and deallocation
|
||||||
Uint16MsgDebugMapHashMap* fMsgDebug;
|
Uint16MsgDebugMapHashMap* fMsgDebug;
|
||||||
Uint16MsgCounterHashMap* fShmMsgCounters;
|
Uint16MsgCounterHashMap* fShmMsgCounters;
|
||||||
|
alignas(128) std::atomic_uint64_t fMsgCounterNew;
|
||||||
|
alignas(128) std::atomic_uint64_t fMsgCounterDelete;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
std::thread fHeartbeatThread;
|
std::thread fHeartbeatThread;
|
||||||
@@ -680,6 +753,8 @@ class Manager
|
|||||||
|
|
||||||
bool fThrowOnBadAlloc;
|
bool fThrowOnBadAlloc;
|
||||||
bool fNoCleanup;
|
bool fNoCleanup;
|
||||||
|
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace fair::mq::shmem
|
} // namespace fair::mq::shmem
|
||||||
|
@@ -109,12 +109,17 @@ class Message final : public fair::mq::Message
|
|||||||
, fRegionPtr(nullptr)
|
, fRegionPtr(nullptr)
|
||||||
, fLocalPtr(static_cast<char*>(data))
|
, fLocalPtr(static_cast<char*>(data))
|
||||||
{
|
{
|
||||||
|
if (region->GetType() != GetType()) {
|
||||||
|
LOG(error) << "region type (" << region->GetType() << ") does not match message type (" << GetType() << ")";
|
||||||
|
throw TransportError(tools::ToString("region type (", region->GetType(), ") does not match message type (", GetType(), ")"));
|
||||||
|
}
|
||||||
|
|
||||||
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) &&
|
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) &&
|
||||||
reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) {
|
reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) {
|
||||||
fMeta.fHandle = (boost::interprocess::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
|
fMeta.fHandle = (boost::interprocess::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
|
||||||
} else {
|
} else {
|
||||||
LOG(error) << "trying to create region message with data from outside the region";
|
LOG(error) << "trying to create region message with data from outside the region";
|
||||||
throw std::runtime_error("trying to create region message with data from outside the region");
|
throw TransportError("trying to create region message with data from outside the region");
|
||||||
}
|
}
|
||||||
fManager.IncrementMsgCounter();
|
fManager.IncrementMsgCounter();
|
||||||
}
|
}
|
||||||
@@ -304,6 +309,8 @@ class Message final : public fair::mq::Message
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (fRegionPtr) {
|
if (fRegionPtr) {
|
||||||
|
fRegionPtr->InitializeQueues();
|
||||||
|
fRegionPtr->StartSendingAcks();
|
||||||
fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint});
|
fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint});
|
||||||
} else {
|
} else {
|
||||||
LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack";
|
LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack";
|
||||||
@@ -319,7 +326,7 @@ class Message final : public fair::mq::Message
|
|||||||
Deallocate();
|
Deallocate();
|
||||||
fAlignment = 0;
|
fAlignment = 0;
|
||||||
|
|
||||||
fManager.DecrementMsgCounter(); // TODO: put this to debug mode
|
fManager.DecrementMsgCounter();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@@ -497,7 +497,7 @@ std::pair<std::string, bool> RunRemoval(std::function<bool(const std::string&)>
|
|||||||
return {name, true};
|
return {name, true};
|
||||||
} else {
|
} else {
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
LOG(info) << "Did not remove '" << name << "'. Already removed?";
|
LOG(debug) << "Did not remove '" << name << "'. Already removed?";
|
||||||
}
|
}
|
||||||
return {name, false};
|
return {name, false};
|
||||||
}
|
}
|
||||||
@@ -525,7 +525,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
|
|||||||
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
|
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
|
||||||
if (rc) {
|
if (rc) {
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
LOG(info) << "Region counter found: " << rc->fCount;
|
LOG(debug) << "Region counter found: " << rc->fCount;
|
||||||
}
|
}
|
||||||
uint16_t regionCount = rc->fCount;
|
uint16_t regionCount = rc->fCount;
|
||||||
|
|
||||||
|
@@ -1,4 +1,4 @@
|
|||||||
# Shared Memory transport
|
## Shared Memory transport
|
||||||
|
|
||||||
Shared memory transport for FairMQ. To try with existing devices, run the devices with `--transport shmem` option or configure channel transport in JSON (see examples/MQ/multiple-transports).
|
Shared memory transport for FairMQ. To try with existing devices, run the devices with `--transport shmem` option or configure channel transport in JSON (see examples/MQ/multiple-transports).
|
||||||
|
|
||||||
@@ -6,7 +6,7 @@ The transport manages shared memory via boost::interprocess library. The transfe
|
|||||||
|
|
||||||
Devices track and cleanup shared memory on shutdown. For more information on the current shared memory segment and additional cleanup options, see following section.
|
Devices track and cleanup shared memory on shutdown. For more information on the current shared memory segment and additional cleanup options, see following section.
|
||||||
|
|
||||||
# Shared Memory objects / files
|
## Shared Memory objects / files
|
||||||
|
|
||||||
FairMQ Shared Memory currently uses the following names to register shared memory on the system:
|
FairMQ Shared Memory currently uses the following names to register shared memory on the system:
|
||||||
|
|
||||||
@@ -53,3 +53,7 @@ Additional cmd options:
|
|||||||
For full option details, run with `-h`.
|
For full option details, run with `-h`.
|
||||||
|
|
||||||
The Monitor class can also be used independently from the supplied executable, allowing integration on any level.
|
The Monitor class can also be used independently from the supplied executable, allowing integration on any level.
|
||||||
|
|
||||||
|
## Troubleshooting
|
||||||
|
|
||||||
|
Bus Error (SIGBUS) can occur if the transport tries to access shared memory that is not accessible. One reason could be because the used memory in the segment exceeds the capacity or available memory of the shmem filesystem (capacity is by default set to half of RAM on Linux).
|
||||||
|
@@ -47,7 +47,7 @@ struct Region
|
|||||||
Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
|
Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
|
||||||
: fRemote(remote)
|
: fRemote(remote)
|
||||||
, fLinger(100)
|
, fLinger(100)
|
||||||
, fStop(false)
|
, fStopAcks(false)
|
||||||
, fName("fmq_" + shmId + "_rg_" + std::to_string(id))
|
, fName("fmq_" + shmId + "_rg_" + std::to_string(id))
|
||||||
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id))
|
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id))
|
||||||
, fShmemObject()
|
, fShmemObject()
|
||||||
@@ -85,18 +85,26 @@ struct Region
|
|||||||
LOG(debug) << "shmem: initialized file: " << fName;
|
LOG(debug) << "shmem: initialized file: " << fName;
|
||||||
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, flags);
|
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, flags);
|
||||||
} else {
|
} else {
|
||||||
if (fRemote) {
|
try {
|
||||||
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
|
if (fRemote) {
|
||||||
} else {
|
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
|
||||||
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
|
} else {
|
||||||
fShmemObject.truncate(size);
|
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
|
||||||
|
fShmemObject.truncate(size);
|
||||||
|
}
|
||||||
|
} catch(interprocess_exception& e) {
|
||||||
|
LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << id << "': " << e.what();
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
|
||||||
|
} catch(interprocess_exception& e) {
|
||||||
|
LOG(error) << "Failed mapping shared_memory_object for region id '" << id << "': " << e.what();
|
||||||
|
throw;
|
||||||
}
|
}
|
||||||
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
InitializeQueues();
|
LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")";
|
||||||
StartSendingAcks();
|
|
||||||
LOG(debug) << "shmem: initialized region: " << fName;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Region() = delete;
|
Region() = delete;
|
||||||
@@ -108,15 +116,22 @@ struct Region
|
|||||||
{
|
{
|
||||||
using namespace boost::interprocess;
|
using namespace boost::interprocess;
|
||||||
|
|
||||||
if (fRemote) {
|
if (fQueue == nullptr) {
|
||||||
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
|
if (fRemote) {
|
||||||
} else {
|
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
|
||||||
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
|
} else {
|
||||||
|
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
|
||||||
|
}
|
||||||
|
LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")";
|
||||||
}
|
}
|
||||||
LOG(debug) << "shmem: initialized region queue: " << fQueueName;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); }
|
void StartSendingAcks()
|
||||||
|
{
|
||||||
|
if (!fAcksSender.joinable()) {
|
||||||
|
fAcksSender = std::thread(&Region::SendAcks, this);
|
||||||
|
}
|
||||||
|
}
|
||||||
void SendAcks()
|
void SendAcks()
|
||||||
{
|
{
|
||||||
std::unique_ptr<RegionBlock[]> blocks = std::make_unique<RegionBlock[]>(fAckBunchSize);
|
std::unique_ptr<RegionBlock[]> blocks = std::make_unique<RegionBlock[]>(fAckBunchSize);
|
||||||
@@ -140,13 +155,13 @@ struct Region
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (blocksToSend > 0) {
|
if (blocksToSend > 0) {
|
||||||
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) {
|
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStopAcks) {
|
||||||
// receiver slow? yield and try again...
|
// receiver slow? yield and try again...
|
||||||
std::this_thread::yield();
|
std::this_thread::yield();
|
||||||
}
|
}
|
||||||
// LOG(debug) << "Sent " << blocksToSend << " blocks.";
|
// LOG(debug) << "Sent " << blocksToSend << " blocks.";
|
||||||
} else { // blocksToSend == 0
|
} else { // blocksToSend == 0
|
||||||
if (fStop) {
|
if (fStopAcks) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -156,7 +171,12 @@ struct Region
|
|||||||
<< " blocks left to send: " << blocksToSend << ").";
|
<< " blocks left to send: " << blocksToSend << ").";
|
||||||
}
|
}
|
||||||
|
|
||||||
void StartReceivingAcks() { fAcksReceiver = std::thread(&Region::ReceiveAcks, this); }
|
void StartReceivingAcks()
|
||||||
|
{
|
||||||
|
if (!fAcksReceiver.joinable()) {
|
||||||
|
fAcksReceiver = std::thread(&Region::ReceiveAcks, this);
|
||||||
|
}
|
||||||
|
}
|
||||||
void ReceiveAcks()
|
void ReceiveAcks()
|
||||||
{
|
{
|
||||||
unsigned int priority;
|
unsigned int priority;
|
||||||
@@ -168,7 +188,7 @@ struct Region
|
|||||||
while (true) {
|
while (true) {
|
||||||
uint32_t timeout = 100;
|
uint32_t timeout = 100;
|
||||||
bool leave = false;
|
bool leave = false;
|
||||||
if (fStop) {
|
if (fStopAcks) {
|
||||||
timeout = fLinger;
|
timeout = fLinger;
|
||||||
leave = true;
|
leave = true;
|
||||||
}
|
}
|
||||||
@@ -213,9 +233,25 @@ struct Region
|
|||||||
void SetLinger(uint32_t linger) { fLinger = linger; }
|
void SetLinger(uint32_t linger) { fLinger = linger; }
|
||||||
uint32_t GetLinger() const { return fLinger; }
|
uint32_t GetLinger() const { return fLinger; }
|
||||||
|
|
||||||
|
void StopAcks()
|
||||||
|
{
|
||||||
|
fStopAcks = true;
|
||||||
|
|
||||||
|
if (fAcksSender.joinable()) {
|
||||||
|
fBlockSendCV.notify_one();
|
||||||
|
fAcksSender.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!fRemote) {
|
||||||
|
if (fAcksReceiver.joinable()) {
|
||||||
|
fAcksReceiver.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
~Region()
|
~Region()
|
||||||
{
|
{
|
||||||
fStop = true;
|
fStopAcks = true;
|
||||||
|
|
||||||
if (fAcksSender.joinable()) {
|
if (fAcksSender.joinable()) {
|
||||||
fBlockSendCV.notify_one();
|
fBlockSendCV.notify_one();
|
||||||
@@ -228,11 +264,11 @@ struct Region
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
|
if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
|
||||||
LOG(debug) << "Region '" << fName << "' destroyed.";
|
LOG(trace) << "Region '" << fName << "' destroyed.";
|
||||||
}
|
}
|
||||||
|
|
||||||
if (boost::interprocess::file_mapping::remove(fName.c_str())) {
|
if (boost::interprocess::file_mapping::remove(fName.c_str())) {
|
||||||
LOG(debug) << "File mapping '" << fName << "' destroyed.";
|
LOG(trace) << "File mapping '" << fName << "' destroyed.";
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fFile) {
|
if (fFile) {
|
||||||
@@ -240,19 +276,18 @@ struct Region
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (boost::interprocess::message_queue::remove(fQueueName.c_str())) {
|
if (boost::interprocess::message_queue::remove(fQueueName.c_str())) {
|
||||||
LOG(debug) << "Region queue '" << fQueueName << "' destroyed.";
|
LOG(trace) << "Region queue '" << fQueueName << "' destroyed.";
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary.";
|
// LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
|
||||||
LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
|
// LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
|
||||||
}
|
}
|
||||||
|
|
||||||
bool fRemote;
|
bool fRemote;
|
||||||
uint32_t fLinger;
|
uint32_t fLinger;
|
||||||
std::atomic<bool> fStop;
|
std::atomic<bool> fStopAcks;
|
||||||
std::string fName;
|
std::string fName;
|
||||||
std::string fQueueName;
|
std::string fQueueName;
|
||||||
boost::interprocess::shared_memory_object fShmemObject;
|
boost::interprocess::shared_memory_object fShmemObject;
|
||||||
|
@@ -378,12 +378,10 @@ class Socket final : public fair::mq::Socket
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Events(uint32_t* events) override
|
int Events(uint32_t* events) override
|
||||||
{
|
{
|
||||||
size_t eventsSize = sizeof(uint32_t);
|
size_t eventsSize = sizeof(uint32_t);
|
||||||
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
|
return zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize);
|
||||||
throw SocketError(tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int GetLinger() const override
|
int GetLinger() const override
|
||||||
|
@@ -141,29 +141,29 @@ class TransportFactory final : public fair::mq::TransportFactory
|
|||||||
return std::make_unique<Poller>(channelsMap, channelList);
|
return std::make_unique<Poller>(channelsMap, channelList);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
|
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||||
{
|
{
|
||||||
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
|
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
|
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||||
{
|
{
|
||||||
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
|
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
|
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||||
{
|
{
|
||||||
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
|
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
|
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||||
{
|
{
|
||||||
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
|
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
|
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags, fair::mq::RegionConfig cfg)
|
||||||
{
|
{
|
||||||
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this);
|
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); }
|
void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); }
|
||||||
|
@@ -37,15 +37,16 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
|||||||
const int64_t userFlags,
|
const int64_t userFlags,
|
||||||
RegionCallback callback,
|
RegionCallback callback,
|
||||||
RegionBulkCallback bulkCallback,
|
RegionBulkCallback bulkCallback,
|
||||||
const std::string& path = "",
|
const std::string& path,
|
||||||
int flags = 0,
|
int flags,
|
||||||
FairMQTransportFactory* factory = nullptr)
|
FairMQTransportFactory* factory,
|
||||||
|
fair::mq::RegionConfig cfg)
|
||||||
: FairMQUnmanagedRegion(factory)
|
: FairMQUnmanagedRegion(factory)
|
||||||
, fManager(manager)
|
, fManager(manager)
|
||||||
, fRegion(nullptr)
|
, fRegion(nullptr)
|
||||||
, fRegionId(0)
|
, fRegionId(0)
|
||||||
{
|
{
|
||||||
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags);
|
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags, cfg);
|
||||||
fRegion = result.first;
|
fRegion = result.first;
|
||||||
fRegionId = result.second;
|
fRegionId = result.second;
|
||||||
}
|
}
|
||||||
@@ -56,6 +57,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
|||||||
void SetLinger(uint32_t linger) override { fManager.GetRegion(fRegionId)->SetLinger(linger); }
|
void SetLinger(uint32_t linger) override { fManager.GetRegion(fRegionId)->SetLinger(linger); }
|
||||||
uint32_t GetLinger() const override { return fManager.GetRegion(fRegionId)->GetLinger(); }
|
uint32_t GetLinger() const override { return fManager.GetRegion(fRegionId)->GetLinger(); }
|
||||||
|
|
||||||
|
Transport GetType() const override { return fair::mq::Transport::SHM; }
|
||||||
|
|
||||||
~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); }
|
~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@@ -89,6 +89,7 @@ int main(int argc, char** argv)
|
|||||||
bool listAll = false;
|
bool listAll = false;
|
||||||
string listAllPath;
|
string listAllPath;
|
||||||
bool verbose = false;
|
bool verbose = false;
|
||||||
|
string severity;
|
||||||
int userId = -1;
|
int userId = -1;
|
||||||
|
|
||||||
options_description desc("Options");
|
options_description desc("Options");
|
||||||
@@ -109,6 +110,7 @@ int main(int argc, char** argv)
|
|||||||
("list-all" , value<bool>(&listAll)->implicit_value(true), "List all sessions & segments")
|
("list-all" , value<bool>(&listAll)->implicit_value(true), "List all sessions & segments")
|
||||||
("list-all-path" , value<string>(&listAllPath)->default_value("/dev/shm/"),"Path for the --list-all command to search segments in")
|
("list-all-path" , value<string>(&listAllPath)->default_value("/dev/shm/"),"Path for the --list-all command to search segments in")
|
||||||
("verbose" , value<bool>(&verbose)->implicit_value(true), "Verbose mode (daemon will output to a file 'fairmq-shmmonitor_<timestamp>')")
|
("verbose" , value<bool>(&verbose)->implicit_value(true), "Verbose mode (daemon will output to a file 'fairmq-shmmonitor_<timestamp>')")
|
||||||
|
("severity" , value<string>(&severity)->default_value("info"), "Log severity")
|
||||||
("user-id" , value<int>(&userId)->default_value(-1), "User id (used with --get-shmid)")
|
("user-id" , value<int>(&userId)->default_value(-1), "User id (used with --get-shmid)")
|
||||||
("help,h", "Print help");
|
("help,h", "Print help");
|
||||||
|
|
||||||
@@ -122,6 +124,8 @@ int main(int argc, char** argv)
|
|||||||
|
|
||||||
notify(vm);
|
notify(vm);
|
||||||
|
|
||||||
|
fair::Logger::SetConsoleSeverity(severity);
|
||||||
|
|
||||||
if (getShmId) {
|
if (getShmId) {
|
||||||
if (userId == -1) {
|
if (userId == -1) {
|
||||||
LOG(info) << "shmem id for session '" << sessionName << "' and current user id " << geteuid()
|
LOG(info) << "shmem id for session '" << sessionName << "' and current user id " << geteuid()
|
||||||
|
@@ -109,6 +109,11 @@ class Message final : public fair::mq::Message
|
|||||||
, fAlignment(0)
|
, fAlignment(0)
|
||||||
, fMsg(std::make_unique<zmq_msg_t>())
|
, fMsg(std::make_unique<zmq_msg_t>())
|
||||||
{
|
{
|
||||||
|
if (region->GetType() != GetType()) {
|
||||||
|
LOG(error) << "region type (" << region->GetType() << ") does not match message type (" << GetType() << ")";
|
||||||
|
throw TransportError(tools::ToString("region type (", region->GetType(), ") does not match message type (", GetType(), ")"));
|
||||||
|
}
|
||||||
|
|
||||||
// FIXME: make this zero-copy:
|
// FIXME: make this zero-copy:
|
||||||
// simply taking over the provided buffer can casue premature delete, since region could be
|
// simply taking over the provided buffer can casue premature delete, since region could be
|
||||||
// destroyed before the message is sent out. Needs lifetime extension for the ZMQ region.
|
// destroyed before the message is sent out. Needs lifetime extension for the ZMQ region.
|
||||||
|
@@ -323,12 +323,10 @@ class Socket final : public fair::mq::Socket
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Events(uint32_t* events) override
|
int Events(uint32_t* events) override
|
||||||
{
|
{
|
||||||
size_t eventsSize = sizeof(uint32_t);
|
size_t eventsSize = sizeof(uint32_t);
|
||||||
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
|
return zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize);
|
||||||
throw SocketError(tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void SetLinger(const int value) override
|
void SetLinger(const int value) override
|
||||||
|
@@ -96,29 +96,29 @@ class TransportFactory final : public FairMQTransportFactory
|
|||||||
return std::make_unique<Poller>(channelsMap, channelList);
|
return std::make_unique<Poller>(channelsMap, channelList);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0) override
|
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||||
{
|
{
|
||||||
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
|
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
|
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||||
{
|
{
|
||||||
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
|
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) override
|
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||||
{
|
{
|
||||||
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
|
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
|
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||||
{
|
{
|
||||||
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
|
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int)
|
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int /* flags */, fair::mq::RegionConfig cfg)
|
||||||
{
|
{
|
||||||
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this);
|
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this, cfg);
|
||||||
auto zPtr = static_cast<UnmanagedRegion*>(ptr.get());
|
auto zPtr = static_cast<UnmanagedRegion*>(ptr.get());
|
||||||
fCtx->AddRegion(false, zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created);
|
fCtx->AddRegion(false, zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created);
|
||||||
return ptr;
|
return ptr;
|
||||||
|
@@ -16,6 +16,8 @@
|
|||||||
#include <cstddef> // size_t
|
#include <cstddef> // size_t
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
|
#include <sys/mman.h> // mlock
|
||||||
|
|
||||||
namespace fair::mq::zmq
|
namespace fair::mq::zmq
|
||||||
{
|
{
|
||||||
|
|
||||||
@@ -30,7 +32,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
|||||||
int64_t userFlags,
|
int64_t userFlags,
|
||||||
RegionCallback callback,
|
RegionCallback callback,
|
||||||
RegionBulkCallback bulkCallback,
|
RegionBulkCallback bulkCallback,
|
||||||
FairMQTransportFactory* factory = nullptr)
|
FairMQTransportFactory* factory,
|
||||||
|
fair::mq::RegionConfig cfg)
|
||||||
: fair::mq::UnmanagedRegion(factory)
|
: fair::mq::UnmanagedRegion(factory)
|
||||||
, fCtx(ctx)
|
, fCtx(ctx)
|
||||||
, fId(fCtx.RegionCount())
|
, fId(fCtx.RegionCount())
|
||||||
@@ -39,7 +42,20 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
|||||||
, fUserFlags(userFlags)
|
, fUserFlags(userFlags)
|
||||||
, fCallback(callback)
|
, fCallback(callback)
|
||||||
, fBulkCallback(bulkCallback)
|
, fBulkCallback(bulkCallback)
|
||||||
{}
|
{
|
||||||
|
if (cfg.lock) {
|
||||||
|
LOG(debug) << "Locking region " << fId << "...";
|
||||||
|
if (mlock(fBuffer, fSize) == -1) {
|
||||||
|
LOG(error) << "Could not lock region " << fId << ". Code: " << errno << ", reason: " << strerror(errno);
|
||||||
|
}
|
||||||
|
LOG(debug) << "Successfully locked region " << fId << ".";
|
||||||
|
}
|
||||||
|
if (cfg.zero) {
|
||||||
|
LOG(debug) << "Zeroing free memory of region " << fId << "...";
|
||||||
|
memset(fBuffer, 0x00, fSize);
|
||||||
|
LOG(debug) << "Successfully zeroed free memory of region " << fId << ".";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
UnmanagedRegion(const UnmanagedRegion&) = delete;
|
UnmanagedRegion(const UnmanagedRegion&) = delete;
|
||||||
UnmanagedRegion operator=(const UnmanagedRegion&) = delete;
|
UnmanagedRegion operator=(const UnmanagedRegion&) = delete;
|
||||||
@@ -51,6 +67,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
|||||||
void SetLinger(uint32_t /* linger */) override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; }
|
void SetLinger(uint32_t /* linger */) override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; }
|
||||||
uint32_t GetLinger() const override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; return 0; }
|
uint32_t GetLinger() const override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; return 0; }
|
||||||
|
|
||||||
|
Transport GetType() const override { return Transport::ZMQ; }
|
||||||
|
|
||||||
virtual ~UnmanagedRegion()
|
virtual ~UnmanagedRegion()
|
||||||
{
|
{
|
||||||
LOG(debug) << "destroying region " << fId;
|
LOG(debug) << "destroying region " << fId;
|
||||||
|
@@ -153,9 +153,11 @@ TEST(ErrorState, interactive_InReset)
|
|||||||
EXPECT_EXIT(RunErrorStateIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), "");
|
EXPECT_EXIT(RunErrorStateIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef FAIRMQ_DEBUG_MODE
|
||||||
TEST(ErrorState, OrphanMessages)
|
TEST(ErrorState, OrphanMessages)
|
||||||
{
|
{
|
||||||
BadDevice badDevice;
|
BadDevice badDevice;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
@@ -23,6 +23,67 @@ namespace
|
|||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
|
void RegionsCache(const string& transport, const string& address)
|
||||||
|
{
|
||||||
|
size_t session1 = fair::mq::tools::UuidHash();
|
||||||
|
size_t session2 = fair::mq::tools::UuidHash();
|
||||||
|
|
||||||
|
fair::mq::ProgOptions config1;
|
||||||
|
fair::mq::ProgOptions config2;
|
||||||
|
config1.SetProperty<string>("session", to_string(session1));
|
||||||
|
config2.SetProperty<string>("session", to_string(session2));
|
||||||
|
|
||||||
|
auto factory1 = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config1);
|
||||||
|
auto factory2 = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config2);
|
||||||
|
|
||||||
|
auto region1 = factory1->CreateUnmanagedRegion(1000000, [](void*, size_t, void*) {});
|
||||||
|
auto region2 = factory2->CreateUnmanagedRegion(1000000, [](void*, size_t, void*) {});
|
||||||
|
void* r1ptr = region1->GetData();
|
||||||
|
void* r2ptr = region2->GetData();
|
||||||
|
|
||||||
|
FairMQChannel push1("Push1", "push", factory1);
|
||||||
|
FairMQChannel pull1("Pull1", "pull", factory1);
|
||||||
|
push1.Bind(address + to_string(1));
|
||||||
|
pull1.Connect(address + to_string(1));
|
||||||
|
FairMQChannel push2("Push2", "push", factory2);
|
||||||
|
FairMQChannel pull2("Pull2", "pull", factory2);
|
||||||
|
push2.Bind(address + to_string(2));
|
||||||
|
pull2.Connect(address + to_string(2));
|
||||||
|
|
||||||
|
{
|
||||||
|
static_cast<char*>(r1ptr)[0] = 97; // a
|
||||||
|
static_cast<char*>(static_cast<char*>(r1ptr) + 100)[0] = 98; // b
|
||||||
|
static_cast<char*>(r2ptr)[0] = 99; // c
|
||||||
|
static_cast<char*>(static_cast<char*>(r2ptr) + 100)[0] = 100; // d
|
||||||
|
|
||||||
|
FairMQMessagePtr m1(push1.NewMessage(region1, r1ptr, 100, nullptr));
|
||||||
|
FairMQMessagePtr m2(push1.NewMessage(region1, static_cast<char*>(r1ptr) + 100, 100, nullptr));
|
||||||
|
push1.Send(m1);
|
||||||
|
push1.Send(m2);
|
||||||
|
|
||||||
|
FairMQMessagePtr m3(push2.NewMessage(region2, r2ptr, 100, nullptr));
|
||||||
|
FairMQMessagePtr m4(push2.NewMessage(region2, static_cast<char*>(r2ptr) + 100, 100, nullptr));
|
||||||
|
push2.Send(m3);
|
||||||
|
push2.Send(m4);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
FairMQMessagePtr m1(pull1.NewMessage());
|
||||||
|
FairMQMessagePtr m2(pull1.NewMessage());
|
||||||
|
ASSERT_EQ(pull1.Receive(m1), 100);
|
||||||
|
ASSERT_EQ(pull1.Receive(m2), 100);
|
||||||
|
ASSERT_EQ(static_cast<char*>(m1->GetData())[0], 'a');
|
||||||
|
ASSERT_EQ(static_cast<char*>(m2->GetData())[0], 'b');
|
||||||
|
|
||||||
|
FairMQMessagePtr m3(pull2.NewMessage());
|
||||||
|
FairMQMessagePtr m4(pull2.NewMessage());
|
||||||
|
ASSERT_EQ(pull2.Receive(m3), 100);
|
||||||
|
ASSERT_EQ(pull2.Receive(m4), 100);
|
||||||
|
ASSERT_EQ(static_cast<char*>(m3->GetData())[0], 'c');
|
||||||
|
ASSERT_EQ(static_cast<char*>(m4->GetData())[0], 'd');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void RegionEventSubscriptions(const string& transport)
|
void RegionEventSubscriptions(const string& transport)
|
||||||
{
|
{
|
||||||
size_t session{fair::mq::tools::UuidHash()};
|
size_t session{fair::mq::tools::UuidHash()};
|
||||||
@@ -160,6 +221,16 @@ void RegionCallbacks(const string& transport, const string& _address)
|
|||||||
LOG(info) << "2 done.";
|
LOG(info) << "2 done.";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(Cache, zeromq)
|
||||||
|
{
|
||||||
|
RegionsCache("zeromq", "ipc://test_region_cache");
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(Cache, shmem)
|
||||||
|
{
|
||||||
|
RegionsCache("shmem", "ipc://test_region_cache");
|
||||||
|
}
|
||||||
|
|
||||||
TEST(EventSubscriptions, zeromq)
|
TEST(EventSubscriptions, zeromq)
|
||||||
{
|
{
|
||||||
RegionEventSubscriptions("zeromq");
|
RegionEventSubscriptions("zeromq");
|
||||||
|
Reference in New Issue
Block a user