From 29827f0426fef362eb6b77ebb7aae41e50e0a0f0 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 14 Feb 2022 17:15:34 +0100 Subject: [PATCH] Shm: bring back thread-safety for fRegions (intra-process) --- fairmq/shmem/Manager.h | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 88db7cff..66b963cd 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -375,7 +375,7 @@ class Manager std::pair result; { - boost::interprocess::scoped_lock lock(*fShmMtx); + boost::interprocess::scoped_lock shmLock(*fShmMtx); if (!cfg.id.has_value()) { RegionCounter* rc = fManagementSegment.find(unique_instance).first; @@ -395,23 +395,28 @@ class Manager const uint16_t id = cfg.id.value(); - auto res = fRegions.emplace(id, std::make_unique(fShmId, size, false, cfg)); - bool newRegionCreated = res.second; - UnmanagedRegion& region = *(res.first->second); + UnmanagedRegion* region = nullptr; + bool newRegionCreated = false; + { + std::lock_guard lock(fLocalRegionsMtx); + auto res = fRegions.emplace(id, std::make_unique(fShmId, size, false, cfg)); + newRegionCreated = res.second; + region = res.first->second.get(); + } // LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; if (!newRegionCreated) { - region.fRemote = false; // TODO: this should be more clear, refactor it. + region->fRemote = false; // TODO: this should be more clear, refactor it. } // start ack receiver only if a callback has been provided. if (callback || bulkCallback) { - region.SetCallbacks(callback, bulkCallback); - region.InitializeQueues(); - region.StartAckSender(); - region.StartAckReceiver(); + region->SetCallbacks(callback, bulkCallback); + region->InitializeQueues(); + region->StartAckSender(); + region->StartAckReceiver(); } - result.first = &(region); + result.first = region; result.second = id; } fRegionsGen += 1; // signal TL cache invalidation @@ -444,6 +449,7 @@ class Manager fTlRegionCache.fRegionsTLCache.clear(); } + std::lock_guard lock(fLocalRegionsMtx); auto* lRegion = GetRegionUnsafe(id, shmLock); fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64)); fTlRegionCache.fRegionsTLCacheGen = fRegionsGen; @@ -487,9 +493,10 @@ class Manager void RemoveRegion(uint16_t id) { try { + std::lock_guard lock(fLocalRegionsMtx); fRegions.at(id)->StopAcks(); { - boost::interprocess::scoped_lock lock(*fShmMtx); + boost::interprocess::scoped_lock shmLock(*fShmMtx); if (fRegions.at(id)->RemoveOnDestruction()) { fShmRegions->at(id).fDestroyed = true; (fEventCounter->fCount)++; @@ -805,6 +812,7 @@ class Manager VoidAlloc fShmVoidAlloc; boost::interprocess::interprocess_mutex* fShmMtx; + std::mutex fLocalRegionsMtx; std::mutex fRegionEventsMtx; std::condition_variable fRegionEventsCV; std::thread fRegionEventThread;