mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-16 10:01:47 +00:00
Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
fa0bf96eb2 | ||
|
29827f0426 | ||
|
8efe7adf0e | ||
|
b747a8787c | ||
|
1a75141fc4 | ||
|
2f82eb4f09 | ||
|
92a56c26bc | ||
|
4f9aeda8ec |
@@ -133,6 +133,7 @@ struct RegionConfig
|
|||||||
bool removeOnDestruction = true; /// remove the region on object destruction
|
bool removeOnDestruction = true; /// remove the region on object destruction
|
||||||
int creationFlags = 0; /// flags passed to the underlying transport on region creation
|
int creationFlags = 0; /// flags passed to the underlying transport on region creation
|
||||||
int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user
|
int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user
|
||||||
|
uint64_t size = 0; /// region size
|
||||||
std::string path = ""; /// file path, if the region is backed by a file
|
std::string path = ""; /// file path, if the region is backed by a file
|
||||||
std::optional<uint16_t> id = std::nullopt; /// region id
|
std::optional<uint16_t> id = std::nullopt; /// region id
|
||||||
uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events
|
uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events
|
||||||
|
@@ -28,6 +28,8 @@
|
|||||||
namespace fair::mq::shmem
|
namespace fair::mq::shmem
|
||||||
{
|
{
|
||||||
|
|
||||||
|
static constexpr uint64_t kManagementSegmentSize = 6553600;
|
||||||
|
|
||||||
struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtime_error; };
|
struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||||
|
|
||||||
using SimpleSeqFitSegment = boost::interprocess::basic_managed_shared_memory<char,
|
using SimpleSeqFitSegment = boost::interprocess::basic_managed_shared_memory<char,
|
||||||
@@ -58,19 +60,22 @@ struct RegionInfo
|
|||||||
: fPath("", alloc)
|
: fPath("", alloc)
|
||||||
, fCreationFlags(0)
|
, fCreationFlags(0)
|
||||||
, fUserFlags(0)
|
, fUserFlags(0)
|
||||||
|
, fSize(0)
|
||||||
, fDestroyed(false)
|
, fDestroyed(false)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
RegionInfo(const char* path, const int flags, const uint64_t userFlags, const VoidAlloc& alloc)
|
RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, const VoidAlloc& alloc)
|
||||||
: fPath(path, alloc)
|
: fPath(path, alloc)
|
||||||
, fCreationFlags(flags)
|
, fCreationFlags(flags)
|
||||||
, fUserFlags(userFlags)
|
, fUserFlags(userFlags)
|
||||||
|
, fSize(size)
|
||||||
, fDestroyed(false)
|
, fDestroyed(false)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
Str fPath;
|
Str fPath;
|
||||||
int fCreationFlags;
|
int fCreationFlags;
|
||||||
uint64_t fUserFlags;
|
uint64_t fUserFlags;
|
||||||
|
uint64_t fSize;
|
||||||
bool fDestroyed;
|
bool fDestroyed;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@@ -132,7 +132,7 @@ class Manager
|
|||||||
: fShmId64(config ? config->GetProperty<uint64_t>("shmid", makeShmIdUint64(sessionName)) : makeShmIdUint64(sessionName))
|
: fShmId64(config ? config->GetProperty<uint64_t>("shmid", makeShmIdUint64(sessionName)) : makeShmIdUint64(sessionName))
|
||||||
, fShmId(makeShmIdStr(fShmId64))
|
, fShmId(makeShmIdStr(fShmId64))
|
||||||
, fSegmentId(config ? config->GetProperty<uint16_t>("shm-segment-id", 0) : 0)
|
, fSegmentId(config ? config->GetProperty<uint16_t>("shm-segment-id", 0) : 0)
|
||||||
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
|
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), kManagementSegmentSize)
|
||||||
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
|
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
|
||||||
, fShmMtx(fManagementSegment.find_or_construct<boost::interprocess::interprocess_mutex>(boost::interprocess::unique_instance)())
|
, fShmMtx(fManagementSegment.find_or_construct<boost::interprocess::interprocess_mutex>(boost::interprocess::unique_instance)())
|
||||||
, fNumObservedEvents(0)
|
, fNumObservedEvents(0)
|
||||||
@@ -228,6 +228,8 @@ class Manager
|
|||||||
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||||
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||||
|
|
||||||
|
bool createdSegment = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
std::string segmentName("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId));
|
std::string segmentName("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId));
|
||||||
auto it = fShmSegments->find(fSegmentId);
|
auto it = fShmSegments->find(fSegmentId);
|
||||||
@@ -246,6 +248,7 @@ class Manager
|
|||||||
if (zeroSegmentOnCreation) {
|
if (zeroSegmentOnCreation) {
|
||||||
ZeroSegment(fSegmentId);
|
ZeroSegment(fSegmentId);
|
||||||
}
|
}
|
||||||
|
createdSegment = true;
|
||||||
} else {
|
} else {
|
||||||
// found segment with the given id, opening
|
// found segment with the given id, opening
|
||||||
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
|
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
|
||||||
@@ -278,7 +281,9 @@ class Manager
|
|||||||
ZeroSegment(fSegmentId);
|
ZeroSegment(fSegmentId);
|
||||||
}
|
}
|
||||||
|
|
||||||
(fEventCounter->fCount)++;
|
if (createdSegment) {
|
||||||
|
(fEventCounter->fCount)++;
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef FAIRMQ_DEBUG_MODE
|
#ifdef FAIRMQ_DEBUG_MODE
|
||||||
fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc);
|
fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc);
|
||||||
@@ -360,7 +365,7 @@ class Manager
|
|||||||
}
|
}
|
||||||
bool Interrupted() { return fInterrupted.load(); }
|
bool Interrupted() { return fInterrupted.load(); }
|
||||||
|
|
||||||
std::pair<UnmanagedRegion*, uint16_t> CreateRegion(const size_t size,
|
std::pair<UnmanagedRegion*, uint16_t> CreateRegion(size_t size,
|
||||||
RegionCallback callback,
|
RegionCallback callback,
|
||||||
RegionBulkCallback bulkCallback,
|
RegionBulkCallback bulkCallback,
|
||||||
RegionConfig cfg)
|
RegionConfig cfg)
|
||||||
@@ -370,7 +375,7 @@ class Manager
|
|||||||
std::pair<UnmanagedRegion*, uint16_t> result;
|
std::pair<UnmanagedRegion*, uint16_t> result;
|
||||||
|
|
||||||
{
|
{
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||||
|
|
||||||
if (!cfg.id.has_value()) {
|
if (!cfg.id.has_value()) {
|
||||||
RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
|
RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
|
||||||
@@ -390,23 +395,28 @@ class Manager
|
|||||||
|
|
||||||
const uint16_t id = cfg.id.value();
|
const uint16_t id = cfg.id.value();
|
||||||
|
|
||||||
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
|
UnmanagedRegion* region = nullptr;
|
||||||
bool newRegionCreated = res.second;
|
bool newRegionCreated = false;
|
||||||
UnmanagedRegion& region = *(res.first->second);
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
|
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
|
||||||
|
newRegionCreated = res.second;
|
||||||
|
region = res.first->second.get();
|
||||||
|
}
|
||||||
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||||
|
|
||||||
if (!newRegionCreated) {
|
if (!newRegionCreated) {
|
||||||
region.fRemote = false; // TODO: this should be more clear, refactor it.
|
region->fRemote = false; // TODO: this should be more clear, refactor it.
|
||||||
}
|
}
|
||||||
|
|
||||||
// start ack receiver only if a callback has been provided.
|
// start ack receiver only if a callback has been provided.
|
||||||
if (callback || bulkCallback) {
|
if (callback || bulkCallback) {
|
||||||
region.SetCallbacks(callback, bulkCallback);
|
region->SetCallbacks(callback, bulkCallback);
|
||||||
region.InitializeQueues();
|
region->InitializeQueues();
|
||||||
region.StartAckSender();
|
region->StartAckSender();
|
||||||
region.StartAckReceiver();
|
region->StartAckReceiver();
|
||||||
}
|
}
|
||||||
result.first = &(region);
|
result.first = region;
|
||||||
result.second = id;
|
result.second = id;
|
||||||
}
|
}
|
||||||
fRegionsGen += 1; // signal TL cache invalidation
|
fRegionsGen += 1; // signal TL cache invalidation
|
||||||
@@ -419,7 +429,7 @@ class Manager
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegion* GetRegion(const uint16_t id)
|
UnmanagedRegion* GetRegion(uint16_t id)
|
||||||
{
|
{
|
||||||
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
|
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
|
||||||
const auto &lTlCache = fTlRegionCache;
|
const auto &lTlCache = fTlRegionCache;
|
||||||
@@ -439,13 +449,14 @@ class Manager
|
|||||||
fTlRegionCache.fRegionsTLCache.clear();
|
fTlRegionCache.fRegionsTLCache.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
auto* lRegion = GetRegionUnsafe(id, shmLock);
|
auto* lRegion = GetRegionUnsafe(id, shmLock);
|
||||||
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
|
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
|
||||||
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
|
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
|
||||||
return lRegion;
|
return lRegion;
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegion* GetRegionUnsafe(const uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
|
UnmanagedRegion* GetRegionUnsafe(uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
|
||||||
{
|
{
|
||||||
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
|
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
|
||||||
auto it = fRegions.find(id);
|
auto it = fRegions.find(id);
|
||||||
@@ -479,12 +490,13 @@ class Manager
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void RemoveRegion(const uint16_t id)
|
void RemoveRegion(uint16_t id)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
fRegions.at(id)->StopAcks();
|
fRegions.at(id)->StopAcks();
|
||||||
{
|
{
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||||
if (fRegions.at(id)->RemoveOnDestruction()) {
|
if (fRegions.at(id)->RemoveOnDestruction()) {
|
||||||
fShmRegions->at(id).fDestroyed = true;
|
fShmRegions->at(id).fDestroyed = true;
|
||||||
(fEventCounter->fCount)++;
|
(fEventCounter->fCount)++;
|
||||||
@@ -800,6 +812,7 @@ class Manager
|
|||||||
VoidAlloc fShmVoidAlloc;
|
VoidAlloc fShmVoidAlloc;
|
||||||
boost::interprocess::interprocess_mutex* fShmMtx;
|
boost::interprocess::interprocess_mutex* fShmMtx;
|
||||||
|
|
||||||
|
std::mutex fLocalRegionsMtx;
|
||||||
std::mutex fRegionEventsMtx;
|
std::mutex fRegionEventsMtx;
|
||||||
std::condition_variable fRegionEventsCV;
|
std::condition_variable fRegionEventsCV;
|
||||||
std::thread fRegionEventThread;
|
std::thread fRegionEventThread;
|
||||||
|
@@ -6,9 +6,10 @@
|
|||||||
* copied verbatim in the file "LICENSE" *
|
* copied verbatim in the file "LICENSE" *
|
||||||
********************************************************************************/
|
********************************************************************************/
|
||||||
|
|
||||||
#include "Monitor.h"
|
|
||||||
#include "Common.h"
|
#include "Common.h"
|
||||||
#include "UnmanagedRegion.h"
|
#include "Monitor.h"
|
||||||
|
#include "Segment.h"
|
||||||
|
#include <fairmq/shmem/UnmanagedRegion.h>
|
||||||
|
|
||||||
#include <fairmq/tools/IO.h>
|
#include <fairmq/tools/IO.h>
|
||||||
#include <fairmq/tools/Strings.h>
|
#include <fairmq/tools/Strings.h>
|
||||||
@@ -267,13 +268,14 @@ bool Monitor::PrintShm(const ShmId& shmId)
|
|||||||
ss << "\n unmanaged regions:";
|
ss << "\n unmanaged regions:";
|
||||||
for (const auto& r : *shmRegions) {
|
for (const auto& r : *shmRegions) {
|
||||||
ss << "\n [" << r.first << "]: " << (r.second.fDestroyed ? "destroyed" : "alive");
|
ss << "\n [" << r.first << "]: " << (r.second.fDestroyed ? "destroyed" : "alive");
|
||||||
|
ss << ", size: " << r.second.fSize;
|
||||||
|
|
||||||
try {
|
// try {
|
||||||
boost::interprocess::message_queue q(open_only, std::string("fmq_" + std::string(shmId) + "_rgq_" + to_string(r.first)).c_str());
|
// boost::interprocess::message_queue q(open_only, std::string("fmq_" + std::string(shmId) + "_rgq_" + to_string(r.first)).c_str());
|
||||||
ss << ", ack queue: " << q.get_num_msg() << " messages";
|
// ss << ", ack queue: " << q.get_num_msg() << " messages";
|
||||||
} catch (bie&) {
|
// } catch (bie&) {
|
||||||
ss << ", ack queue: not found";
|
// ss << ", ack queue: not found";
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOGV(info, user1) << ss.str();
|
LOGV(info, user1) << ss.str();
|
||||||
@@ -414,24 +416,28 @@ void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused)))
|
|||||||
|
|
||||||
size_t numMessages = 0;
|
size_t numMessages = 0;
|
||||||
|
|
||||||
for (const auto& e : *debug) {
|
if (debug) {
|
||||||
numMessages += e.second.size();
|
for (const auto& e : *debug) {
|
||||||
}
|
numMessages += e.second.size();
|
||||||
LOG(info) << endl << "found " << numMessages << " messages.";
|
|
||||||
|
|
||||||
for (const auto& s : *debug) {
|
|
||||||
for (const auto& e : s.second) {
|
|
||||||
using time_point = chrono::system_clock::time_point;
|
|
||||||
time_point tmpt{chrono::duration_cast<time_point::duration>(chrono::nanoseconds(e.second.fCreationTime))};
|
|
||||||
time_t t = chrono::system_clock::to_time_t(tmpt);
|
|
||||||
uint64_t ms = e.second.fCreationTime % 1000000;
|
|
||||||
auto tm = localtime(&t);
|
|
||||||
LOG(info) << "segment: " << setw(3) << setfill(' ') << s.first
|
|
||||||
<< ", offset: " << setw(12) << setfill(' ') << e.first
|
|
||||||
<< ", size: " << setw(10) << setfill(' ') << e.second.fSize
|
|
||||||
<< ", creator PID: " << e.second.fPid << setfill('0')
|
|
||||||
<< ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms;
|
|
||||||
}
|
}
|
||||||
|
LOG(info) << endl << "found " << numMessages << " messages.";
|
||||||
|
|
||||||
|
for (const auto& s : *debug) {
|
||||||
|
for (const auto& e : s.second) {
|
||||||
|
using time_point = chrono::system_clock::time_point;
|
||||||
|
time_point tmpt{chrono::duration_cast<time_point::duration>(chrono::nanoseconds(e.second.fCreationTime))};
|
||||||
|
time_t t = chrono::system_clock::to_time_t(tmpt);
|
||||||
|
uint64_t ms = e.second.fCreationTime % 1000000;
|
||||||
|
auto tm = localtime(&t);
|
||||||
|
LOG(info) << "segment: " << setw(3) << setfill(' ') << s.first
|
||||||
|
<< ", offset: " << setw(12) << setfill(' ') << e.first
|
||||||
|
<< ", size: " << setw(10) << setfill(' ') << e.second.fSize
|
||||||
|
<< ", creator PID: " << e.second.fPid << setfill('0')
|
||||||
|
<< ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG(info) << "no debug data found";
|
||||||
}
|
}
|
||||||
} catch (bie&) {
|
} catch (bie&) {
|
||||||
LOG(info) << "no segments found";
|
LOG(info) << "no segments found";
|
||||||
@@ -462,11 +468,16 @@ unordered_map<uint16_t, std::vector<BufferDebugInfo>> Monitor::GetDebugInfo(cons
|
|||||||
|
|
||||||
result.reserve(debug->size());
|
result.reserve(debug->size());
|
||||||
|
|
||||||
for (const auto& s : *debug) {
|
|
||||||
result[s.first].reserve(s.second.size());
|
if (debug) {
|
||||||
for (const auto& e : s.second) {
|
for (const auto& s : *debug) {
|
||||||
result[s.first][e.first] = BufferDebugInfo(e.first, e.second.fPid, e.second.fSize, e.second.fCreationTime);
|
result[s.first].reserve(s.second.size());
|
||||||
|
for (const auto& e : s.second) {
|
||||||
|
result[s.first][e.first] = BufferDebugInfo(e.first, e.second.fPid, e.second.fSize, e.second.fCreationTime);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
LOG(info) << "no debug data found";
|
||||||
}
|
}
|
||||||
} catch (bie&) {
|
} catch (bie&) {
|
||||||
LOG(info) << "no segments found";
|
LOG(info) << "no segments found";
|
||||||
@@ -552,15 +563,16 @@ std::pair<std::string, bool> Remove(const std::string& name, bool verbose)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, bool verbose /* = true */)
|
std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmIdT, bool verbose /* = true */)
|
||||||
{
|
{
|
||||||
|
std::string shmId = shmIdT.shmId;
|
||||||
std::vector<std::pair<std::string, bool>> result;
|
std::vector<std::pair<std::string, bool>> result;
|
||||||
|
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
LOG(info) << "Cleaning up for shared memory id '" << shmId.shmId << "'...";
|
LOG(info) << "Cleaning up for shared memory id '" << shmId << "'...";
|
||||||
}
|
}
|
||||||
|
|
||||||
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
|
string managementSegmentName("fmq_" + shmId + "_mng");
|
||||||
try {
|
try {
|
||||||
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
|
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
|
||||||
|
|
||||||
@@ -578,22 +590,21 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
|
|||||||
LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << info.fDestroyed << ".";
|
LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << info.fDestroyed << ".";
|
||||||
}
|
}
|
||||||
if (!path.empty()) {
|
if (!path.empty()) {
|
||||||
result.emplace_back(Remove<bipc::file_mapping>(path + "fmq_" + shmId.shmId + "_rg_" + to_string(id), verbose));
|
result.emplace_back(Remove<bipc::file_mapping>(path + "fmq_" + shmId + "_rg_" + to_string(id), verbose));
|
||||||
} else {
|
} else {
|
||||||
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId.shmId + "_rg_" + to_string(id), verbose));
|
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId + "_rg_" + to_string(id), verbose));
|
||||||
}
|
}
|
||||||
result.emplace_back(Remove<bipc::message_queue>("fmq_" + shmId.shmId + "_rgq_" + to_string(id), verbose));
|
result.emplace_back(Remove<bipc::message_queue>("fmq_" + shmId + "_rgq_" + to_string(id), verbose));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Uint16SegmentInfoHashMap* shmSegments = managementSegment.find<Uint16SegmentInfoHashMap>(bipc::unique_instance).first;
|
Uint16SegmentInfoHashMap* shmSegments = managementSegment.find<Uint16SegmentInfoHashMap>(bipc::unique_instance).first;
|
||||||
|
|
||||||
if (shmSegments) {
|
if (shmSegments) {
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
LOG(info) << "Found " << shmSegments->size() << " managed segments...";
|
LOG(info) << "Found " << shmSegments->size() << " managed segments...";
|
||||||
}
|
}
|
||||||
for (const auto& segment : *shmSegments) {
|
for (const auto& segment : *shmSegments) {
|
||||||
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId.shmId + "_m_" + to_string(segment.first), verbose));
|
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId + "_m_" + to_string(segment.first), verbose));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
@@ -636,41 +647,49 @@ std::vector<std::pair<std::string, bool>> Monitor::CleanupFull(const SessionId&
|
|||||||
return CleanupFull(shmId, verbose);
|
return CleanupFull(shmId, verbose);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Monitor::ResetContent(const ShmId& shmId, bool verbose /* = true */)
|
void Monitor::ResetContent(const ShmId& shmIdT, bool verbose /* = true */)
|
||||||
{
|
{
|
||||||
|
std::string shmId = shmIdT.shmId;
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
cout << "Resetting segments content for shared memory id '" << shmId.shmId << "'..." << endl;
|
cout << "Resetting segments content for shared memory id '" << shmId << "'..." << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
|
string managementSegmentName("fmq_" + shmId + "_mng");
|
||||||
try {
|
try {
|
||||||
using namespace boost::interprocess;
|
using namespace boost::interprocess;
|
||||||
managed_shared_memory managementSegment(open_only, managementSegmentName.c_str());
|
managed_shared_memory managementSegment(open_only, managementSegmentName.c_str());
|
||||||
|
|
||||||
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
|
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
|
||||||
|
|
||||||
for (const auto& s : *segmentInfos) {
|
for (const auto& s : *segmentInfos) {
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
cout << "Resetting content of segment '" << "fmq_" << shmId.shmId << "_m_" << s.first << "'..." << endl;
|
cout << "Resetting content of segment '" << "fmq_" << shmId << "_m_" << s.first << "'..." << endl;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
|
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
|
||||||
RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str());
|
RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
|
||||||
void* ptr = segment.get_segment_manager();
|
void* ptr = segment.get_segment_manager();
|
||||||
size_t size = segment.get_segment_manager()->get_size();
|
size_t size = segment.get_segment_manager()->get_size();
|
||||||
new(ptr) segment_manager<char, rbtree_best_fit<mutex_family, offset_ptr<void>>, null_index>(size);
|
new(ptr) segment_manager<char, rbtree_best_fit<mutex_family, offset_ptr<void>>, null_index>(size);
|
||||||
} else {
|
} else {
|
||||||
SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str());
|
SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
|
||||||
void* ptr = segment.get_segment_manager();
|
void* ptr = segment.get_segment_manager();
|
||||||
size_t size = segment.get_segment_manager()->get_size();
|
size_t size = segment.get_segment_manager()->get_size();
|
||||||
new(ptr) segment_manager<char, simple_seq_fit<mutex_family, offset_ptr<void>>, null_index>(size);
|
new(ptr) segment_manager<char, simple_seq_fit<mutex_family, offset_ptr<void>>, null_index>(size);
|
||||||
}
|
}
|
||||||
} catch (bie& e) {
|
} catch (bie& e) {
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
cout << "Error resetting content of segment '" << std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl;
|
cout << "Error resetting content of segment '" << std::string("fmq_" + shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first;
|
||||||
|
if (shmRegions) {
|
||||||
|
for (const auto& region : *shmRegions) {
|
||||||
|
uint16_t id = region.first;
|
||||||
|
Remove<bipc::message_queue>("fmq_" + shmId + "_rgq_" + to_string(id), verbose);
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (bie& e) {
|
} catch (bie& e) {
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
cout << "Could not find '" << managementSegmentName << "' segment. Nothing to cleanup." << endl;
|
cout << "Could not find '" << managementSegmentName << "' segment. Nothing to cleanup." << endl;
|
||||||
@@ -679,7 +698,7 @@ void Monitor::ResetContent(const ShmId& shmId, bool verbose /* = true */)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
cout << "Done resetting segment content for shared memory id '" << shmId.shmId << "'." << endl;
|
cout << "Done resetting segment content for shared memory id '" << shmId << "'." << endl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -692,6 +711,43 @@ void Monitor::ResetContent(const SessionId& sessionId, bool verbose /* = true */
|
|||||||
ResetContent(shmId, verbose);
|
ResetContent(shmId, verbose);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Monitor::ResetContent(const ShmId& shmIdT, const std::vector<SegmentConfig>& segmentCfgs, const std::vector<RegionConfig>& regionCfgs, bool verbose /* = true */)
|
||||||
|
{
|
||||||
|
using namespace boost::interprocess;
|
||||||
|
|
||||||
|
std::string shmId = shmIdT.shmId;
|
||||||
|
std::string managementSegmentName("fmq_" + shmId + "_mng");
|
||||||
|
// reset managed segments
|
||||||
|
ResetContent(shmIdT, verbose);
|
||||||
|
// delete management segment
|
||||||
|
Remove<bipc::shared_memory_object>(managementSegmentName, verbose);
|
||||||
|
// recreate management segment
|
||||||
|
managed_shared_memory mngSegment(create_only, managementSegmentName.c_str(), kManagementSegmentSize);
|
||||||
|
// fill management segment with segment & region infos
|
||||||
|
for (const auto& s : segmentCfgs) {
|
||||||
|
if (s.allocationAlgorithm == "rbtree_best_fit") {
|
||||||
|
Segment::Register(shmId, s.id, AllocationAlgorithm::rbtree_best_fit);
|
||||||
|
} else if (s.allocationAlgorithm == "simple_seq_fit") {
|
||||||
|
Segment::Register(shmId, s.id, AllocationAlgorithm::simple_seq_fit);
|
||||||
|
} else {
|
||||||
|
LOG(error) << "Unknown allocation algorithm provided: " << s.allocationAlgorithm;
|
||||||
|
throw MonitorError("Unknown allocation algorithm provided: " + s.allocationAlgorithm);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (const auto& r : regionCfgs) {
|
||||||
|
fair::mq::shmem::UnmanagedRegion::Register(shmId, r);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void Monitor::ResetContent(const SessionId& sessionId, const std::vector<SegmentConfig>& segmentCfgs, const std::vector<RegionConfig>& regionCfgs, bool verbose /* = true */)
|
||||||
|
{
|
||||||
|
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
|
||||||
|
if (verbose) {
|
||||||
|
cout << "ResetContent called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
|
||||||
|
}
|
||||||
|
ResetContent(shmId, segmentCfgs, regionCfgs, verbose);
|
||||||
|
}
|
||||||
|
|
||||||
Monitor::~Monitor()
|
Monitor::~Monitor()
|
||||||
{
|
{
|
||||||
if (fSignalThread.joinable()) {
|
if (fSignalThread.joinable()) {
|
||||||
|
@@ -8,6 +8,8 @@
|
|||||||
#ifndef FAIR_MQ_SHMEM_MONITOR_H_
|
#ifndef FAIR_MQ_SHMEM_MONITOR_H_
|
||||||
#define FAIR_MQ_SHMEM_MONITOR_H_
|
#define FAIR_MQ_SHMEM_MONITOR_H_
|
||||||
|
|
||||||
|
#include <fairmq/UnmanagedRegion.h>
|
||||||
|
|
||||||
#include <fairlogger/Logger.h>
|
#include <fairlogger/Logger.h>
|
||||||
|
|
||||||
#include <thread>
|
#include <thread>
|
||||||
@@ -49,6 +51,13 @@ struct BufferDebugInfo
|
|||||||
uint64_t fCreationTime;
|
uint64_t fCreationTime;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct SegmentConfig
|
||||||
|
{
|
||||||
|
uint16_t id;
|
||||||
|
uint64_t size;
|
||||||
|
std::string allocationAlgorithm;
|
||||||
|
};
|
||||||
|
|
||||||
class Monitor
|
class Monitor
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@@ -88,6 +97,14 @@ class Monitor
|
|||||||
/// @param sessionId session id
|
/// @param sessionId session id
|
||||||
/// Only call this when segment is not in use
|
/// Only call this when segment is not in use
|
||||||
static void ResetContent(const SessionId& sessionId, bool verbose = true);
|
static void ResetContent(const SessionId& sessionId, bool verbose = true);
|
||||||
|
/// @brief [EXPERIMENTAL] cleanup the content of the shem segment, without recreating it
|
||||||
|
/// @param shmId shared memory id
|
||||||
|
/// Only call this when segment is not in use
|
||||||
|
static void ResetContent(const ShmId& shmId, const std::vector<SegmentConfig>& segmentCfgs, const std::vector<RegionConfig>& regionCfgs, bool verbose = true);
|
||||||
|
/// @brief [EXPERIMENTAL] cleanup the content of the shem segment, without recreating it
|
||||||
|
/// @param sessionId session id
|
||||||
|
/// Only call this when segment is not in use
|
||||||
|
static void ResetContent(const SessionId& sessionId, const std::vector<SegmentConfig>& segmentCfgs, const std::vector<RegionConfig>& regionCfgs, bool verbose = true);
|
||||||
|
|
||||||
/// @brief Outputs list of messages in shmem (if compiled with FAIRMQ_DEBUG_MODE=ON)
|
/// @brief Outputs list of messages in shmem (if compiled with FAIRMQ_DEBUG_MODE=ON)
|
||||||
/// @param shmId shmem id
|
/// @param shmId shmem id
|
||||||
|
@@ -26,6 +26,8 @@ static const RBTreeBestFit rbTreeBestFit = RBTreeBestFit();
|
|||||||
|
|
||||||
struct Segment
|
struct Segment
|
||||||
{
|
{
|
||||||
|
friend class Monitor;
|
||||||
|
|
||||||
Segment(const std::string& shmId, uint16_t id, size_t size, SimpleSeqFit)
|
Segment(const std::string& shmId, uint16_t id, size_t size, SimpleSeqFit)
|
||||||
: fSegment(SimpleSeqFitSegment(boost::interprocess::open_or_create,
|
: fSegment(SimpleSeqFitSegment(boost::interprocess::open_or_create,
|
||||||
std::string("fmq_" + shmId + "_m_" + std::to_string(id)).c_str(),
|
std::string("fmq_" + shmId + "_m_" + std::to_string(id)).c_str(),
|
||||||
@@ -66,15 +68,12 @@ struct Segment
|
|||||||
static void Register(const std::string& shmId, uint16_t id, AllocationAlgorithm allocAlgo)
|
static void Register(const std::string& shmId, uint16_t id, AllocationAlgorithm allocAlgo)
|
||||||
{
|
{
|
||||||
using namespace boost::interprocess;
|
using namespace boost::interprocess;
|
||||||
managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), 6553600);
|
managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), kManagementSegmentSize);
|
||||||
VoidAlloc alloc(mngSegment.get_segment_manager());
|
VoidAlloc alloc(mngSegment.get_segment_manager());
|
||||||
|
|
||||||
Uint16SegmentInfoHashMap* shmSegments = mngSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(alloc);
|
Uint16SegmentInfoHashMap* shmSegments = mngSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(alloc);
|
||||||
|
|
||||||
EventCounter* eventCounter = mngSegment.find<EventCounter>(unique_instance).first;
|
EventCounter* eventCounter = mngSegment.find_or_construct<EventCounter>(unique_instance)(0);
|
||||||
if (!eventCounter) {
|
|
||||||
eventCounter = mngSegment.construct<EventCounter>(unique_instance)(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool newSegmentRegistered = shmSegments->emplace(id, allocAlgo).second;
|
bool newSegmentRegistered = shmSegments->emplace(id, allocAlgo).second;
|
||||||
if (newSegmentRegistered) {
|
if (newSegmentRegistered) {
|
||||||
|
@@ -41,6 +41,7 @@ struct UnmanagedRegion
|
|||||||
{
|
{
|
||||||
friend class Message;
|
friend class Message;
|
||||||
friend class Manager;
|
friend class Manager;
|
||||||
|
friend class Monitor;
|
||||||
|
|
||||||
UnmanagedRegion(const std::string& shmId, uint16_t id, uint64_t size)
|
UnmanagedRegion(const std::string& shmId, uint16_t id, uint64_t size)
|
||||||
: UnmanagedRegion(shmId, size, false, makeRegionConfig(id))
|
: UnmanagedRegion(shmId, size, false, makeRegionConfig(id))
|
||||||
@@ -50,6 +51,10 @@ struct UnmanagedRegion
|
|||||||
: UnmanagedRegion(shmId, size, false, std::move(cfg))
|
: UnmanagedRegion(shmId, size, false, std::move(cfg))
|
||||||
{}
|
{}
|
||||||
|
|
||||||
|
UnmanagedRegion(const std::string& shmId, RegionConfig cfg)
|
||||||
|
: UnmanagedRegion(shmId, cfg.size, false, std::move(cfg))
|
||||||
|
{}
|
||||||
|
|
||||||
UnmanagedRegion(const std::string& shmId, uint64_t size, bool remote, RegionConfig cfg)
|
UnmanagedRegion(const std::string& shmId, uint64_t size, bool remote, RegionConfig cfg)
|
||||||
: fRemote(remote)
|
: fRemote(remote)
|
||||||
, fRemoveOnDestruction(cfg.removeOnDestruction)
|
, fRemoveOnDestruction(cfg.removeOnDestruction)
|
||||||
@@ -66,6 +71,9 @@ struct UnmanagedRegion
|
|||||||
{
|
{
|
||||||
using namespace boost::interprocess;
|
using namespace boost::interprocess;
|
||||||
|
|
||||||
|
// TODO: refactor this
|
||||||
|
cfg.size = size;
|
||||||
|
|
||||||
if (!cfg.path.empty()) {
|
if (!cfg.path.empty()) {
|
||||||
fName = std::string(cfg.path + fName);
|
fName = std::string(cfg.path + fName);
|
||||||
|
|
||||||
@@ -91,16 +99,25 @@ struct UnmanagedRegion
|
|||||||
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags);
|
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags);
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
fShmemObject = shared_memory_object(open_or_create, fName.c_str(), read_write);
|
// if opening fails, create
|
||||||
if (size != 0) {
|
try {
|
||||||
|
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
|
||||||
|
} catch (interprocess_exception& e) {
|
||||||
|
LOG(debug) << "Could not open " << (remote ? "remote" : "local") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what() << ", creating...";
|
||||||
|
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
|
||||||
fShmemObject.truncate(size);
|
fShmemObject.truncate(size);
|
||||||
}
|
}
|
||||||
} catch (interprocess_exception& e) {
|
} catch (interprocess_exception& e) {
|
||||||
LOG(error) << "Failed " << (remote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
|
LOG(error) << "Failed " << (remote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, cfg.creationFlags);
|
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, cfg.creationFlags);
|
||||||
|
if (size != 0 && size != fRegion.get_size()) {
|
||||||
|
LOG(error) << "Created/opened region size (" << fRegion.get_size() << ") does not match configured size (" << size << ")";
|
||||||
|
throw TransportError(tools::ToString("Created/opened region size (", fRegion.get_size(), ") does not match configured size (", size, ")"));
|
||||||
|
}
|
||||||
} catch (interprocess_exception& e) {
|
} catch (interprocess_exception& e) {
|
||||||
LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
|
LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
|
||||||
throw;
|
throw;
|
||||||
@@ -223,20 +240,17 @@ struct UnmanagedRegion
|
|||||||
return regionCfg;
|
return regionCfg;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void Register(const std::string& shmId, RegionConfig& cfg)
|
static void Register(const std::string& shmId, const RegionConfig& cfg)
|
||||||
{
|
{
|
||||||
using namespace boost::interprocess;
|
using namespace boost::interprocess;
|
||||||
managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), 6553600);
|
managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), kManagementSegmentSize);
|
||||||
VoidAlloc alloc(mngSegment.get_segment_manager());
|
VoidAlloc alloc(mngSegment.get_segment_manager());
|
||||||
|
|
||||||
Uint16RegionInfoHashMap* shmRegions = mngSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(alloc);
|
Uint16RegionInfoHashMap* shmRegions = mngSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(alloc);
|
||||||
|
|
||||||
EventCounter* eventCounter = mngSegment.find<EventCounter>(unique_instance).first;
|
EventCounter* eventCounter = mngSegment.find_or_construct<EventCounter>(unique_instance)(0);
|
||||||
if (!eventCounter) {
|
|
||||||
eventCounter = mngSegment.construct<EventCounter>(unique_instance)(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool newShmRegionCreated = shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, alloc)).second;
|
bool newShmRegionCreated = shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, alloc)).second;
|
||||||
if (newShmRegionCreated) {
|
if (newShmRegionCreated) {
|
||||||
(eventCounter->fCount)++;
|
(eventCounter->fCount)++;
|
||||||
}
|
}
|
||||||
|
@@ -27,7 +27,11 @@ inline bool Bind(void* socket, const std::string& address, const std::string& id
|
|||||||
if (errno == EADDRINUSE) {
|
if (errno == EADDRINUSE) {
|
||||||
// do not print error in this case, this is handled upstream in case no
|
// do not print error in this case, this is handled upstream in case no
|
||||||
// connection could be established after trying a number of random ports from a range.
|
// connection could be established after trying a number of random ports from a range.
|
||||||
return false;
|
size_t protocolPos = address.find(':');
|
||||||
|
std::string protocol = address.substr(0, protocolPos);
|
||||||
|
if (protocol == "tcp") {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
} else if (errno == EACCES) {
|
} else if (errno == EACCES) {
|
||||||
// check if TCP port 1 was given, if yes then it will be handeled upstream, print debug only
|
// check if TCP port 1 was given, if yes then it will be handeled upstream, print debug only
|
||||||
size_t protocolPos = address.find(':');
|
size_t protocolPos = address.find(':');
|
||||||
|
@@ -25,6 +25,26 @@ namespace
|
|||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace fair::mq;
|
using namespace fair::mq;
|
||||||
|
|
||||||
|
void RegionsSizeMismatch()
|
||||||
|
{
|
||||||
|
size_t session = tools::UuidHash();
|
||||||
|
|
||||||
|
ProgOptions config;
|
||||||
|
config.SetProperty<string>("session", to_string(session));
|
||||||
|
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
|
|
||||||
|
auto factory = TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config);
|
||||||
|
|
||||||
|
fair::mq::RegionConfig rCfg;
|
||||||
|
rCfg.id = 10;
|
||||||
|
UnmanagedRegionPtr region1 = nullptr;
|
||||||
|
ASSERT_NO_THROW(region1 = factory->CreateUnmanagedRegion(10000, [](void*, size_t, void*) {}, rCfg));
|
||||||
|
ASSERT_NE(region1, nullptr);
|
||||||
|
UnmanagedRegionPtr region2 = nullptr;
|
||||||
|
ASSERT_THROW(region2 = factory->CreateUnmanagedRegion(16000, [](void*, size_t, void*) {}, rCfg), fair::mq::TransportError);
|
||||||
|
ASSERT_EQ(region2, nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
void RegionsCache(const string& transport, const string& address)
|
void RegionsCache(const string& transport, const string& address)
|
||||||
{
|
{
|
||||||
size_t session1 = tools::UuidHash();
|
size_t session1 = tools::UuidHash();
|
||||||
@@ -226,6 +246,11 @@ void RegionCallbacks(const string& transport, const string& _address)
|
|||||||
LOG(info) << "2 done.";
|
LOG(info) << "2 done.";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(RegionsSizeMismatch, shmem)
|
||||||
|
{
|
||||||
|
RegionsSizeMismatch();
|
||||||
|
}
|
||||||
|
|
||||||
TEST(Cache, zeromq)
|
TEST(Cache, zeromq)
|
||||||
{
|
{
|
||||||
RegionsCache("zeromq", "ipc://test_region_cache");
|
RegionsCache("zeromq", "ipc://test_region_cache");
|
||||||
|
Reference in New Issue
Block a user