From 37c059177f13b6eceac7ccfa12cbf31f21d89192 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 10 Jun 2021 11:02:40 +0200 Subject: [PATCH] shm: improve exception handling --- fairmq/shmem/Manager.h | 83 ++++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 32 deletions(-) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index e1505516..10d9394c 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -19,10 +19,11 @@ #include "Region.h" #include "Monitor.h" -#include -#include #include #include +#include +#include +#include #include #include @@ -108,7 +109,7 @@ class Manager fHeartbeatThread = std::thread(&Manager::Heartbeats, this); - { + try { std::stringstream ss; boost::interprocess::scoped_lock lock(fShmMtx); @@ -132,9 +133,25 @@ class Manager LOG(debug) << "initialized event counter with: " << fEventCounter->fCount; } + fShmRegions = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); + + fDeviceCounter = fManagementSegment.find(unique_instance).first; + if (fDeviceCounter) { + LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing."; + (fDeviceCounter->fCount)++; + LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount; + } else { + LOG(debug) << "no device counter found, creating one and initializing with 1"; + fDeviceCounter = fManagementSegment.construct(unique_instance)(1); + LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount; + } + + std::string op("create/open"); + try { auto it = fShmSegments->find(fSegmentId); if (it == fShmSegments->end()) { + op = "create"; // no segment with given id exists, creating if (allocationAlgorithm == "rbtree_best_fit") { fSegments.emplace(fSegmentId, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str(), size)); @@ -146,6 +163,7 @@ class Manager ss << "Created "; (fEventCounter->fCount)++; } else { + op = "open"; // found segment with the given id, opening if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { fSegments.emplace(fSegmentId, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str())); @@ -168,8 +186,8 @@ class Manager << " Allocation algorithm: " << allocationAlgorithm; LOG(debug) << ss.str(); } catch(interprocess_exception& bie) { - LOG(error) << "Failed to create/open shared memory segment (" << "fmq_" << fShmId << "_m_" << fSegmentId << "): " << bie.what(); - throw std::runtime_error(tools::ToString("Failed to create/open shared memory segment (", "fmq_", fShmId, "_m_", fSegmentId, "): ", bie.what())); + LOG(error) << "Failed to " << op << " shared memory segment (" << "fmq_" << fShmId << "_m_" << fSegmentId << "): " << bie.what(); + throw TransportError(tools::ToString("Failed to ", op, " shared memory segment (", "fmq_", fShmId, "_m_", fSegmentId, "): ", bie.what())); } if (mlockSegment) { @@ -185,23 +203,14 @@ class Manager LOG(debug) << "Successfully zeroed the managed segment free memory."; } - fShmRegions = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); - - fDeviceCounter = fManagementSegment.find(unique_instance).first; - if (fDeviceCounter) { - LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing."; - (fDeviceCounter->fCount)++; - LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount; - } else { - LOG(debug) << "no device counter found, creating one and initializing with 1"; - fDeviceCounter = fManagementSegment.construct(unique_instance)(1); - LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount; - } - #ifdef FAIRMQ_DEBUG_MODE fMsgDebug = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); fShmMsgCounters = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); #endif + } catch (...) { + StopHeartbeats(); + CleanupIfLast(); + throw; } } @@ -556,6 +565,18 @@ class Manager } } + void StopHeartbeats() + { + { + std::unique_lock lock(fHeartbeatsMtx); + fBeatTheHeart = false; + } + fHeartbeatsCV.notify_one(); + if (fHeartbeatThread.joinable()) { + fHeartbeatThread.join(); + } + } + bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; } void GetSegment(uint16_t id) @@ -660,23 +681,11 @@ class Manager uint16_t GetSegmentId() const { return fSegmentId; } - ~Manager() + void CleanupIfLast() { using namespace boost::interprocess; + bool lastRemoved = false; - - fRegionsGen += 1; // signal TL cache invalidation - UnsubscribeFromRegionEvents(); - - { - std::unique_lock lock(fHeartbeatsMtx); - fBeatTheHeart = false; - } - fHeartbeatsCV.notify_one(); - if (fHeartbeatThread.joinable()) { - fHeartbeatThread.join(); - } - try { boost::interprocess::scoped_lock lock(fShmMtx); @@ -697,6 +706,16 @@ class Manager } } + ~Manager() + { + fRegionsGen += 1; // signal TL cache invalidation + UnsubscribeFromRegionEvents(); + + StopHeartbeats(); + + CleanupIfLast(); + } + private: uint64_t fShmId64; std::string fShmId;