shm: integrate mtx and cv into management segment

This commit is contained in:
Alexey Rybalchenko
2021-12-07 00:35:48 +01:00
committed by Dennis Klein
parent 80ed45df63
commit 1839f7e8c0
3 changed files with 31 additions and 34 deletions

View File

@@ -23,7 +23,8 @@
#include <boost/filesystem.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/variant.hpp>
@@ -134,8 +135,8 @@ class Manager
, fDeviceId(std::move(deviceId))
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
, fRegionEventsShmCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
, fShmMtx(fManagementSegment.find_or_construct<boost::interprocess::interprocess_mutex>(boost::interprocess::unique_instance)())
, fRegionEventsShmCV(fManagementSegment.find_or_construct<boost::interprocess::interprocess_condition>(boost::interprocess::unique_instance)())
, fNumObservedEvents(0)
, fDeviceCounter(nullptr)
, fEventCounter(nullptr)
@@ -188,7 +189,7 @@ class Manager
}
if (autolaunchMonitor) {
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
StartMonitor(fShmId);
}
@@ -196,7 +197,7 @@ class Manager
try {
std::stringstream ss;
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
@@ -379,7 +380,7 @@ class Manager
std::pair<mapped_region*, uint16_t> result;
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
if (!cfg.id.has_value()) {
RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
@@ -437,7 +438,7 @@ class Manager
}
}
fRegionsGen += 1; // signal TL cache invalidation
fRegionEventsShmCV.notify_all();
fRegionEventsShmCV->notify_all();
return result;
} catch (interprocess_exception& e) {
@@ -461,7 +462,7 @@ class Manager
}
}
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
// slow path: check invalidation
if (lTlCacheGen != fRegionsGen) {
fTlRegionCache.fRegionsTLCache.clear();
@@ -507,14 +508,14 @@ class Manager
try {
fRegions.at(id)->StopAcks();
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
if (fRegions.at(id)->fRemoveOnDestruction) {
fShmRegions->at(id).fDestroyed = true;
(fEventCounter->fCount)++;
}
fRegions.erase(id);
}
fRegionEventsShmCV.notify_all();
fRegionEventsShmCV->notify_all();
} catch (std::out_of_range& oor) {
LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'";
}
@@ -523,7 +524,7 @@ class Manager
std::vector<fair::mq::RegionInfo> GetRegionInfo()
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
return GetRegionInfoUnsafe();
}
@@ -576,13 +577,13 @@ class Manager
{
if (fRegionEventThread.joinable()) {
LOG(debug) << "Already subscribed. Overwriting previous subscription.";
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsShmCV.notify_all();
fRegionEventsShmCV->notify_all();
fRegionEventThread.join();
}
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
fRegionEventCallback = callback;
fRegionEventsSubscriptionActive = true;
fRegionEventThread = std::thread(&Manager::RegionEventsSubscription, this);
@@ -593,10 +594,10 @@ class Manager
void UnsubscribeFromRegionEvents()
{
if (fRegionEventThread.joinable()) {
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsShmCV.notify_all();
fRegionEventsShmCV->notify_all();
fRegionEventThread.join();
lock.lock();
fRegionEventCallback = nullptr;
@@ -605,7 +606,7 @@ class Manager
void RegionEventsSubscription()
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
while (fRegionEventsSubscriptionActive) {
auto infos = GetRegionInfoUnsafe();
for (const auto& i : infos) {
@@ -631,7 +632,7 @@ class Manager
}
}
}
fRegionEventsShmCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
fRegionEventsShmCV->wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
}
}
@@ -740,7 +741,7 @@ class Manager
}
}
#ifdef FAIRMQ_DEBUG_MODE
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
IncrementShmMsgCounter(fSegmentId);
if (fMsgDebug->count(fSegmentId) == 0) {
fMsgDebug->emplace(fSegmentId, fShmVoidAlloc);
@@ -759,7 +760,7 @@ class Manager
{
char* ptr = GetAddressFromHandle(handle, segmentId);
#ifdef FAIRMQ_DEBUG_MODE
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
DecrementShmMsgCounter(segmentId);
try {
fMsgDebug->at(segmentId).erase(GetHandleFromAddress(ShmHeader::UserPtr(ptr), fSegmentId));
@@ -784,7 +785,7 @@ class Manager
bool lastRemoved = false;
try {
boost::interprocess::scoped_lock<named_mutex> lock(fShmMtx);
boost::interprocess::scoped_lock<interprocess_mutex> lock(*fShmMtx);
(fDeviceCounter->fCount)--;
@@ -821,9 +822,9 @@ class Manager
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> fSegments;
boost::interprocess::managed_shared_memory fManagementSegment;
VoidAlloc fShmVoidAlloc;
boost::interprocess::named_mutex fShmMtx;
boost::interprocess::interprocess_mutex* fShmMtx;
boost::interprocess::named_condition fRegionEventsShmCV;
boost::interprocess::interprocess_condition* fRegionEventsShmCV;
std::thread fRegionEventThread;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; // pair: <region id, managed>