mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 00:31:14 +00:00
fix(shm): race/deadlock in region locks
This commit is contained in:
parent
73fd1b2c2a
commit
4587af2eb4
|
@ -76,6 +76,8 @@ struct Sampler : fair::mq::Device
|
||||||
|
|
||||||
void ResetTask() override
|
void ResetTask() override
|
||||||
{
|
{
|
||||||
|
// give some time for acks to be received
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(250));
|
||||||
fRegion.reset();
|
fRegion.reset();
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(fMtx);
|
std::lock_guard<std::mutex> lock(fMtx);
|
||||||
|
|
|
@ -397,12 +397,10 @@ class Manager
|
||||||
|
|
||||||
UnmanagedRegion* region = nullptr;
|
UnmanagedRegion* region = nullptr;
|
||||||
bool newRegionCreated = false;
|
bool newRegionCreated = false;
|
||||||
{
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
|
||||||
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
|
newRegionCreated = res.second;
|
||||||
newRegionCreated = res.second;
|
region = res.first->second.get();
|
||||||
region = res.first->second.get();
|
|
||||||
}
|
|
||||||
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||||
|
|
||||||
if (!newRegionCreated) {
|
if (!newRegionCreated) {
|
||||||
|
@ -429,7 +427,7 @@ class Manager
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegion* GetRegion(uint16_t id)
|
UnmanagedRegion* GetRegionFromCache(uint16_t id)
|
||||||
{
|
{
|
||||||
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
|
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
|
||||||
const auto &lTlCache = fTlRegionCache;
|
const auto &lTlCache = fTlRegionCache;
|
||||||
|
@ -443,41 +441,40 @@ class Manager
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
|
||||||
// slow path: check invalidation
|
// slow path: check invalidation
|
||||||
if (lTlCacheGen != fRegionsGen) {
|
if (lTlCacheGen != fRegionsGen) {
|
||||||
fTlRegionCache.fRegionsTLCache.clear();
|
fTlRegionCache.fRegionsTLCache.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
auto* lRegion = GetRegion(id);
|
||||||
auto* lRegion = GetRegionUnsafe(id, shmLock);
|
|
||||||
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
|
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
|
||||||
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
|
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
|
||||||
return lRegion;
|
return lRegion;
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegion* GetRegionUnsafe(uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
|
UnmanagedRegion* GetRegion(uint16_t id)
|
||||||
{
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
|
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
|
||||||
auto it = fRegions.find(id);
|
auto it = fRegions.find(id);
|
||||||
if (it != fRegions.end()) {
|
if (it != fRegions.end()) {
|
||||||
return it->second.get();
|
return it->second.get();
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
// get region info
|
|
||||||
RegionInfo regionInfo = fShmRegions->at(id);
|
|
||||||
// safe to unlock now - no shm container accessed after this
|
|
||||||
lockedShmLock.unlock();
|
|
||||||
RegionConfig cfg;
|
RegionConfig cfg;
|
||||||
cfg.id = id;
|
// get region info
|
||||||
cfg.creationFlags = regionInfo.fCreationFlags;
|
{
|
||||||
cfg.path = regionInfo.fPath.c_str();
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||||
|
RegionInfo regionInfo = fShmRegions->at(id);
|
||||||
|
cfg.id = id;
|
||||||
|
cfg.creationFlags = regionInfo.fCreationFlags;
|
||||||
|
cfg.path = regionInfo.fPath.c_str();
|
||||||
|
}
|
||||||
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||||
|
|
||||||
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
|
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
|
||||||
r.first->second->InitializeQueues();
|
r.first->second->InitializeQueues();
|
||||||
r.first->second->StartAckSender();
|
r.first->second->StartAckSender();
|
||||||
lockedShmLock.lock();
|
|
||||||
return r.first->second.get();
|
return r.first->second.get();
|
||||||
} catch (std::out_of_range& oor) {
|
} catch (std::out_of_range& oor) {
|
||||||
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
|
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
|
||||||
|
@ -493,10 +490,10 @@ class Manager
|
||||||
void RemoveRegion(uint16_t id)
|
void RemoveRegion(uint16_t id)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
fRegions.at(id)->StopAcks();
|
fRegions.at(id)->StopAcks();
|
||||||
{
|
{
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
|
||||||
if (fRegions.at(id)->RemoveOnDestruction()) {
|
if (fRegions.at(id)->RemoveOnDestruction()) {
|
||||||
fShmRegions->at(id).fDestroyed = true;
|
fShmRegions->at(id).fDestroyed = true;
|
||||||
(fEventCounter->fCount)++;
|
(fEventCounter->fCount)++;
|
||||||
|
@ -512,44 +509,73 @@ class Manager
|
||||||
std::vector<fair::mq::RegionInfo> GetRegionInfo()
|
std::vector<fair::mq::RegionInfo> GetRegionInfo()
|
||||||
{
|
{
|
||||||
std::vector<fair::mq::RegionInfo> result;
|
std::vector<fair::mq::RegionInfo> result;
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
std::map<uint64_t, RegionConfig> regionCfgs;
|
||||||
|
|
||||||
for (const auto& e : *fShmSegments) {
|
{
|
||||||
// make sure any segments in the session are found
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||||
GetSegment(e.first);
|
|
||||||
try {
|
for (const auto& [segmentId, segmentInfo] : *fShmSegments) {
|
||||||
|
// make sure any segments in the session are found
|
||||||
|
GetSegment(segmentId);
|
||||||
|
try {
|
||||||
|
fair::mq::RegionInfo info;
|
||||||
|
info.managed = true;
|
||||||
|
info.id = segmentId;
|
||||||
|
info.event = RegionEvent::created;
|
||||||
|
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(segmentId));
|
||||||
|
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(segmentId));
|
||||||
|
result.push_back(info);
|
||||||
|
} catch (const std::out_of_range& oor) {
|
||||||
|
LOG(error) << "could not find segment with id " << segmentId;
|
||||||
|
LOG(error) << oor.what();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto& [regionId, regionInfo] : *fShmRegions) {
|
||||||
fair::mq::RegionInfo info;
|
fair::mq::RegionInfo info;
|
||||||
info.managed = true;
|
info.managed = false;
|
||||||
info.id = e.first;
|
info.id = regionId;
|
||||||
info.event = RegionEvent::created;
|
info.flags = regionInfo.fUserFlags;
|
||||||
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(e.first));
|
info.event = regionInfo.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
|
||||||
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(e.first));
|
if (info.event == RegionEvent::created) {
|
||||||
|
RegionConfig cfg;
|
||||||
|
cfg.id = info.id;
|
||||||
|
cfg.creationFlags = info.id;
|
||||||
|
cfg.path = regionInfo.fPath.c_str();
|
||||||
|
regionCfgs.emplace(info.id, cfg);
|
||||||
|
// fill the ptr+size info after shmLock is released, to avoid constructing local region under it
|
||||||
|
} else {
|
||||||
|
info.ptr = nullptr;
|
||||||
|
info.size = 0;
|
||||||
|
}
|
||||||
result.push_back(info);
|
result.push_back(info);
|
||||||
} catch (const std::out_of_range& oor) {
|
|
||||||
LOG(error) << "could not find segment with id " << e.first;
|
|
||||||
LOG(error) << oor.what();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const auto& e : *fShmRegions) {
|
// do another iteration outside of shm lock, to fill ptr+size of unmanaged regions
|
||||||
fair::mq::RegionInfo info;
|
for (auto& info : result) {
|
||||||
info.managed = false;
|
if (!info.managed && info.event == RegionEvent::created) {
|
||||||
info.id = e.first;
|
auto cfgIt = regionCfgs.find(info.id);
|
||||||
info.flags = e.second.fUserFlags;
|
if (cfgIt != regionCfgs.end()) {
|
||||||
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
|
UnmanagedRegion* region = nullptr;
|
||||||
if (info.event == RegionEvent::created) {
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
auto region = GetRegionUnsafe(info.id, shmLock);
|
auto it = fRegions.find(info.id);
|
||||||
if (region) {
|
if (it != fRegions.end()) {
|
||||||
|
region = it->second.get();
|
||||||
|
} else {
|
||||||
|
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, true, cfgIt->second));
|
||||||
|
region = r.first->second.get();
|
||||||
|
region->InitializeQueues();
|
||||||
|
region->StartAckSender();
|
||||||
|
}
|
||||||
|
|
||||||
info.ptr = region->GetData();
|
info.ptr = region->GetData();
|
||||||
info.size = region->GetSize();
|
info.size = region->GetSize();
|
||||||
} else {
|
} else {
|
||||||
throw std::runtime_error(tools::ToString("GetRegionInfo() could not get region with id '", info.id, "'"));
|
info.ptr = nullptr;
|
||||||
|
info.size = 0;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
info.ptr = nullptr;
|
|
||||||
info.size = 0;
|
|
||||||
}
|
}
|
||||||
result.push_back(info);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
|
|
@ -195,7 +195,7 @@ class Message final : public fair::mq::Message
|
||||||
fLocalPtr = nullptr;
|
fLocalPtr = nullptr;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
|
fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
|
||||||
if (fRegionPtr) {
|
if (fRegionPtr) {
|
||||||
fLocalPtr = reinterpret_cast<char*>(fRegionPtr->GetData()) + fMeta.fHandle;
|
fLocalPtr = reinterpret_cast<char*>(fRegionPtr->GetData()) + fMeta.fHandle;
|
||||||
} else {
|
} else {
|
||||||
|
@ -365,7 +365,7 @@ class Message final : public fair::mq::Message
|
||||||
void ReleaseUnmanagedRegionBlock()
|
void ReleaseUnmanagedRegionBlock()
|
||||||
{
|
{
|
||||||
if (!fRegionPtr) {
|
if (!fRegionPtr) {
|
||||||
fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
|
fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fRegionPtr) {
|
if (fRegionPtr) {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user