Compare commits

...

21 Commits

Author SHA1 Message Date
Alexey Rybalchenko
e6f67b3658 Fix Ofi interface 2021-05-07 21:59:52 +02:00
Alexey Rybalchenko
091d0824d1 ofi: fix Events() signature 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
857aa84fa3 add mlock/zero options to unmanaged region 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
c85d6e079c shm: reduce shm contention when dealing with ack queues 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
4e466514d2 region example: fix msg counter 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
8b4056e408 Update docs 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
b67b80e0ad shmmonitor: add severity setting 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
2c89b24857 shm: eliminate race/deadlock in region subscriptions 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
c6a6a5f21b Check transport type of msg and corresponding region 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
9defa71622 Add GetType() to UnmanagedRegion 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
ed2dcedf03 Add operator<< for fair::mq::Transport 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
a3d56b9aeb configurable transport for region example script 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
8a2641d842 shm: check result of region acquisition 2021-05-07 21:33:15 +02:00
Alexey Rybalchenko
2ca62d06db shm region cache: fix multiple sessions issue 2021-05-07 14:20:00 +02:00
Alexey Rybalchenko
87e0ca5450 add region cache test 2021-05-07 14:20:00 +02:00
Gvozden Neskovic
ef5b3c782e improve message counter cache line use 2021-05-07 14:20:00 +02:00
Gvozden Neskovic
f7ba3052aa use thread local cache to avoid interprocess lock on shm GetData 2021-05-07 14:20:00 +02:00
Dennis Klein
a90dbf64de Fix -Wunused-result
Fixes #281
2021-05-07 13:18:12 +02:00
Dennis Klein
9724f184f4 Fallback to Boost.Filesystem on GCC 8 2021-05-07 13:13:16 +02:00
Dennis Klein
057ba03776 PluginManager: Do not load built-in plugins via dlopen/dlsym
fixes #351
2021-05-05 03:52:12 +02:00
Giulio Eulisse
6dfea32aee Improve Events API
If the call is interrupted by a signal, this will throw, which we clearly do not want. Simplifying the API to let the user decide what to do on error is probably the best option.
2021-05-04 22:54:19 +02:00
29 changed files with 412 additions and 149 deletions

View File

@@ -205,7 +205,7 @@ macro(set_fairmq_defaults)
endif()
if( CMAKE_CXX_COMPILER_ID STREQUAL "GNU"
AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8)
AND CMAKE_CXX_COMPILER_VERSION VERSION_LESS 9)
set(FAIRMQ_HAS_STD_FILESYSTEM 0)
else()
set(FAIRMQ_HAS_STD_FILESYSTEM 1)

View File

@@ -5,12 +5,6 @@
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "Sampler.h"
@@ -37,17 +31,17 @@ void Sampler::InitTask()
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(info) << "Region event: " << info.event
<< ", managed: " << info.managed
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id
<< ", ptr: " << info.ptr
<< ", size: " << info.size
<< ", flags: " << info.flags;
});
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
0,
10000000,
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
0, // ... and this sub-channel
10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
lock_guard<mutex> lock(fMtx);
fNumUnackedMsgs -= blocks.size();
@@ -55,7 +49,10 @@ void Sampler::InitTask()
if (fMaxIterations > 0) {
LOG(info) << "Received " << blocks.size() << " acks";
}
}
},
"", // path, if a region is backed by a file
0, // flags that are passed for region creation
fair::mq::RegionConfig{true, true} // additional config: { call mlock on the region, zero the region memory }
));
fRegion->SetLinger(fLinger);
}
@@ -75,20 +72,19 @@ bool Sampler::ConditionalRun()
// std::this_thread::sleep_for(std::chrono::seconds(1));
lock_guard<mutex> lock(fMtx);
++fNumUnackedMsgs;
if (Send(msg, "data", 0) > 0) {
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false;
}
}
++fNumUnackedMsgs;
return true;
}
void Sampler::ResetTask()
{
// On destruction UnmanagedRegion will try to TODO
fRegion.reset();
{
lock_guard<mutex> lock(fMtx);

View File

@@ -30,8 +30,8 @@ void Sink::InitTask()
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(info) << "Region event: " << info.event
<< ", managed: " << info.managed
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id
<< ", ptr: " << info.ptr
<< ", size: " << info.size

View File

@@ -2,9 +2,14 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="shmem"
msgSize="1000000"
if [[ $1 =~ ^[0-9]+$ ]]; then
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
if [[ $2 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi
@@ -13,13 +18,13 @@ SAMPLER+=" --id sampler1"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10"
SAMPLER+=" --transport shmem"
SAMPLER+=" --transport $transport"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
SINK="fairmq-ex-region-sink"
SINK+=" --id sink1"
SINK+=" --severity debug"
SINK+=" --transport shmem"
SINK+=" --transport $transport"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK &
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@@ -31,6 +31,7 @@ SAMPLER_PID=$!
SINK="fairmq-ex-region-sink"
SINK+=" --id sink1"
SINK+=" --transport $transport"
SINK+=" --severity debug"
SINK+=" --session $SESSION"
SINK+=" --verbosity veryhigh"
SINK+=" --control static --color false"

View File

@@ -56,7 +56,7 @@ class FairMQSocket
/// If the backend supports it, fills the unsigned integer @a events with the ZMQ_EVENTS value
/// DISCLAIMER: this API is experimental and unsupported and might be dropped / refactored in
/// the future.
virtual void Events(uint32_t* events) = 0;
virtual int Events(uint32_t* events) = 0;
virtual void SetLinger(const int value) = 0;
virtual int GetLinger() const = 0;
virtual void SetSndBufSize(const int value) = 0;

View File

@@ -92,8 +92,8 @@ class FairMQTransportFactory
/// @param path optional parameter to pass to the underlying transport
/// @param flags optional parameter to pass to the underlying transport
/// @return pointer to UnmanagedRegion
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
/// @brief Create new UnmanagedRegion
/// @param size size of the region
/// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user
@@ -101,8 +101,8 @@ class FairMQTransportFactory
/// @param path optional parameter to pass to the underlying transport
/// @param flags optional parameter to pass to the underlying transport
/// @return pointer to UnmanagedRegion
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
/// @brief Subscribe to region events (creation, destruction, ...)
/// @param callback the callback that is called when a region event occurs

View File

@@ -79,6 +79,7 @@ class FairMQUnmanagedRegion
virtual void SetLinger(uint32_t linger) = 0;
virtual uint32_t GetLinger() const = 0;
virtual fair::mq::Transport GetType() const = 0;
FairMQTransportFactory* GetTransport() { return fTransport; }
void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; }
@@ -107,6 +108,19 @@ inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event
namespace fair::mq
{
struct RegionConfig {
bool lock;
bool zero;
RegionConfig()
: lock(false), zero(false)
{}
RegionConfig(bool l, bool z)
: lock(l), zero(z)
{}
};
using RegionCallback = FairMQRegionCallback;
using RegionBulkCallback = FairMQRegionBulkCallback;
using RegionEventCallback = FairMQRegionEventCallback;

View File

@@ -200,7 +200,19 @@ auto fair::mq::PluginManager::LoadPluginStatic(const string& pluginName) -> void
// Load symbol
if (fPluginFactories.find(pluginName) == fPluginFactories.end()) {
try {
LoadSymbols(pluginName, dll::program_location());
if ("control" == pluginName) {
try {
fPluginProgOptions.insert({pluginName, plugins::ControlPluginProgramOptions().value()});
}
catch (const boost::bad_optional_access& e) { /* just ignore, if no prog options are declared */ }
} else if ("config" == pluginName) {
try {
fPluginProgOptions.insert({pluginName, plugins::ConfigPluginProgramOptions().value()});
}
catch (const boost::bad_optional_access& e) { /* just ignore, if no prog options are declared */ }
} else {
LoadSymbols(pluginName, dll::program_location());
}
fPluginOrder.push_back(pluginName);
} catch (boost::system::system_error& e) {
throw PluginLoadError(ToString("An error occurred while loading static plugin ", pluginName, ": ", e.what()));
@@ -211,7 +223,13 @@ auto fair::mq::PluginManager::LoadPluginStatic(const string& pluginName) -> void
auto fair::mq::PluginManager::InstantiatePlugin(const string& pluginName) -> void
{
if (fPlugins.find(pluginName) == fPlugins.end()) {
fPlugins[pluginName] = fPluginFactories[pluginName](*fPluginServices);
if ("control" == pluginName) {
fPlugins[pluginName] = plugins::Make_control_Plugin(fPluginServices.get());
} else if ("config" == pluginName) {
fPlugins[pluginName] = plugins::Make_config_Plugin(fPluginServices.get());
} else {
fPlugins[pluginName] = fPluginFactories[pluginName](*fPluginServices);
}
}
}

View File

@@ -12,6 +12,7 @@
#include <fairmq/tools/Strings.h>
#include <memory>
#include <ostream>
#include <stdexcept>
#include <string>
#include <unordered_map>
@@ -60,6 +61,11 @@ try {
throw TransportError(tools::ToString("Unknown transport provided: ", transport));
}
inline std::ostream& operator<<(std::ostream& os, const Transport& transport)
{
return os << TransportName(transport);
}
} // namespace fair::mq
#endif /* FAIR_MQ_TRANSPORTS_H */

View File

@@ -41,7 +41,7 @@ class Socket final : public fair::mq::Socket
auto GetId() const -> std::string override { return fId; }
auto Events(uint32_t *events) -> void override { *events = 0; }
auto Events(uint32_t *events) -> int override { *events = 0; return -1; }
auto Bind(const std::string& address) -> bool override;
auto Connect(const std::string& address) -> bool override;

View File

@@ -92,22 +92,22 @@ auto TransportFactory::CreatePoller(const unordered_map<string, vector<FairMQCha
// return PollerPtr{new Poller(channelsMap, channelList)};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}

View File

@@ -44,10 +44,10 @@ class TransportFactory final : public FairMQTransportFactory
auto CreatePoller(const std::vector<FairMQChannel*>& channels) const -> PollerPtr override;
auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override;
auto CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override;
auto CreateUnmanagedRegion(const size_t size, RegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override;
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override;
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override;
auto CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override;
auto CreateUnmanagedRegion(const size_t size, RegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override;
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override;
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override;
void SubscribeToRegionEvents(RegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for OFI"; }
bool SubscribedToRegionEvents() override { LOG(error) << "Region event subscriptions not yet implemented for OFI"; return false; }

View File

@@ -65,7 +65,10 @@ struct DDSEnvironment::Impl
" mkdir -p \"$HOME/.DDS\"\n"
" dds-user-defaults --ignore-default-sid -d -c \"$HOME/.DDS/DDS.cfg\"\n"
"fi\n";
std::system(cmd.str().c_str());
auto rc(std::system(cmd.str().c_str()));
if (rc != 0) {
LOG(warn) << "DDSEnvironment::SetupConfigHome failed";
}
}
auto SetupPath() -> void

View File

@@ -44,6 +44,7 @@
#include <thread>
#include <unordered_map>
#include <utility> // pair
#include <tuple>
#include <vector>
#include <unistd.h> // getuid
@@ -58,14 +59,15 @@ class Manager
{
public:
Manager(const std::string& sessionName, std::string deviceId, size_t size, const ProgOptions* config)
: fShmId(makeShmIdStr(sessionName))
: fShmId64(makeShmIdUint64(sessionName))
, fShmId(makeShmIdStr(sessionName))
, fSegmentId(config ? config->GetProperty<uint16_t>("shm-segment-id", 0) : 0)
, fDeviceId(std::move(deviceId))
, fSegments()
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
, fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
, fRegionEventsShmCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
, fRegionEventsSubscriptionActive(false)
, fNumObservedEvents(0)
, fDeviceCounter(nullptr)
@@ -73,10 +75,11 @@ class Manager
, fShmSegments(nullptr)
, fShmRegions(nullptr)
, fInterrupted(false)
, fMsgCounter(0)
#ifdef FAIRMQ_DEBUG_MODE
, fMsgDebug(nullptr)
, fShmMsgCounters(nullptr)
, fMsgCounterNew(0)
, fMsgCounterDelete(0)
#endif
, fHeartbeatThread()
, fSendHeartbeats(true)
@@ -235,7 +238,7 @@ class Manager
}
if (!p.empty()) {
boost::process::spawn(p, "-x", "-m", "--shmid", id, "-d", "-t", "2000", verbose ? "--verbose" : "", env);
boost::process::spawn(p, "-x", "-m", "--shmid", id, "-d", "-t", "2000", (verbose ? "--verbose" : ""), env);
int numTries = 0;
do {
try {
@@ -260,10 +263,13 @@ class Manager
void Resume() { fInterrupted.store(false); }
void Reset()
{
if (fMsgCounter.load() != 0) {
LOG(error) << "Message counter during Reset expected to be 0, found: " << fMsgCounter.load();
throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", fMsgCounter.load()));
#ifdef FAIRMQ_DEBUG_MODE
auto diff = fMsgCounterNew.load() - fMsgCounterDelete.load();
if (diff != 0) {
LOG(error) << "Message counter during Reset expected to be 0, found: " << diff;
throw MessageError(tools::ToString("Message counter during Reset expected to be 0, found: ", diff));
}
#endif
}
bool Interrupted() { return fInterrupted.load(); }
@@ -271,8 +277,9 @@ class Manager
const int64_t userFlags,
RegionCallback callback,
RegionBulkCallback bulkCallback,
const std::string& path = "",
int flags = 0)
const std::string& path,
int flags,
fair::mq::RegionConfig cfg)
{
using namespace boost::interprocess;
try {
@@ -302,22 +309,35 @@ class Manager
return {nullptr, id};
}
// create region info
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags));
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
if (cfg.lock) {
LOG(debug) << "Locking region " << id << "...";
if (mlock(r.first->second->fRegion.get_address(), r.first->second->fRegion.get_size()) == -1) {
LOG(error) << "Could not lock region " << id << ". Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked region " << id << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << id << "...";
memset(r.first->second->fRegion.get_address(), 0x00, r.first->second->fRegion.get_size());
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
}
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
r.first->second->InitializeQueues();
r.first->second->StartReceivingAcks();
result.first = &(r.first->second->fRegion);
result.second = id;
(fEventCounter->fCount)++;
}
fRegionEventsCV.notify_all();
fRegionsGen += 1; // signal TL cache invalidation
fRegionEventsShmCV.notify_all();
return result;
} catch (interprocess_exception& e) {
LOG(error) << "cannot create region. Already created/not cleaned up?";
LOG(error) << e.what();
@@ -327,8 +347,28 @@ class Manager
Region* GetRegion(const uint16_t id)
{
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
const auto &lTlCache = fTlRegionCache;
const auto &lTlCacheVec = lTlCache.fRegionsTLCache;
const auto lTlCacheGen = lTlCache.fRegionsTLCacheGen;
// fast path
for (const auto &lRegion : lTlCacheVec) {
if ((std::get<1>(lRegion) == id) && (lTlCacheGen == fRegionsGen) && (std::get<2>(lRegion) == fShmId64)) {
return std::get<0>(lRegion);
}
}
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
return GetRegionUnsafe(id);
// slow path: check invalidation
if (lTlCacheGen != fRegionsGen) {
fTlRegionCache.fRegionsTLCache.clear();
}
auto *lRegion = GetRegionUnsafe(id);
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
return lRegion;
}
Region* GetRegionUnsafe(const uint16_t id)
@@ -352,7 +392,7 @@ class Manager
LOG(error) << oor.what();
return nullptr;
} catch (boost::interprocess::interprocess_exception& e) {
LOG(warn) << "Could not get remote region for id '" << id << "'";
LOG(error) << "Could not get remote region for id '" << id << "': " << e.what();
return nullptr;
}
}
@@ -360,13 +400,19 @@ class Manager
void RemoveRegion(const uint16_t id)
{
fRegions.erase(id);
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fShmRegions->at(id).fDestroyed = true;
(fEventCounter->fCount)++;
try {
fRegions.at(id)->StopAcks();
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fShmRegions->at(id).fDestroyed = true;
fRegions.erase(id);
(fEventCounter->fCount)++;
}
fRegionEventsShmCV.notify_all();
} catch(std::out_of_range& oor) {
LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'";
}
fRegionEventsCV.notify_all();
fRegionsGen += 1; // signal TL cache invalidation
}
std::vector<fair::mq::RegionInfo> GetRegionInfo()
@@ -387,8 +433,12 @@ class Manager
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (!e.second.fDestroyed) {
auto region = GetRegionUnsafe(info.id);
info.ptr = region->fRegion.get_address();
info.size = region->fRegion.get_size();
if (region) {
info.ptr = region->fRegion.get_address();
info.size = region->fRegion.get_size();
} else {
throw std::runtime_error(tools::ToString("GetRegionInfoUnsafe() could not get region with id '", info.id, "'"));
}
} else {
info.ptr = nullptr;
info.size = 0;
@@ -423,7 +473,7 @@ class Manager
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsCV.notify_all();
fRegionEventsShmCV.notify_all();
fRegionEventThread.join();
}
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
@@ -440,7 +490,7 @@ class Manager
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsCV.notify_all();
fRegionEventsShmCV.notify_all();
fRegionEventThread.join();
lock.lock();
fRegionEventCallback = nullptr;
@@ -454,36 +504,49 @@ class Manager
auto infos = GetRegionInfoUnsafe();
for (const auto& i : infos) {
auto el = fObservedRegionEvents.find({i.id, i.managed});
if (el == fObservedRegionEvents.end()) {
fRegionEventCallback(i);
if (el == fObservedRegionEvents.end()) { // if event id has not been observed
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
++fNumObservedEvents;
} else {
// if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created'
// TODO: do we care to show 'created' events if we know region is already destroyed?
if (i.event == RegionEvent::created) {
fRegionEventCallback(i);
++fNumObservedEvents;
} else {
fNumObservedEvents += 2;
}
} else { // if event id has been observed (expected - there are two events per id - created & destroyed)
// fire a callback if we have observed 'created' event and incoming is 'destroyed'
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
fRegionEventCallback(i);
el->second = i.event;
++fNumObservedEvents;
} else {
// LOG(debug) << "ignoring event for id" << i.id << ":";
// LOG(debug) << "incoming event: " << i.event;
// LOG(debug) << "stored event: " << el->second;
// LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second;
}
}
}
fRegionEventsCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
fRegionEventsShmCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
}
}
void IncrementMsgCounter() { fMsgCounter.fetch_add(1, std::memory_order_relaxed); }
void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); }
void IncrementMsgCounter()
{
#ifdef FAIRMQ_DEBUG_MODE
fMsgCounterNew.fetch_add(1, std::memory_order_relaxed);
#endif
}
void DecrementMsgCounter()
{
#ifdef FAIRMQ_DEBUG_MODE
fMsgCounterDelete.fetch_add(1, std::memory_order_relaxed);
#endif
}
#ifdef FAIRMQ_DEBUG_MODE
void IncrementShmMsgCounter(uint16_t segmentId) { ++((*fShmMsgCounters)[segmentId].fCount); }
void DecrementShmMsgCounter(uint16_t segmentId) { --((*fShmMsgCounters)[segmentId].fCount); }
#endif
boost::interprocess::named_mutex& GetMtx() { return fShmMtx; }
void SendHeartbeats()
{
std::string controlQueueName("fmq_" + fShmId + "_cq");
@@ -511,7 +574,7 @@ class Manager
auto it = fSegments.find(id);
if (it == fSegments.end()) {
try {
// get region info
// get segment info
SegmentInfo segmentInfo = fShmSegments->at(id);
LOG(debug) << "Located segment with id '" << id << "'";
@@ -613,6 +676,7 @@ class Manager
using namespace boost::interprocess;
bool lastRemoved = false;
fRegionsGen += 1; // signal TL cache invalidation
UnsubscribeFromRegionEvents();
{
@@ -645,6 +709,7 @@ class Manager
}
private:
uint64_t fShmId64;
std::string fShmId;
uint16_t fSegmentId;
std::string fDeviceId;
@@ -653,11 +718,11 @@ class Manager
VoidAlloc fShmVoidAlloc;
boost::interprocess::named_mutex fShmMtx;
boost::interprocess::named_condition fRegionEventsCV;
boost::interprocess::named_condition fRegionEventsShmCV;
std::thread fRegionEventThread;
bool fRegionEventsSubscriptionActive;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents;
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; // pair: <region id, managed>
uint64_t fNumObservedEvents;
DeviceCounter* fDeviceCounter;
@@ -665,12 +730,20 @@ class Manager
Uint16SegmentInfoHashMap* fShmSegments;
Uint16RegionInfoHashMap* fShmRegions;
std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions;
// make sure this is alone in the cache line: mostly read
alignas(128) inline static std::atomic<unsigned long> fRegionsGen = 0ul;
inline static thread_local struct ManagerTLCache {
unsigned long fRegionsTLCacheGen;
std::vector<std::tuple<Region*, uint16_t, uint64_t>> fRegionsTLCache;
} fTlRegionCache;
std::atomic<bool> fInterrupted;
std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter
#ifdef FAIRMQ_DEBUG_MODE
// make sure the counters are not thrashing the cache line between threads doing creation and deallocation
Uint16MsgDebugMapHashMap* fMsgDebug;
Uint16MsgCounterHashMap* fShmMsgCounters;
alignas(128) std::atomic_uint64_t fMsgCounterNew;
alignas(128) std::atomic_uint64_t fMsgCounterDelete;
#endif
std::thread fHeartbeatThread;
@@ -680,6 +753,8 @@ class Manager
bool fThrowOnBadAlloc;
bool fNoCleanup;
};
} // namespace fair::mq::shmem

View File

@@ -109,12 +109,17 @@ class Message final : public fair::mq::Message
, fRegionPtr(nullptr)
, fLocalPtr(static_cast<char*>(data))
{
if (region->GetType() != GetType()) {
LOG(error) << "region type (" << region->GetType() << ") does not match message type (" << GetType() << ")";
throw TransportError(tools::ToString("region type (", region->GetType(), ") does not match message type (", GetType(), ")"));
}
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) &&
reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) {
fMeta.fHandle = (boost::interprocess::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
} else {
LOG(error) << "trying to create region message with data from outside the region";
throw std::runtime_error("trying to create region message with data from outside the region");
throw TransportError("trying to create region message with data from outside the region");
}
fManager.IncrementMsgCounter();
}
@@ -304,6 +309,8 @@ class Message final : public fair::mq::Message
}
if (fRegionPtr) {
fRegionPtr->InitializeQueues();
fRegionPtr->StartSendingAcks();
fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint});
} else {
LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack";
@@ -319,7 +326,7 @@ class Message final : public fair::mq::Message
Deallocate();
fAlignment = 0;
fManager.DecrementMsgCounter(); // TODO: put this to debug mode
fManager.DecrementMsgCounter();
}
};

View File

@@ -497,7 +497,7 @@ std::pair<std::string, bool> RunRemoval(std::function<bool(const std::string&)>
return {name, true};
} else {
if (verbose) {
LOG(info) << "Did not remove '" << name << "'. Already removed?";
LOG(debug) << "Did not remove '" << name << "'. Already removed?";
}
return {name, false};
}
@@ -525,7 +525,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
if (rc) {
if (verbose) {
LOG(info) << "Region counter found: " << rc->fCount;
LOG(debug) << "Region counter found: " << rc->fCount;
}
uint16_t regionCount = rc->fCount;

View File

@@ -1,4 +1,4 @@
# Shared Memory transport
## Shared Memory transport
Shared memory transport for FairMQ. To try with existing devices, run the devices with `--transport shmem` option or configure channel transport in JSON (see examples/MQ/multiple-transports).
@@ -6,7 +6,7 @@ The transport manages shared memory via boost::interprocess library. The transfe
Devices track and cleanup shared memory on shutdown. For more information on the current shared memory segment and additional cleanup options, see following section.
# Shared Memory objects / files
## Shared Memory objects / files
FairMQ Shared Memory currently uses the following names to register shared memory on the system:
@@ -53,3 +53,7 @@ Additional cmd options:
For full option details, run with `-h`.
The Monitor class can also be used independently from the supplied executable, allowing integration on any level.
## Troubleshooting
Bus Error (SIGBUS) can occur if the transport tries to access shared memory that is not accessible. One reason could be because the used memory in the segment exceeds the capacity or available memory of the shmem filesystem (capacity is by default set to half of RAM on Linux).

View File

@@ -47,7 +47,7 @@ struct Region
Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
: fRemote(remote)
, fLinger(100)
, fStop(false)
, fStopAcks(false)
, fName("fmq_" + shmId + "_rg_" + std::to_string(id))
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id))
, fShmemObject()
@@ -85,18 +85,26 @@ struct Region
LOG(debug) << "shmem: initialized file: " << fName;
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, flags);
} else {
if (fRemote) {
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
} else {
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
try {
if (fRemote) {
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
} else {
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
}
} catch(interprocess_exception& e) {
LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << id << "': " << e.what();
throw;
}
try {
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
} catch(interprocess_exception& e) {
LOG(error) << "Failed mapping shared_memory_object for region id '" << id << "': " << e.what();
throw;
}
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
}
InitializeQueues();
StartSendingAcks();
LOG(debug) << "shmem: initialized region: " << fName;
LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")";
}
Region() = delete;
@@ -108,15 +116,22 @@ struct Region
{
using namespace boost::interprocess;
if (fRemote) {
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
} else {
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
if (fQueue == nullptr) {
if (fRemote) {
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
} else {
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
}
LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")";
}
LOG(debug) << "shmem: initialized region queue: " << fQueueName;
}
void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); }
void StartSendingAcks()
{
if (!fAcksSender.joinable()) {
fAcksSender = std::thread(&Region::SendAcks, this);
}
}
void SendAcks()
{
std::unique_ptr<RegionBlock[]> blocks = std::make_unique<RegionBlock[]>(fAckBunchSize);
@@ -140,13 +155,13 @@ struct Region
}
if (blocksToSend > 0) {
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) {
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStopAcks) {
// receiver slow? yield and try again...
std::this_thread::yield();
}
// LOG(debug) << "Sent " << blocksToSend << " blocks.";
} else { // blocksToSend == 0
if (fStop) {
if (fStopAcks) {
break;
}
}
@@ -156,7 +171,12 @@ struct Region
<< " blocks left to send: " << blocksToSend << ").";
}
void StartReceivingAcks() { fAcksReceiver = std::thread(&Region::ReceiveAcks, this); }
void StartReceivingAcks()
{
if (!fAcksReceiver.joinable()) {
fAcksReceiver = std::thread(&Region::ReceiveAcks, this);
}
}
void ReceiveAcks()
{
unsigned int priority;
@@ -168,7 +188,7 @@ struct Region
while (true) {
uint32_t timeout = 100;
bool leave = false;
if (fStop) {
if (fStopAcks) {
timeout = fLinger;
leave = true;
}
@@ -213,9 +233,25 @@ struct Region
void SetLinger(uint32_t linger) { fLinger = linger; }
uint32_t GetLinger() const { return fLinger; }
void StopAcks()
{
fStopAcks = true;
if (fAcksSender.joinable()) {
fBlockSendCV.notify_one();
fAcksSender.join();
}
if (!fRemote) {
if (fAcksReceiver.joinable()) {
fAcksReceiver.join();
}
}
}
~Region()
{
fStop = true;
fStopAcks = true;
if (fAcksSender.joinable()) {
fBlockSendCV.notify_one();
@@ -228,11 +264,11 @@ struct Region
}
if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
LOG(debug) << "Region '" << fName << "' destroyed.";
LOG(trace) << "Region '" << fName << "' destroyed.";
}
if (boost::interprocess::file_mapping::remove(fName.c_str())) {
LOG(debug) << "File mapping '" << fName << "' destroyed.";
LOG(trace) << "File mapping '" << fName << "' destroyed.";
}
if (fFile) {
@@ -240,19 +276,18 @@ struct Region
}
if (boost::interprocess::message_queue::remove(fQueueName.c_str())) {
LOG(debug) << "Region queue '" << fQueueName << "' destroyed.";
LOG(trace) << "Region queue '" << fQueueName << "' destroyed.";
}
} else {
// LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary.";
LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
// LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
}
LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
// LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
}
bool fRemote;
uint32_t fLinger;
std::atomic<bool> fStop;
std::atomic<bool> fStopAcks;
std::string fName;
std::string fQueueName;
boost::interprocess::shared_memory_object fShmemObject;

View File

@@ -378,12 +378,10 @@ class Socket final : public fair::mq::Socket
}
}
void Events(uint32_t* events) override
int Events(uint32_t* events) override
{
size_t eventsSize = sizeof(uint32_t);
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
throw SocketError(tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
}
return zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize);
}
int GetLinger() const override

View File

@@ -141,29 +141,29 @@ class TransportFactory final : public fair::mq::TransportFactory
return std::make_unique<Poller>(channelsMap, channelList);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags, fair::mq::RegionConfig cfg)
{
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this);
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this, cfg);
}
void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); }

View File

@@ -37,15 +37,16 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
const int64_t userFlags,
RegionCallback callback,
RegionBulkCallback bulkCallback,
const std::string& path = "",
int flags = 0,
FairMQTransportFactory* factory = nullptr)
const std::string& path,
int flags,
FairMQTransportFactory* factory,
fair::mq::RegionConfig cfg)
: FairMQUnmanagedRegion(factory)
, fManager(manager)
, fRegion(nullptr)
, fRegionId(0)
{
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags);
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags, cfg);
fRegion = result.first;
fRegionId = result.second;
}
@@ -56,6 +57,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
void SetLinger(uint32_t linger) override { fManager.GetRegion(fRegionId)->SetLinger(linger); }
uint32_t GetLinger() const override { return fManager.GetRegion(fRegionId)->GetLinger(); }
Transport GetType() const override { return fair::mq::Transport::SHM; }
~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); }
private:

View File

@@ -89,6 +89,7 @@ int main(int argc, char** argv)
bool listAll = false;
string listAllPath;
bool verbose = false;
string severity;
int userId = -1;
options_description desc("Options");
@@ -109,6 +110,7 @@ int main(int argc, char** argv)
("list-all" , value<bool>(&listAll)->implicit_value(true), "List all sessions & segments")
("list-all-path" , value<string>(&listAllPath)->default_value("/dev/shm/"),"Path for the --list-all command to search segments in")
("verbose" , value<bool>(&verbose)->implicit_value(true), "Verbose mode (daemon will output to a file 'fairmq-shmmonitor_<timestamp>')")
("severity" , value<string>(&severity)->default_value("info"), "Log severity")
("user-id" , value<int>(&userId)->default_value(-1), "User id (used with --get-shmid)")
("help,h", "Print help");
@@ -122,6 +124,8 @@ int main(int argc, char** argv)
notify(vm);
fair::Logger::SetConsoleSeverity(severity);
if (getShmId) {
if (userId == -1) {
LOG(info) << "shmem id for session '" << sessionName << "' and current user id " << geteuid()

View File

@@ -109,6 +109,11 @@ class Message final : public fair::mq::Message
, fAlignment(0)
, fMsg(std::make_unique<zmq_msg_t>())
{
if (region->GetType() != GetType()) {
LOG(error) << "region type (" << region->GetType() << ") does not match message type (" << GetType() << ")";
throw TransportError(tools::ToString("region type (", region->GetType(), ") does not match message type (", GetType(), ")"));
}
// FIXME: make this zero-copy:
// simply taking over the provided buffer can casue premature delete, since region could be
// destroyed before the message is sent out. Needs lifetime extension for the ZMQ region.

View File

@@ -323,12 +323,10 @@ class Socket final : public fair::mq::Socket
}
}
void Events(uint32_t* events) override
int Events(uint32_t* events) override
{
size_t eventsSize = sizeof(uint32_t);
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
throw SocketError(tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
}
return zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize);
}
void SetLinger(const int value) override

View File

@@ -96,29 +96,29 @@ class TransportFactory final : public FairMQTransportFactory
return std::make_unique<Poller>(channelsMap, channelList);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
}
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int)
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int /* flags */, fair::mq::RegionConfig cfg)
{
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this);
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this, cfg);
auto zPtr = static_cast<UnmanagedRegion*>(ptr.get());
fCtx->AddRegion(false, zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created);
return ptr;

View File

@@ -16,6 +16,8 @@
#include <cstddef> // size_t
#include <string>
#include <sys/mman.h> // mlock
namespace fair::mq::zmq
{
@@ -30,7 +32,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
int64_t userFlags,
RegionCallback callback,
RegionBulkCallback bulkCallback,
FairMQTransportFactory* factory = nullptr)
FairMQTransportFactory* factory,
fair::mq::RegionConfig cfg)
: fair::mq::UnmanagedRegion(factory)
, fCtx(ctx)
, fId(fCtx.RegionCount())
@@ -39,7 +42,20 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
, fUserFlags(userFlags)
, fCallback(callback)
, fBulkCallback(bulkCallback)
{}
{
if (cfg.lock) {
LOG(debug) << "Locking region " << fId << "...";
if (mlock(fBuffer, fSize) == -1) {
LOG(error) << "Could not lock region " << fId << ". Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked region " << fId << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << fId << "...";
memset(fBuffer, 0x00, fSize);
LOG(debug) << "Successfully zeroed free memory of region " << fId << ".";
}
}
UnmanagedRegion(const UnmanagedRegion&) = delete;
UnmanagedRegion operator=(const UnmanagedRegion&) = delete;
@@ -51,6 +67,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
void SetLinger(uint32_t /* linger */) override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; }
uint32_t GetLinger() const override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; return 0; }
Transport GetType() const override { return Transport::ZMQ; }
virtual ~UnmanagedRegion()
{
LOG(debug) << "destroying region " << fId;

View File

@@ -153,9 +153,11 @@ TEST(ErrorState, interactive_InReset)
EXPECT_EXIT(RunErrorStateIn("Reset", "interactive", "q"), ::testing::ExitedWithCode(1), "");
}
#ifdef FAIRMQ_DEBUG_MODE
TEST(ErrorState, OrphanMessages)
{
BadDevice badDevice;
}
#endif
} // namespace

View File

@@ -23,6 +23,67 @@ namespace
using namespace std;
void RegionsCache(const string& transport, const string& address)
{
size_t session1 = fair::mq::tools::UuidHash();
size_t session2 = fair::mq::tools::UuidHash();
fair::mq::ProgOptions config1;
fair::mq::ProgOptions config2;
config1.SetProperty<string>("session", to_string(session1));
config2.SetProperty<string>("session", to_string(session2));
auto factory1 = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config1);
auto factory2 = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config2);
auto region1 = factory1->CreateUnmanagedRegion(1000000, [](void*, size_t, void*) {});
auto region2 = factory2->CreateUnmanagedRegion(1000000, [](void*, size_t, void*) {});
void* r1ptr = region1->GetData();
void* r2ptr = region2->GetData();
FairMQChannel push1("Push1", "push", factory1);
FairMQChannel pull1("Pull1", "pull", factory1);
push1.Bind(address + to_string(1));
pull1.Connect(address + to_string(1));
FairMQChannel push2("Push2", "push", factory2);
FairMQChannel pull2("Pull2", "pull", factory2);
push2.Bind(address + to_string(2));
pull2.Connect(address + to_string(2));
{
static_cast<char*>(r1ptr)[0] = 97; // a
static_cast<char*>(static_cast<char*>(r1ptr) + 100)[0] = 98; // b
static_cast<char*>(r2ptr)[0] = 99; // c
static_cast<char*>(static_cast<char*>(r2ptr) + 100)[0] = 100; // d
FairMQMessagePtr m1(push1.NewMessage(region1, r1ptr, 100, nullptr));
FairMQMessagePtr m2(push1.NewMessage(region1, static_cast<char*>(r1ptr) + 100, 100, nullptr));
push1.Send(m1);
push1.Send(m2);
FairMQMessagePtr m3(push2.NewMessage(region2, r2ptr, 100, nullptr));
FairMQMessagePtr m4(push2.NewMessage(region2, static_cast<char*>(r2ptr) + 100, 100, nullptr));
push2.Send(m3);
push2.Send(m4);
}
{
FairMQMessagePtr m1(pull1.NewMessage());
FairMQMessagePtr m2(pull1.NewMessage());
ASSERT_EQ(pull1.Receive(m1), 100);
ASSERT_EQ(pull1.Receive(m2), 100);
ASSERT_EQ(static_cast<char*>(m1->GetData())[0], 'a');
ASSERT_EQ(static_cast<char*>(m2->GetData())[0], 'b');
FairMQMessagePtr m3(pull2.NewMessage());
FairMQMessagePtr m4(pull2.NewMessage());
ASSERT_EQ(pull2.Receive(m3), 100);
ASSERT_EQ(pull2.Receive(m4), 100);
ASSERT_EQ(static_cast<char*>(m3->GetData())[0], 'c');
ASSERT_EQ(static_cast<char*>(m4->GetData())[0], 'd');
}
}
void RegionEventSubscriptions(const string& transport)
{
size_t session{fair::mq::tools::UuidHash()};
@@ -160,6 +221,16 @@ void RegionCallbacks(const string& transport, const string& _address)
LOG(info) << "2 done.";
}
TEST(Cache, zeromq)
{
RegionsCache("zeromq", "ipc://test_region_cache");
}
TEST(Cache, shmem)
{
RegionsCache("shmem", "ipc://test_region_cache");
}
TEST(EventSubscriptions, zeromq)
{
RegionEventSubscriptions("zeromq");