shm: eliminate race/deadlock in region subscriptions

This commit is contained in:
Alexey Rybalchenko 2021-04-12 09:36:17 +02:00
parent c6a6a5f21b
commit 2c89b24857
2 changed files with 23 additions and 17 deletions

View File

@ -384,10 +384,10 @@ class Manager
void RemoveRegion(const uint16_t id) void RemoveRegion(const uint16_t id)
{ {
fRegions.erase(id);
{ {
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fShmRegions->at(id).fDestroyed = true; fShmRegions->at(id).fDestroyed = true;
fRegions.erase(id);
(fEventCounter->fCount)++; (fEventCounter->fCount)++;
} }
fRegionsGen += 1; // signal TL cache invalidation fRegionsGen += 1; // signal TL cache invalidation
@ -483,19 +483,26 @@ class Manager
auto infos = GetRegionInfoUnsafe(); auto infos = GetRegionInfoUnsafe();
for (const auto& i : infos) { for (const auto& i : infos) {
auto el = fObservedRegionEvents.find({i.id, i.managed}); auto el = fObservedRegionEvents.find({i.id, i.managed});
if (el == fObservedRegionEvents.end()) { if (el == fObservedRegionEvents.end()) { // if event id has not been observed
fRegionEventCallback(i);
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event); fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
++fNumObservedEvents; // if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created'
} else { // TODO: do we care to show 'created' events if we know region is already destroyed?
if (i.event == RegionEvent::created) {
fRegionEventCallback(i);
++fNumObservedEvents;
} else {
fNumObservedEvents += 2;
}
} else { // if event id has been observed (expected - there are two events per id - created & destroyed)
// fire a callback if we have observed 'created' event and incoming is 'destroyed'
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) { if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
fRegionEventCallback(i); fRegionEventCallback(i);
el->second = i.event; el->second = i.event;
++fNumObservedEvents; ++fNumObservedEvents;
} else { } else {
// LOG(debug) << "ignoring event for id" << i.id << ":"; // LOG(debug) << "ignoring event for id " << i.id << ":"
// LOG(debug) << "incoming event: " << i.event; // << " incoming: " << i.event << ","
// LOG(debug) << "stored event: " << el->second; // << " stored: " << el->second;
} }
} }
} }
@ -688,7 +695,7 @@ class Manager
std::thread fRegionEventThread; std::thread fRegionEventThread;
bool fRegionEventsSubscriptionActive; bool fRegionEventsSubscriptionActive;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback; std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; // pair: <region id, managed>
uint64_t fNumObservedEvents; uint64_t fNumObservedEvents;
DeviceCounter* fDeviceCounter; DeviceCounter* fDeviceCounter;

View File

@ -106,7 +106,7 @@ struct Region
InitializeQueues(); InitializeQueues();
StartSendingAcks(); StartSendingAcks();
LOG(debug) << "shmem: initialized region: " << fName; LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")";
} }
Region() = delete; Region() = delete;
@ -123,7 +123,7 @@ struct Region
} else { } else {
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
} }
LOG(debug) << "shmem: initialized region queue: " << fQueueName; LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")";
} }
void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); } void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); }
@ -238,11 +238,11 @@ struct Region
} }
if (boost::interprocess::shared_memory_object::remove(fName.c_str())) { if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
LOG(debug) << "Region '" << fName << "' destroyed."; LOG(trace) << "Region '" << fName << "' destroyed.";
} }
if (boost::interprocess::file_mapping::remove(fName.c_str())) { if (boost::interprocess::file_mapping::remove(fName.c_str())) {
LOG(debug) << "File mapping '" << fName << "' destroyed."; LOG(trace) << "File mapping '" << fName << "' destroyed.";
} }
if (fFile) { if (fFile) {
@ -250,14 +250,13 @@ struct Region
} }
if (boost::interprocess::message_queue::remove(fQueueName.c_str())) { if (boost::interprocess::message_queue::remove(fQueueName.c_str())) {
LOG(debug) << "Region queue '" << fQueueName << "' destroyed."; LOG(trace) << "Region queue '" << fQueueName << "' destroyed.";
} }
} else { } else {
// LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary."; // LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
} }
LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed."; // LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
} }
bool fRemote; bool fRemote;