From 9bf908fb52df5af0299fcebb7826cfd33cdd05d6 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 20 May 2021 00:28:44 +0200 Subject: [PATCH] shm: revert some changes from c85d6e0 that introduced a race --- fairmq/shmem/Manager.h | 1 - fairmq/shmem/Message.h | 2 -- fairmq/shmem/Region.h | 20 ++++++++++---------- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 85e4f0bb..27eef548 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -327,7 +327,6 @@ class Manager fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc)); - r.first->second->InitializeQueues(); r.first->second->StartReceivingAcks(); result.first = &(r.first->second->fRegion); result.second = id; diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index 77abf9e0..d4af19f0 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -309,8 +309,6 @@ class Message final : public fair::mq::Message } if (fRegionPtr) { - fRegionPtr->InitializeQueues(); - fRegionPtr->StartSendingAcks(); fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint}); } else { LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack"; diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index 9a4ab719..419453c2 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -104,6 +104,10 @@ struct Region } } + + InitializeQueues(); + StartSendingAcks(); + LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")"; } @@ -116,21 +120,17 @@ struct Region { using namespace boost::interprocess; - if (fQueue == nullptr) { - if (fRemote) { - fQueue = std::make_unique(open_only, fQueueName.c_str()); - } else { - fQueue = std::make_unique(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); - } - LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")"; + if (fRemote) { + fQueue = std::make_unique(open_only, fQueueName.c_str()); + } else { + fQueue = std::make_unique(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); } + LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")"; } void StartSendingAcks() { - if (!fAcksSender.joinable()) { - fAcksSender = std::thread(&Region::SendAcks, this); - } + fAcksSender = std::thread(&Region::SendAcks, this); } void SendAcks() {