|
|
|
@@ -397,12 +397,10 @@ class Manager
|
|
|
|
|
|
|
|
|
|
UnmanagedRegion* region = nullptr;
|
|
|
|
|
bool newRegionCreated = false;
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
|
|
|
|
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
|
|
|
|
|
newRegionCreated = res.second;
|
|
|
|
|
region = res.first->second.get();
|
|
|
|
|
}
|
|
|
|
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
|
|
|
|
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
|
|
|
|
|
newRegionCreated = res.second;
|
|
|
|
|
region = res.first->second.get();
|
|
|
|
|
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
|
|
|
|
|
|
|
|
|
if (!newRegionCreated) {
|
|
|
|
@@ -429,7 +427,7 @@ class Manager
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
UnmanagedRegion* GetRegion(uint16_t id)
|
|
|
|
|
UnmanagedRegion* GetRegionFromCache(uint16_t id)
|
|
|
|
|
{
|
|
|
|
|
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
|
|
|
|
|
const auto &lTlCache = fTlRegionCache;
|
|
|
|
@@ -443,41 +441,40 @@ class Manager
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
|
|
|
|
// slow path: check invalidation
|
|
|
|
|
if (lTlCacheGen != fRegionsGen) {
|
|
|
|
|
fTlRegionCache.fRegionsTLCache.clear();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
|
|
|
|
auto* lRegion = GetRegionUnsafe(id, shmLock);
|
|
|
|
|
auto* lRegion = GetRegion(id);
|
|
|
|
|
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
|
|
|
|
|
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
|
|
|
|
|
return lRegion;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
UnmanagedRegion* GetRegionUnsafe(uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
|
|
|
|
|
UnmanagedRegion* GetRegion(uint16_t id)
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
|
|
|
|
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
|
|
|
|
|
auto it = fRegions.find(id);
|
|
|
|
|
if (it != fRegions.end()) {
|
|
|
|
|
return it->second.get();
|
|
|
|
|
} else {
|
|
|
|
|
try {
|
|
|
|
|
// get region info
|
|
|
|
|
RegionInfo regionInfo = fShmRegions->at(id);
|
|
|
|
|
// safe to unlock now - no shm container accessed after this
|
|
|
|
|
lockedShmLock.unlock();
|
|
|
|
|
RegionConfig cfg;
|
|
|
|
|
cfg.id = id;
|
|
|
|
|
cfg.creationFlags = regionInfo.fCreationFlags;
|
|
|
|
|
cfg.path = regionInfo.fPath.c_str();
|
|
|
|
|
// get region info
|
|
|
|
|
{
|
|
|
|
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
|
|
|
|
RegionInfo regionInfo = fShmRegions->at(id);
|
|
|
|
|
cfg.id = id;
|
|
|
|
|
cfg.creationFlags = regionInfo.fCreationFlags;
|
|
|
|
|
cfg.path = regionInfo.fPath.c_str();
|
|
|
|
|
}
|
|
|
|
|
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
|
|
|
|
|
|
|
|
|
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
|
|
|
|
|
r.first->second->InitializeQueues();
|
|
|
|
|
r.first->second->StartAckSender();
|
|
|
|
|
lockedShmLock.lock();
|
|
|
|
|
return r.first->second.get();
|
|
|
|
|
} catch (std::out_of_range& oor) {
|
|
|
|
|
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
|
|
|
|
@@ -493,10 +490,10 @@ class Manager
|
|
|
|
|
void RemoveRegion(uint16_t id)
|
|
|
|
|
{
|
|
|
|
|
try {
|
|
|
|
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
|
|
|
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
|
|
|
|
fRegions.at(id)->StopAcks();
|
|
|
|
|
{
|
|
|
|
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
|
|
|
|
if (fRegions.at(id)->RemoveOnDestruction()) {
|
|
|
|
|
fShmRegions->at(id).fDestroyed = true;
|
|
|
|
|
(fEventCounter->fCount)++;
|
|
|
|
@@ -512,44 +509,73 @@ class Manager
|
|
|
|
|
std::vector<fair::mq::RegionInfo> GetRegionInfo()
|
|
|
|
|
{
|
|
|
|
|
std::vector<fair::mq::RegionInfo> result;
|
|
|
|
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
|
|
|
|
std::map<uint64_t, RegionConfig> regionCfgs;
|
|
|
|
|
|
|
|
|
|
for (const auto& e : *fShmSegments) {
|
|
|
|
|
// make sure any segments in the session are found
|
|
|
|
|
GetSegment(e.first);
|
|
|
|
|
try {
|
|
|
|
|
{
|
|
|
|
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
|
|
|
|
|
|
|
|
|
for (const auto& [segmentId, segmentInfo] : *fShmSegments) {
|
|
|
|
|
// make sure any segments in the session are found
|
|
|
|
|
GetSegment(segmentId);
|
|
|
|
|
try {
|
|
|
|
|
fair::mq::RegionInfo info;
|
|
|
|
|
info.managed = true;
|
|
|
|
|
info.id = segmentId;
|
|
|
|
|
info.event = RegionEvent::created;
|
|
|
|
|
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(segmentId));
|
|
|
|
|
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(segmentId));
|
|
|
|
|
result.push_back(info);
|
|
|
|
|
} catch (const std::out_of_range& oor) {
|
|
|
|
|
LOG(error) << "could not find segment with id " << segmentId;
|
|
|
|
|
LOG(error) << oor.what();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (const auto& [regionId, regionInfo] : *fShmRegions) {
|
|
|
|
|
fair::mq::RegionInfo info;
|
|
|
|
|
info.managed = true;
|
|
|
|
|
info.id = e.first;
|
|
|
|
|
info.event = RegionEvent::created;
|
|
|
|
|
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(e.first));
|
|
|
|
|
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(e.first));
|
|
|
|
|
info.managed = false;
|
|
|
|
|
info.id = regionId;
|
|
|
|
|
info.flags = regionInfo.fUserFlags;
|
|
|
|
|
info.event = regionInfo.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
|
|
|
|
|
if (info.event == RegionEvent::created) {
|
|
|
|
|
RegionConfig cfg;
|
|
|
|
|
cfg.id = info.id;
|
|
|
|
|
cfg.creationFlags = info.id;
|
|
|
|
|
cfg.path = regionInfo.fPath.c_str();
|
|
|
|
|
regionCfgs.emplace(info.id, cfg);
|
|
|
|
|
// fill the ptr+size info after shmLock is released, to avoid constructing local region under it
|
|
|
|
|
} else {
|
|
|
|
|
info.ptr = nullptr;
|
|
|
|
|
info.size = 0;
|
|
|
|
|
}
|
|
|
|
|
result.push_back(info);
|
|
|
|
|
} catch (const std::out_of_range& oor) {
|
|
|
|
|
LOG(error) << "could not find segment with id " << e.first;
|
|
|
|
|
LOG(error) << oor.what();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (const auto& e : *fShmRegions) {
|
|
|
|
|
fair::mq::RegionInfo info;
|
|
|
|
|
info.managed = false;
|
|
|
|
|
info.id = e.first;
|
|
|
|
|
info.flags = e.second.fUserFlags;
|
|
|
|
|
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
|
|
|
|
|
if (info.event == RegionEvent::created) {
|
|
|
|
|
auto region = GetRegionUnsafe(info.id, shmLock);
|
|
|
|
|
if (region) {
|
|
|
|
|
// do another iteration outside of shm lock, to fill ptr+size of unmanaged regions
|
|
|
|
|
for (auto& info : result) {
|
|
|
|
|
if (!info.managed && info.event == RegionEvent::created) {
|
|
|
|
|
auto cfgIt = regionCfgs.find(info.id);
|
|
|
|
|
if (cfgIt != regionCfgs.end()) {
|
|
|
|
|
UnmanagedRegion* region = nullptr;
|
|
|
|
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
|
|
|
|
auto it = fRegions.find(info.id);
|
|
|
|
|
if (it != fRegions.end()) {
|
|
|
|
|
region = it->second.get();
|
|
|
|
|
} else {
|
|
|
|
|
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, true, cfgIt->second));
|
|
|
|
|
region = r.first->second.get();
|
|
|
|
|
region->InitializeQueues();
|
|
|
|
|
region->StartAckSender();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
info.ptr = region->GetData();
|
|
|
|
|
info.size = region->GetSize();
|
|
|
|
|
} else {
|
|
|
|
|
throw std::runtime_error(tools::ToString("GetRegionInfo() could not get region with id '", info.id, "'"));
|
|
|
|
|
info.ptr = nullptr;
|
|
|
|
|
info.size = 0;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
info.ptr = nullptr;
|
|
|
|
|
info.size = 0;
|
|
|
|
|
}
|
|
|
|
|
result.push_back(info);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return result;
|
|
|
|
|