From 2df3d909fa93d348d7fa2e8c81af2088901da483 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 29 Nov 2023 18:48:42 +0100 Subject: [PATCH] shm: when refCount segment size is zero, fallback to old behaviour , which is to store reference counts inside the main data segment --- .../fairmq-start-ex-region-advanced.sh.in | 1 + examples/region/keep-alive.cxx | 3 +- fairmq/shmem/Message.h | 60 +++++++++++++------ fairmq/shmem/UnmanagedRegion.h | 8 ++- test/message/_message.cxx | 22 +++++-- 5 files changed, 69 insertions(+), 25 deletions(-) diff --git a/examples/region/fairmq-start-ex-region-advanced.sh.in b/examples/region/fairmq-start-ex-region-advanced.sh.in index 2f4ae1b6..6a1cbf84 100755 --- a/examples/region/fairmq-start-ex-region-advanced.sh.in +++ b/examples/region/fairmq-start-ex-region-advanced.sh.in @@ -11,6 +11,7 @@ SAMPLER+=" --id sampler1" SAMPLER+=" --severity debug" SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --transport $transport" +SAMPLER+=" --rc-segment-size 0" SAMPLER+=" --shm-monitor true" SAMPLER+=" --chan-name data1" SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777" diff --git a/examples/region/keep-alive.cxx b/examples/region/keep-alive.cxx index 98234bf4..506bc79b 100644 --- a/examples/region/keep-alive.cxx +++ b/examples/region/keep-alive.cxx @@ -95,10 +95,11 @@ struct ShmManager uint64_t size = stoull(conf.at(1)); fair::mq::RegionConfig cfg; cfg.id = id; + cfg.rcSegmentSize = 0; cfg.size = size; regionCfgs.push_back(cfg); - auto ret = regions.emplace(id, make_unique(shmId, id, size)); + auto ret = regions.emplace(id, make_unique(shmId, cfg)); fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second); LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() << ", starting at " << region.GetData() << ". Locking..."; diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index acaf4200..d53eef5a 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -251,7 +251,12 @@ class Message final : public fair::mq::Message if (!fRegionPtr) { throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId)); } - return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get(); + if (fRegionPtr->fRcSegmentSize > 0) { + return fRegionPtr->GetRefCountAddressFromHandle(fShared)->Get(); + } else { + fManager.GetSegment(fSegmentId); + return ShmHeader::RefCount(fManager.GetAddressFromHandle(fShared, fSegmentId)); + } } void Copy(const fair::mq::Message& other) override @@ -277,19 +282,29 @@ class Message final : public fair::mq::Message if (!fRegionPtr) { throw TransportError(tools::ToString("Cannot get unmanaged region with id ", otherMsg.fRegionId)); } - if (otherMsg.fShared < 0) { - // UR msg not yet shared, create the reference counting object with count 2 - try { - otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2))); - } catch (boost::interprocess::bad_alloc& ba) { - throw RefCountBadAlloc(tools::ToString( - "Insufficient space in the reference count segment ", - otherMsg.fRegionId, - ", original exception: bad_alloc: ", - ba.what())); + if (fRegionPtr->fRcSegmentSize > 0) { + if (otherMsg.fShared < 0) { + // UR msg not yet shared, create the reference counting object with count 2 + try { + otherMsg.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2))); + } catch (boost::interprocess::bad_alloc& ba) { + throw RefCountBadAlloc(tools::ToString("Insufficient space in the reference count segment ", otherMsg.fRegionId, ", original exception: bad_alloc: ", ba.what())); + } + } else { + fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment(); + } + } else { // if RefCount segment size is 0, store the ref count in the managed segment + if (otherMsg.fShared < 0) { // if UR msg is not yet shared + char* ptr = fManager.Allocate(2, 0); + // point the fShared in the unmanaged region message to the refCount holder + otherMsg.fShared = fManager.GetHandleFromAddress(ptr, fSegmentId); + // the message needs to be able to locate in which segment the refCount is stored + otherMsg.fSegmentId = fSegmentId; + ShmHeader::IncrementRefCount(ptr); + } else { // if the UR msg is already shared + fManager.GetSegment(otherMsg.fSegmentId); + ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(otherMsg.fShared, otherMsg.fSegmentId)); } - } else { - fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fShared)->Increment(); } } @@ -357,10 +372,21 @@ class Message final : public fair::mq::Message if (!fRegionPtr) { throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fRegionId)); } - uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement(); - if (refCount == 1) { - fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared))); - ReleaseUnmanagedRegionBlock(); + if (fRegionPtr->fRcSegmentSize > 0) { + uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fShared)->Decrement(); + if (refCount == 1) { + fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fShared))); + ReleaseUnmanagedRegionBlock(); + } + } else { // if RefCount segment size is 0, get the ref count from the managed segment + // make sure segment is initialized in this transport + fManager.GetSegment(fSegmentId); + // release unmanaged region block if ref count is one + uint16_t refCount = ShmHeader::DecrementRefCount(fManager.GetAddressFromHandle(fShared, fSegmentId)); + if (refCount == 1) { + fManager.Deallocate(fShared, fSegmentId); + ReleaseUnmanagedRegionBlock(); + } } } else { ReleaseUnmanagedRegionBlock(); diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index 76a1e183..2253f5eb 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -65,6 +65,7 @@ struct UnmanagedRegion , fShmemObject() , fFile(nullptr) , fFileMapping() + , fRcSegmentSize(cfg.rcSegmentSize) , fQueue(nullptr) , fCallback(nullptr) , fBulkCallback(nullptr) @@ -146,13 +147,13 @@ struct UnmanagedRegion LOG(debug) << "Successfully zeroed free memory of region " << id << "."; } - InitializeRefCountSegment(cfg.rcSegmentSize); + InitializeRefCountSegment(fRcSegmentSize); if (fControlling && created) { Register(shmId, cfg); } - LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << ")"; + LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << "), refCount segment size: " << fRcSegmentSize; } UnmanagedRegion() = delete; @@ -266,6 +267,7 @@ struct UnmanagedRegion std::condition_variable fBlockSendCV; std::vector fBlocksToFree; const std::size_t fAckBunchSize = 256; + uint64_t fRcSegmentSize; std::unique_ptr fQueue; std::unique_ptr fRefCountSegment; std::unique_ptr fRefCountPool; @@ -321,7 +323,7 @@ struct UnmanagedRegion void InitializeRefCountSegment(uint64_t size) { using namespace boost::interprocess; - if (!fRefCountSegment) { + if (!fRefCountSegment && size > 0) { fRefCountSegment = std::make_unique(open_or_create, fRefCountSegmentName.c_str(), size); LOG(trace) << "shmem: initialized ref count segment: " << fRefCountSegmentName; fRefCountPool = std::make_unique(fRefCountSegment->get_segment_manager()); diff --git a/test/message/_message.cxx b/test/message/_message.cxx index d50c0311..1757df3d 100644 --- a/test/message/_message.cxx +++ b/test/message/_message.cxx @@ -287,8 +287,10 @@ auto ZeroCopy(bool expandedShmMetadata = false) -> void // The "zero copy" property of the Copy() method is an implementation detail and is not guaranteed. // Currently it holds true for the shmem (across devices) and for zeromq (within same device) transports. -auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = false) -> void +auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata, uint64_t rcSegmentSize) -> void { + fair::Logger::SetConsoleSeverity(fair::Severity::debug); + ProgOptions config1; ProgOptions config2; string session(tools::Uuid()); @@ -311,11 +313,13 @@ auto ZeroCopyFromUnmanaged(string const& address, bool expandedShmMetadata = fal const size_t msgSize{100}; const size_t regionSize{1000000}; + RegionConfig cfg; + cfg.rcSegmentSize = rcSegmentSize; tools::Semaphore blocker; auto region = factory1->CreateUnmanagedRegion(regionSize, [&blocker](void*, size_t, void*) { blocker.Signal(); - }); + }, cfg); { Channel push("Push", "push", factory1); @@ -461,12 +465,22 @@ TEST(ZeroCopy, shmem_expanded_metadata) // NOLINT TEST(ZeroCopyFromUnmanaged, shmem) // NOLINT { - ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged"); + ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", false, 10000000); } TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata) // NOLINT { - ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged", true); + ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_expanded", true, 10000000); +} + +TEST(ZeroCopyFromUnmanaged, shmem_no_rc_segment) // NOLINT +{ + ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_no_rc_segment", false, 0); +} + +TEST(ZeroCopyFromUnmanaged, shmem_expanded_metadata_no_rc_segment) // NOLINT +{ + ZeroCopyFromUnmanaged("ipc://test_zerocopy_unmanaged_expanded_no_rc_segment", true, 0); } } // namespace