diff --git a/fairmq/shmem/Manager.cxx b/fairmq/shmem/Manager.cxx index 12f5274a..645eacac 100644 --- a/fairmq/shmem/Manager.cxx +++ b/fairmq/shmem/Manager.cxx @@ -233,31 +233,37 @@ vector Manager::GetRegionInfoUnsafe() void Manager::SubscribeToRegionEvents(RegionEventCallback callback) { - bipc::scoped_lock lock(fShmMtx); if (fRegionEventThread.joinable()) { - fRegionEventsSubscriptionActive.store(false); + LOG(debug) << "Already subscribed. Overwriting previous subscription."; + bipc::scoped_lock lock(fShmMtx); + fRegionEventsSubscriptionActive = false; + lock.unlock(); + fRegionEventsCV.notify_all(); fRegionEventThread.join(); } + bipc::scoped_lock lock(fShmMtx); fRegionEventCallback = callback; - fRegionEventsSubscriptionActive.store(true); + fRegionEventsSubscriptionActive = true; fRegionEventThread = thread(&Manager::RegionEventsSubscription, this); } void Manager::UnsubscribeFromRegionEvents() { if (fRegionEventThread.joinable()) { - fRegionEventsSubscriptionActive.store(false); + bipc::scoped_lock lock(fShmMtx); + fRegionEventsSubscriptionActive = false; + lock.unlock(); fRegionEventsCV.notify_all(); fRegionEventThread.join(); + lock.lock(); + fRegionEventCallback = nullptr; } - bipc::scoped_lock lock(fShmMtx); - fRegionEventCallback = nullptr; } void Manager::RegionEventsSubscription() { - while (fRegionEventsSubscriptionActive.load()) { - bipc::scoped_lock lock(fShmMtx); + bipc::scoped_lock lock(fShmMtx); + while (fRegionEventsSubscriptionActive) { auto infos = GetRegionInfoUnsafe(); for (const auto& i : infos) { auto el = fObservedRegionEvents.find(i.id); @@ -298,11 +304,7 @@ Manager::~Manager() { bool lastRemoved = false; - if (fRegionEventThread.joinable()) { - fRegionEventsSubscriptionActive.store(false); - fRegionEventsCV.notify_all(); - fRegionEventThread.join(); - } + UnsubscribeFromRegionEvents(); try { bipc::scoped_lock lock(fShmMtx); diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index ccce4e85..bd0a5e4b 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -94,7 +94,7 @@ class Manager boost::interprocess::named_condition fRegionEventsCV; std::thread fRegionEventThread; - std::atomic fRegionEventsSubscriptionActive; + bool fRegionEventsSubscriptionActive; std::function fRegionEventCallback; std::unordered_map fObservedRegionEvents;