Shm: bring back thread-safety for fRegions (intra-process)

This commit is contained in:
Alexey Rybalchenko 2022-02-14 17:15:34 +01:00
parent 8efe7adf0e
commit 29827f0426

View File

@ -375,7 +375,7 @@ class Manager
std::pair<UnmanagedRegion*, uint16_t> result; std::pair<UnmanagedRegion*, uint16_t> result;
{ {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
if (!cfg.id.has_value()) { if (!cfg.id.has_value()) {
RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first; RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
@ -395,23 +395,28 @@ class Manager
const uint16_t id = cfg.id.value(); const uint16_t id = cfg.id.value();
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg)); UnmanagedRegion* region = nullptr;
bool newRegionCreated = res.second; bool newRegionCreated = false;
UnmanagedRegion& region = *(res.first->second); {
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
newRegionCreated = res.second;
region = res.first->second.get();
}
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; // LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
if (!newRegionCreated) { if (!newRegionCreated) {
region.fRemote = false; // TODO: this should be more clear, refactor it. region->fRemote = false; // TODO: this should be more clear, refactor it.
} }
// start ack receiver only if a callback has been provided. // start ack receiver only if a callback has been provided.
if (callback || bulkCallback) { if (callback || bulkCallback) {
region.SetCallbacks(callback, bulkCallback); region->SetCallbacks(callback, bulkCallback);
region.InitializeQueues(); region->InitializeQueues();
region.StartAckSender(); region->StartAckSender();
region.StartAckReceiver(); region->StartAckReceiver();
} }
result.first = &(region); result.first = region;
result.second = id; result.second = id;
} }
fRegionsGen += 1; // signal TL cache invalidation fRegionsGen += 1; // signal TL cache invalidation
@ -444,6 +449,7 @@ class Manager
fTlRegionCache.fRegionsTLCache.clear(); fTlRegionCache.fRegionsTLCache.clear();
} }
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
auto* lRegion = GetRegionUnsafe(id, shmLock); auto* lRegion = GetRegionUnsafe(id, shmLock);
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64)); fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen; fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
@ -487,9 +493,10 @@ class Manager
void RemoveRegion(uint16_t id) void RemoveRegion(uint16_t id)
{ {
try { try {
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
fRegions.at(id)->StopAcks(); fRegions.at(id)->StopAcks();
{ {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
if (fRegions.at(id)->RemoveOnDestruction()) { if (fRegions.at(id)->RemoveOnDestruction()) {
fShmRegions->at(id).fDestroyed = true; fShmRegions->at(id).fDestroyed = true;
(fEventCounter->fCount)++; (fEventCounter->fCount)++;
@ -805,6 +812,7 @@ class Manager
VoidAlloc fShmVoidAlloc; VoidAlloc fShmVoidAlloc;
boost::interprocess::interprocess_mutex* fShmMtx; boost::interprocess::interprocess_mutex* fShmMtx;
std::mutex fLocalRegionsMtx;
std::mutex fRegionEventsMtx; std::mutex fRegionEventsMtx;
std::condition_variable fRegionEventsCV; std::condition_variable fRegionEventsCV;
std::thread fRegionEventThread; std::thread fRegionEventThread;