From 266843cda505b59dd334132c2e6a223b660a0f97 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 25 Aug 2020 21:04:02 +0200 Subject: [PATCH] Shm: initial multiple segments support --- fairmq/FairMQUnmanagedRegion.h | 2 +- fairmq/plugins/config/Config.cxx | 1 + fairmq/shmem/Common.h | 19 +++--- fairmq/shmem/Manager.h | 111 +++++++++++++++++++------------ fairmq/shmem/Message.h | 22 +++--- fairmq/shmem/Monitor.cxx | 90 +++++++++++++------------ fairmq/shmem/README.md | 20 +++--- fairmq/shmem/Region.h | 2 +- fairmq/shmem/UnmanagedRegion.h | 4 +- fairmq/zeromq/Context.h | 8 +-- fairmq/zeromq/UnmanagedRegion.h | 4 +- 11 files changed, 159 insertions(+), 124 deletions(-) diff --git a/fairmq/FairMQUnmanagedRegion.h b/fairmq/FairMQUnmanagedRegion.h index a7048819..a353e0b6 100644 --- a/fairmq/FairMQUnmanagedRegion.h +++ b/fairmq/FairMQUnmanagedRegion.h @@ -75,7 +75,7 @@ class FairMQUnmanagedRegion virtual void* GetData() const = 0; virtual size_t GetSize() const = 0; - virtual uint64_t GetId() const = 0; + virtual uint16_t GetId() const = 0; virtual void SetLinger(uint32_t linger) = 0; virtual uint32_t GetLinger() const = 0; diff --git a/fairmq/plugins/config/Config.cxx b/fairmq/plugins/config/Config.cxx index dae14c2a..22a53632 100644 --- a/fairmq/plugins/config/Config.cxx +++ b/fairmq/plugins/config/Config.cxx @@ -70,6 +70,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions() ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") ("shm-segment-size", po::value()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).") ("shm-allocation", po::value()->default_value("rbtree_best_fit"), "Shared memory allocation algorithm: rbtree_best_fit/simple_seq_fit.") + ("shm-segment-id", po::value()->default_value(0), "EXPERIMENTAL: Shared memory segment id for message creation.") ("shm-mlock-segment", po::value()->default_value(false), "Shared memory: mlock the shared memory segment after initialization.") ("shm-zero-segment", po::value()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization.") ("shm-throw-bad-alloc", po::value()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).") diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index 22cec53a..83650055 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -81,9 +81,9 @@ struct RegionInfo bool fDestroyed; }; -using Uint64RegionInfoPairAlloc = boost::interprocess::allocator, SegmentManager>; -using Uint64RegionInfoMap = boost::interprocess::map, Uint64RegionInfoPairAlloc>; -using Uint64RegionInfoHashMap = boost::unordered_map, std::equal_to, Uint64RegionInfoPairAlloc>; +using Uint16RegionInfoPairAlloc = boost::interprocess::allocator, SegmentManager>; +using Uint16RegionInfoMap = boost::interprocess::map, Uint16RegionInfoPairAlloc>; +using Uint16RegionInfoHashMap = boost::unordered_map, std::equal_to, Uint16RegionInfoPairAlloc>; struct SegmentInfo { @@ -94,9 +94,9 @@ struct SegmentInfo AllocationAlgorithm fAllocationAlgorithm; }; -using Uint64SegmentInfoPairAlloc = boost::interprocess::allocator, SegmentManager>; -using Uint64SegmentInfoMap = boost::interprocess::map, Uint64SegmentInfoPairAlloc>; -using Uint64SegmentInfoHashMap = boost::unordered_map, std::equal_to, Uint64SegmentInfoPairAlloc>; +using Uint16SegmentInfoPairAlloc = boost::interprocess::allocator, SegmentManager>; +using Uint16SegmentInfoMap = boost::interprocess::map, Uint16SegmentInfoPairAlloc>; +using Uint16SegmentInfoHashMap = boost::unordered_map, std::equal_to, Uint16SegmentInfoPairAlloc>; struct DeviceCounter { @@ -120,18 +120,19 @@ struct MsgCounter struct RegionCounter { - RegionCounter(uint64_t c) + RegionCounter(uint16_t c) : fCount(c) {} - std::atomic fCount; + std::atomic fCount; }; struct MetaHeader { size_t fSize; - size_t fRegionId; size_t fHint; + uint16_t fRegionId; + uint16_t fSegmentId; boost::interprocess::managed_shared_memory::handle_t fHandle; }; diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 46ed42e4..9a92430c 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -60,6 +60,7 @@ class Manager public: Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config) : fShmId(std::move(shmId)) + , fSegmentId(0) , fDeviceId(std::move(deviceId)) , fSegments() , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) @@ -88,6 +89,7 @@ class Manager std::string allocationAlgorithm("rbtree_best_fit"); if (config) { mlockSegment = config->GetProperty("shm-mlock-segment", mlockSegment); + fSegmentId = config->GetProperty("shm-segment-id", fSegmentId); zeroSegment = config->GetProperty("shm-zero-segment", zeroSegment); autolaunchMonitor = config->GetProperty("shm-monitor", autolaunchMonitor); fThrowOnBadAlloc = config->GetProperty("shm-throw-bad-alloc", fThrowOnBadAlloc); @@ -109,32 +111,30 @@ class Manager std::stringstream ss; boost::interprocess::scoped_lock lock(fShmMtx); - fShmSegments = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); - - const uint64_t id = 0; + fShmSegments = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); try { - auto it = fShmSegments->find(id); + auto it = fShmSegments->find(fSegmentId); if (it == fShmSegments->end()) { // no segment with given id exists, creating if (allocationAlgorithm == "rbtree_best_fit") { - fSegments.emplace(id, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_main").c_str(), size)); - fShmSegments->emplace(id, AllocationAlgorithm::rbtree_best_fit); + fSegments.emplace(fSegmentId, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str(), size)); + fShmSegments->emplace(fSegmentId, AllocationAlgorithm::rbtree_best_fit); } else if (allocationAlgorithm == "simple_seq_fit") { - fSegments.emplace(id, SimpleSeqFitSegment(create_only, std::string("fmq_" + fShmId + "_main").c_str(), size)); - fShmSegments->emplace(id, AllocationAlgorithm::simple_seq_fit); + fSegments.emplace(fSegmentId, SimpleSeqFitSegment(create_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str(), size)); + fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit); } ss << "Created "; } else { // found segment with the given id, opening if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { - fSegments.emplace(id, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_main").c_str())); + fSegments.emplace(fSegmentId, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str())); if (allocationAlgorithm != "rbtree_best_fit") { LOG(warn) << "Allocation algorithm of the opened segment is rbtree_best_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting."; allocationAlgorithm = "rbtree_best_fit"; } } else { - fSegments.emplace(id, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_main").c_str())); + fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str())); if (allocationAlgorithm != "simple_seq_fit") { LOG(warn) << "Allocation algorithm of the opened segment is simple_seq_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting."; allocationAlgorithm = "simple_seq_fit"; @@ -142,33 +142,33 @@ class Manager } ss << "Opened "; } - ss << "shared memory segment '" << "fmq_" << fShmId << "_main_" << id << "'." - << " Size: " << boost::apply_visitor(SegmentSize{}, fSegments.at(id)) << " bytes." - << " Available: " << boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(id)) << " bytes." + ss << "shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "'." + << " Size: " << boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId)) << " bytes." + << " Available: " << boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(fSegmentId)) << " bytes." << " Allocation algorithm: " << allocationAlgorithm; LOG(debug) << ss.str(); - } catch(interprocess_exception&) { - LOG(error) << "something went wrong"; + } catch(interprocess_exception& bie) { + LOG(error) << "something went wrong: " << bie.what(); } } if (mlockSegment) { LOG(debug) << "Locking the managed segment memory pages..."; - if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(0)), boost::apply_visitor(SegmentSize{}, fSegments.at(0))) == -1) { + if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId))) == -1) { LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno); } LOG(debug) << "Successfully locked the managed segment memory pages."; } if (zeroSegment) { LOG(debug) << "Zeroing the managed segment free memory..."; - boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(0)); + boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(fSegmentId)); LOG(debug) << "Successfully zeroed the managed segment free memory."; } { boost::interprocess::scoped_lock lock(fShmMtx); - fShmRegions = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); + fShmRegions = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); #ifdef FAIRMQ_DEBUG_MODE fMsgDebug = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); @@ -258,7 +258,7 @@ class Manager } bool Interrupted() { return fInterrupted.load(); } - std::pair CreateRegion(const size_t size, + std::pair CreateRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, @@ -267,10 +267,10 @@ class Manager { using namespace boost::interprocess; try { - std::pair result; + std::pair result; { - uint64_t id = 0; + uint16_t id = 0; boost::interprocess::scoped_lock lock(fShmMtx); RegionCounter* rc = fManagementSegment.find(unique_instance).first; @@ -314,13 +314,13 @@ class Manager } } - Region* GetRegion(const uint64_t id) + Region* GetRegion(const uint16_t id) { boost::interprocess::scoped_lock lock(fShmMtx); return GetRegionUnsafe(id); } - Region* GetRegionUnsafe(const uint64_t id) + Region* GetRegionUnsafe(const uint16_t id) { // remote region could actually be a local one if a message originates from this device (has been sent out and returned) auto it = fRegions.find(id); @@ -347,7 +347,7 @@ class Manager } } - void RemoveRegion(const uint64_t id) + void RemoveRegion(const uint16_t id) { fRegions.erase(id); { @@ -495,13 +495,37 @@ class Manager bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; } - boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr) const + void GetSegment(uint16_t id) { - return boost::apply_visitor(SegmentHandleFromAddress{ptr}, fSegments.at(0)); + auto it = fSegments.find(id); + if (it == fSegments.end()) { + try { + // get region info + SegmentInfo segmentInfo = fShmSegments->at(id); + LOG(info) << "LOCATED SEGMENT WITH ID '" << id << "'"; + + using namespace boost::interprocess; + + if (segmentInfo.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { + fSegments.emplace(id, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(id)).c_str())); + } else { + fSegments.emplace(id, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(id)).c_str())); + } + } catch (std::out_of_range& oor) { + LOG(error) << "Could not get segment with id '" << id << "': " << oor.what(); + } catch (boost::interprocess::interprocess_exception& bie) { + LOG(error) << "Could not get segment with id '" << id << "': " << bie.what(); + } + } } - void* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle) const + + boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr, uint16_t segmentId) const { - return boost::apply_visitor(SegmentAddressFromHandle{handle}, fSegments.at(0)); + return boost::apply_visitor(SegmentHandleFromAddress{ptr}, fSegments.at(segmentId)); + } + void* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId) const + { + return boost::apply_visitor(SegmentAddressFromHandle{handle}, fSegments.at(segmentId)); } char* Allocate(const size_t size, size_t alignment = 0) @@ -513,20 +537,20 @@ class Manager try { // boost::interprocess::managed_shared_memory::size_type actualSize = size; // char* hint = 0; // unused for boost::interprocess::allocate_new - // ptr = fSegments.at(0).allocation_command(boost::interprocess::allocate_new, size, actualSize, hint); - size_t segmentSize = boost::apply_visitor(SegmentSize{}, fSegments.at(0)); + // ptr = fSegments.at(fSegmentId).allocation_command(boost::interprocess::allocate_new, size, actualSize, hint); + size_t segmentSize = boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId)); if (size > segmentSize) { throw MessageBadAlloc(tools::ToString("Requested message size (", size, ") exceeds segment size (", segmentSize, ")")); } if (alignment == 0) { - ptr = reinterpret_cast(boost::apply_visitor(SegmentAllocate{size}, fSegments.at(0))); + ptr = reinterpret_cast(boost::apply_visitor(SegmentAllocate{size}, fSegments.at(fSegmentId))); } else { - ptr = reinterpret_cast(boost::apply_visitor(SegmentAllocateAligned{size, alignment}, fSegments.at(0))); + ptr = reinterpret_cast(boost::apply_visitor(SegmentAllocateAligned{size, alignment}, fSegments.at(fSegmentId))); } } catch (boost::interprocess::bad_alloc& ba) { // LOG(warn) << "Shared memory full..."; if (ThrowingOnBadAlloc()) { - throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(0)))); + throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(fSegmentId)))); } // rateLimiter.maybe_sleep(); std::this_thread::sleep_for(std::chrono::milliseconds(50)); @@ -546,9 +570,9 @@ class Manager return ptr; } - void Deallocate(boost::interprocess::managed_shared_memory::handle_t handle) + void Deallocate(boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId) { - boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle)}, fSegments.at(0)); + boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle, segmentId)}, fSegments.at(segmentId)); #ifdef FAIRMQ_DEBUG_MODE boost::interprocess::scoped_lock lock(fShmMtx); DecrementShmMsgCounter(); @@ -556,11 +580,13 @@ class Manager #endif } - char* ShrinkInPlace(size_t oldSize, size_t newSize, char* localPtr) + char* ShrinkInPlace(size_t oldSize, size_t newSize, char* localPtr, uint16_t segmentId) { - return boost::apply_visitor(SegmentBufferShrink{oldSize, newSize, localPtr}, fSegments.at(0)); + return boost::apply_visitor(SegmentBufferShrink{oldSize, newSize, localPtr}, fSegments.at(segmentId)); } + uint16_t GetSegmentId() const { return fSegmentId; } + ~Manager() { using namespace boost::interprocess; @@ -599,8 +625,9 @@ class Manager private: std::string fShmId; + uint16_t fSegmentId; std::string fDeviceId; - std::unordered_map> fSegments; + std::unordered_map> fSegments; boost::interprocess::managed_shared_memory fManagementSegment; VoidAlloc fShmVoidAlloc; boost::interprocess::named_mutex fShmMtx; @@ -609,12 +636,12 @@ class Manager std::thread fRegionEventThread; bool fRegionEventsSubscriptionActive; std::function fRegionEventCallback; - std::unordered_map fObservedRegionEvents; + std::unordered_map fObservedRegionEvents; DeviceCounter* fDeviceCounter; - Uint64SegmentInfoHashMap* fShmSegments; - Uint64RegionInfoHashMap* fShmRegions; - std::unordered_map> fRegions; + Uint16SegmentInfoHashMap* fShmSegments; + Uint16RegionInfoHashMap* fShmRegions; + std::unordered_map> fRegions; std::atomic fInterrupted; std::atomic fMsgCounter; // TODO: find a better lifetime solution instead of the counter diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index 1cf1e12c..fcd5a388 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -43,7 +43,7 @@ class Message final : public fair::mq::Message : fair::mq::Message(factory) , fManager(manager) , fQueued(false) - , fMeta{0, 0, 0, -1} + , fMeta{0, 0, 0, fManager.GetSegmentId(), -1} , fRegionPtr(nullptr) , fLocalPtr(nullptr) { @@ -54,7 +54,7 @@ class Message final : public fair::mq::Message : fair::mq::Message(factory) , fManager(manager) , fQueued(false) - , fMeta{0, 0, 0, -1} + , fMeta{0, 0, 0, fManager.GetSegmentId(), -1} , fRegionPtr(nullptr) , fLocalPtr(nullptr) { @@ -65,7 +65,7 @@ class Message final : public fair::mq::Message : fair::mq::Message(factory) , fManager(manager) , fQueued(false) - , fMeta{0, 0, 0, -1} + , fMeta{0, 0, 0, fManager.GetSegmentId(), -1} , fRegionPtr(nullptr) , fLocalPtr(nullptr) { @@ -77,7 +77,7 @@ class Message final : public fair::mq::Message : fair::mq::Message(factory) , fManager(manager) , fQueued(false) - , fMeta{0, 0, 0, -1} + , fMeta{0, 0, 0, fManager.GetSegmentId(), -1} , fRegionPtr(nullptr) , fLocalPtr(nullptr) { @@ -89,7 +89,7 @@ class Message final : public fair::mq::Message : fair::mq::Message(factory) , fManager(manager) , fQueued(false) - , fMeta{0, 0, 0, -1} + , fMeta{0, 0, 0, fManager.GetSegmentId(), -1} , fRegionPtr(nullptr) , fLocalPtr(nullptr) { @@ -108,7 +108,7 @@ class Message final : public fair::mq::Message : fair::mq::Message(factory) , fManager(manager) , fQueued(false) - , fMeta{size, static_cast(region.get())->fRegionId, reinterpret_cast(hint), -1} + , fMeta{size, reinterpret_cast(hint), static_cast(region.get())->fRegionId, fManager.GetSegmentId(), -1} , fRegionPtr(nullptr) , fLocalPtr(static_cast(data)) { @@ -169,7 +169,8 @@ class Message final : public fair::mq::Message if (!fLocalPtr) { if (fMeta.fRegionId == 0) { if (fMeta.fSize > 0) { - fLocalPtr = reinterpret_cast(fManager.GetAddressFromHandle(fMeta.fHandle)); + fManager.GetSegment(fMeta.fSegmentId); + fLocalPtr = reinterpret_cast(fManager.GetAddressFromHandle(fMeta.fHandle, fMeta.fSegmentId)); } else { fLocalPtr = nullptr; } @@ -195,7 +196,7 @@ class Message final : public fair::mq::Message return true; } else if (newSize <= fMeta.fSize) { try { - fLocalPtr = fManager.ShrinkInPlace(fMeta.fSize, newSize, fLocalPtr); + fLocalPtr = fManager.ShrinkInPlace(fMeta.fSize, newSize, fLocalPtr, fMeta.fSegmentId); fMeta.fSize = newSize; return true; } catch (boost::interprocess::interprocess_exception& e) { @@ -248,7 +249,7 @@ class Message final : public fair::mq::Message { fLocalPtr = fManager.Allocate(size, alignment); if (fLocalPtr) { - fMeta.fHandle = fManager.GetHandleFromAddress(fLocalPtr); + fMeta.fHandle = fManager.GetHandleFromAddress(fLocalPtr, fMeta.fSegmentId); fMeta.fSize = size; } return fLocalPtr; @@ -258,7 +259,8 @@ class Message final : public fair::mq::Message { if (fMeta.fHandle >= 0 && !fQueued) { if (fMeta.fRegionId == 0) { - fManager.Deallocate(fMeta.fHandle); + fManager.GetSegment(fMeta.fSegmentId); + fManager.Deallocate(fMeta.fHandle, fMeta.fSegmentId); fMeta.fHandle = -1; } else { if (!fRegionPtr) { diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index f436505f..fbe96ce9 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -63,7 +63,7 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool , fTimeoutInMS(timeoutInMS) , fIntervalInMS(intervalInMS) , fShmId(shmId) - , fSegmentName("fmq_" + fShmId + "_main") + , fSegmentName("fmq_" + fShmId + "_m_0") , fManagementSegmentName("fmq_" + fShmId + "_mng") , fControlQueueName("fmq_" + fShmId + "_cq") , fTerminating(false) @@ -280,23 +280,19 @@ void Monitor::CheckSegment() try { managed_shared_memory managementSegment(open_only, fManagementSegmentName.c_str()); - Uint64SegmentInfoHashMap* segmentInfos = managementSegment.find(unique_instance).first; - std::unordered_map> segments; + Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find(unique_instance).first; + std::unordered_map> segments; if (!segmentInfos) { cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl; return; } - const uint64_t id = 0; - - auto it = segmentInfos->find(id); - if (it != segmentInfos->end()) { - // found segment with the given id, opening - if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { - segments.emplace(id, RBTreeBestFitSegment(open_only, fSegmentName.c_str())); + for (const auto& s : *segmentInfos) { + if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { + segments.emplace(s.first, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str())); } else { - segments.emplace(id, SimpleSeqFitSegment(open_only, fSegmentName.c_str())); + segments.emplace(s.first, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str())); } } @@ -336,8 +332,8 @@ void Monitor::CheckSegment() if (fInteractive) { cout << "| " << setw(18) << fSegmentName << " | " - << setw(10) << boost::apply_visitor(SegmentSize{}, segments.at(id)) << " | " - << setw(10) << boost::apply_visitor(SegmentFreeMemory{}, segments.at(id)) << " | " + << setw(10) << boost::apply_visitor(SegmentSize{}, segments.at(0)) << " | " + << setw(10) << boost::apply_visitor(SegmentFreeMemory{}, segments.at(0)) << " | " << setw(8) << numDevices << " | " #ifdef FAIRMQ_DEBUG_MODE << setw(8) << numMessages << " | " @@ -347,8 +343,8 @@ void Monitor::CheckSegment() << setw(10) << (fViewOnly ? "view only" : to_string(duration)) << " |" << c << flush; } else if (fViewOnly) { - size_t free = boost::apply_visitor(SegmentFreeMemory{}, segments.at(id)); - size_t total = boost::apply_visitor(SegmentSize{}, segments.at(id)); + size_t free = boost::apply_visitor(SegmentFreeMemory{}, segments.at(0)); + size_t total = boost::apply_visitor(SegmentSize{}, segments.at(0)); size_t used = total - free; // size_t mfree = managementSegment.get_free_memory(); // size_t mtotal = managementSegment.get_size(); @@ -528,52 +524,60 @@ std::vector> Monitor::Cleanup(const ShmId& shmId, b string managementSegmentName("fmq_" + shmId.shmId + "_mng"); try { bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); - RegionCounter* rc = managementSegment.find(bipc::unique_instance).first; - if (rc) { - if (verbose) { - cout << "Region counter found: " << rc->fCount << endl; - } - uint64_t regionCount = rc->fCount; - Uint64RegionInfoMap* m = managementSegment.find(bipc::unique_instance).first; + try { + RegionCounter* rc = managementSegment.find(bipc::unique_instance).first; + if (rc) { + if (verbose) { + cout << "Region counter found: " << rc->fCount << endl; + } + uint16_t regionCount = rc->fCount; - for (uint64_t i = 1; i <= regionCount; ++i) { - if (m != nullptr) { - RegionInfo ri = m->at(i); - string path = ri.fPath.c_str(); - int flags = ri.fFlags; - if (verbose) { - cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << "." << endl; - } - if (path != "") { - result.emplace_back(RunRemoval(Monitor::RemoveFileMapping, path + "fmq_" + shmId.shmId + "_rg_" + to_string(i), verbose)); + Uint16RegionInfoMap* m = managementSegment.find(bipc::unique_instance).first; + + for (uint16_t i = 1; i <= regionCount; ++i) { + if (m != nullptr) { + RegionInfo ri = m->at(i); + string path = ri.fPath.c_str(); + int flags = ri.fFlags; + if (verbose) { + cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << "." << endl; + } + if (path != "") { + result.emplace_back(RunRemoval(Monitor::RemoveFileMapping, path + "fmq_" + shmId.shmId + "_rg_" + to_string(i), verbose)); + } else { + result.emplace_back(RunRemoval(Monitor::RemoveObject, "fmq_" + shmId.shmId + "_rg_" + to_string(i), verbose)); + } } else { result.emplace_back(RunRemoval(Monitor::RemoveObject, "fmq_" + shmId.shmId + "_rg_" + to_string(i), verbose)); } - } else { - result.emplace_back(RunRemoval(Monitor::RemoveObject, "fmq_" + shmId.shmId + "_rg_" + to_string(i), verbose)); - } - result.emplace_back(RunRemoval(Monitor::RemoveQueue, string("fmq_" + shmId.shmId + "_rgq_" + to_string(i)), verbose)); + result.emplace_back(RunRemoval(Monitor::RemoveQueue, string("fmq_" + shmId.shmId + "_rgq_" + to_string(i)), verbose)); + } + } else { + if (verbose) { + cout << "No region counter found. No regions to cleanup." << endl; + } } - } else { + } catch(out_of_range& oor) { if (verbose) { - cout << "No region counter found. No regions to cleanup." << endl; + cout << "Could not locate element in the region map, out of range: " << oor.what() << endl; } } + Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find(bipc::unique_instance).first; + + for (const auto& s : *segmentInfos) { + result.emplace_back(RunRemoval(Monitor::RemoveObject, "fmq_" + shmId.shmId + "_m_" + to_string(s.first), verbose)); + } + result.emplace_back(RunRemoval(Monitor::RemoveObject, managementSegmentName.c_str(), verbose)); } catch (bie&) { if (verbose) { cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl; } - } catch(out_of_range& oor) { - if (verbose) { - cout << "Could not locate element in the region map, out of range: " << oor.what() << endl; - } } - result.emplace_back(RunRemoval(Monitor::RemoveObject, "fmq_" + shmId.shmId + "_main", verbose)); result.emplace_back(RunRemoval(Monitor::RemoveMutex, "fmq_" + shmId.shmId + "_mtx", verbose)); result.emplace_back(RunRemoval(Monitor::RemoveCondition, "fmq_" + shmId.shmId + "_cv", verbose)); diff --git a/fairmq/shmem/README.md b/fairmq/shmem/README.md index bb7cfe09..ae5216bf 100644 --- a/fairmq/shmem/README.md +++ b/fairmq/shmem/README.md @@ -10,16 +10,16 @@ Devices track and cleanup shared memory on shutdown. For more information on the FairMQ Shared Memory currently uses the following names to register shared memory on the system: -| name | info | created by | used by | -| ------------------------- | ---------------------------------------------- | ------------------ | ------------------------------ | -| `fmq__main` | main segment (user data) | one of the devices | devices | -| `fmq__mng` | management segment (management data) | one of the devices | devices | -| `fmq__mtx` | mutex | one of the devices | devices | -| `fmq__cv` | condition variable | one of the devices | devices with unmanaged regions | -| `fmq__rg_` | unmanaged region(s) | one of the devices | devices with unmanaged regions | -| `fmq__rgq_` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions | -| `fmq__ms` | shmmonitor status | shmmonitor | devices, shmmonitor | -| `fmq__cq` | message queue between transport and shmmonitor | shmmonitor | devices, shmmonitor | +| name | info | created by | used by | +| --------------------------- | ---------------------------------------------- | ------------------ | ------------------------------ | +| `fmq__m_` | managed segment(s) (user data) | one of the devices | devices | +| `fmq__mng` | management segment (management data) | one of the devices | devices | +| `fmq__mtx` | mutex | one of the devices | devices | +| `fmq__cv` | condition variable | one of the devices | devices with unmanaged regions | +| `fmq__rg_` | unmanaged region(s) | one of the devices | devices with unmanaged regions | +| `fmq__rgq_` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions | +| `fmq__ms` | shmmonitor status | shmmonitor | devices, shmmonitor | +| `fmq__cq` | message queue between transport and shmmonitor | shmmonitor | devices, shmmonitor | The shmId is generated out of session id and user id. diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index c798ab6a..b38e67ee 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -48,7 +48,7 @@ namespace shmem struct Region { - Region(const std::string& shmId, uint64_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags) + Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags) : fRemote(remote) , fLinger(100) , fStop(false) diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index 2b308b11..333198a3 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -56,7 +56,7 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion void* GetData() const override { return fRegion->get_address(); } size_t GetSize() const override { return fRegion->get_size(); } - uint64_t GetId() const override { return fRegionId; } + uint16_t GetId() const override { return fRegionId; } void SetLinger(uint32_t linger) override { fManager.GetRegion(fRegionId)->SetLinger(linger); } uint32_t GetLinger() const override { return fManager.GetRegion(fRegionId)->GetLinger(); } @@ -65,7 +65,7 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion private: Manager& fManager; boost::interprocess::mapped_region* fRegion; - uint64_t fRegionId; + uint16_t fRegionId; }; } diff --git a/fairmq/zeromq/Context.h b/fairmq/zeromq/Context.h index d90cf909..6fdd7650 100644 --- a/fairmq/zeromq/Context.h +++ b/fairmq/zeromq/Context.h @@ -114,13 +114,13 @@ class Context return fRegionInfos; } - uint64_t RegionCount() const + uint16_t RegionCount() const { std::lock_guard lock(fMtx); return fRegionCounter; } - void AddRegion(bool managed, uint64_t id, void* ptr, size_t size, int64_t userFlags, RegionEvent event) + void AddRegion(bool managed, uint16_t id, void* ptr, size_t size, int64_t userFlags, RegionEvent event) { { std::lock_guard lock(fMtx); @@ -131,7 +131,7 @@ class Context fRegionEventsCV.notify_one(); } - void RemoveRegion(uint64_t id) + void RemoveRegion(uint16_t id) { { std::lock_guard lock(fMtx); @@ -182,7 +182,7 @@ class Context mutable std::mutex fMtx; std::atomic fInterrupted; - uint64_t fRegionCounter; + uint16_t fRegionCounter; std::condition_variable fRegionEventsCV; std::vector fRegionInfos; std::queue fRegionEvents; diff --git a/fairmq/zeromq/UnmanagedRegion.h b/fairmq/zeromq/UnmanagedRegion.h index 110e67bd..b8b3c982 100644 --- a/fairmq/zeromq/UnmanagedRegion.h +++ b/fairmq/zeromq/UnmanagedRegion.h @@ -50,7 +50,7 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion virtual void* GetData() const override { return fBuffer; } virtual size_t GetSize() const override { return fSize; } - uint64_t GetId() const override { return fId; } + uint16_t GetId() const override { return fId; } int64_t GetUserFlags() const { return fUserFlags; } void SetLinger(uint32_t /* linger */) override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; } uint32_t GetLinger() const override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; return 0; } @@ -64,7 +64,7 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion private: Context& fCtx; - uint64_t fId; + uint16_t fId; void* fBuffer; size_t fSize; int64_t fUserFlags;