diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index 4cdfd905..f9a7fb1e 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -103,6 +103,15 @@ struct DeviceCounter std::atomic fCount; }; +struct EventCounter +{ + EventCounter(uint64_t c) + : fCount(c) + {} + + std::atomic fCount; +}; + struct RegionCounter { RegionCounter(uint16_t c) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index a9ec1c2d..2463a488 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -64,7 +64,9 @@ class Manager , fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str()) , fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str()) , fRegionEventsSubscriptionActive(false) + , fNumObservedEvents(0) , fDeviceCounter(nullptr) + , fEventCounter(nullptr) , fShmSegments(nullptr) , fShmRegions(nullptr) , fInterrupted(false) @@ -102,6 +104,16 @@ class Manager fShmSegments = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); + fEventCounter = fManagementSegment.find(unique_instance).first; + + if (fEventCounter) { + LOG(debug) << "event counter found: " << fEventCounter->fCount; + } else { + LOG(debug) << "no event counter found, creating one and initializing with 0"; + fEventCounter = fManagementSegment.construct(unique_instance)(0); + LOG(debug) << "initialized event counter with: " << fEventCounter->fCount; + } + try { auto it = fShmSegments->find(fSegmentId); if (it == fShmSegments->end()) { @@ -114,6 +126,7 @@ class Manager fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit); } ss << "Created "; + (fEventCounter->fCount)++; } else { // found segment with the given id, opening if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { @@ -276,6 +289,8 @@ class Manager r.first->second->StartReceivingAcks(); result.first = &(r.first->second->fRegion); result.second = id; + + (fEventCounter->fCount)++; } fRegionEventsCV.notify_all(); @@ -327,6 +342,7 @@ class Manager { boost::interprocess::scoped_lock lock(fShmMtx); fShmRegions->at(id).fDestroyed = true; + (fEventCounter->fCount)++; } fRegionEventsCV.notify_all(); } @@ -419,10 +435,12 @@ class Manager if (el == fObservedRegionEvents.end()) { fRegionEventCallback(i); fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event); + ++fNumObservedEvents; } else { if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) { fRegionEventCallback(i); el->second = i.event; + ++fNumObservedEvents; } else { // LOG(debug) << "ignoring event for id" << i.id << ":"; // LOG(debug) << "incoming event: " << i.event; @@ -430,7 +448,7 @@ class Manager } } } - fRegionEventsCV.wait(lock); + fRegionEventsCV.wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; }); } } @@ -618,8 +636,10 @@ class Manager bool fRegionEventsSubscriptionActive; std::function fRegionEventCallback; std::map, RegionEvent> fObservedRegionEvents; + uint64_t fNumObservedEvents; DeviceCounter* fDeviceCounter; + EventCounter* fEventCounter; Uint16SegmentInfoHashMap* fShmSegments; Uint16RegionInfoHashMap* fShmRegions; std::unordered_map> fRegions;