From 73109fe6d30c07b3f2b757867ba395fa7e990fca Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 21 Aug 2020 12:34:13 +0200 Subject: [PATCH] Shm: configurable allocation strategy --- examples/region/Sampler.cxx | 1 + examples/region/Sink.cxx | 1 + fairmq/FairMQUnmanagedRegion.h | 9 +- fairmq/plugins/config/Config.cxx | 37 +++--- fairmq/shmem/Common.h | 128 +++++++++++++++++++++ fairmq/shmem/Manager.h | 189 ++++++++++++++++++------------- fairmq/shmem/Monitor.cxx | 89 ++++++--------- fairmq/shmem/Monitor.h | 1 - fairmq/zeromq/Context.h | 8 +- fairmq/zeromq/TransportFactory.h | 2 +- test/region/_region.cxx | 1 + 11 files changed, 307 insertions(+), 159 deletions(-) diff --git a/examples/region/Sampler.cxx b/examples/region/Sampler.cxx index 9083d0ef..f084ad68 100644 --- a/examples/region/Sampler.cxx +++ b/examples/region/Sampler.cxx @@ -38,6 +38,7 @@ void Sampler::InitTask() fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { LOG(info) << "Region event: " << info.event + << ", managed: " << info.managed << ", id: " << info.id << ", ptr: " << info.ptr << ", size: " << info.size diff --git a/examples/region/Sink.cxx b/examples/region/Sink.cxx index 902c390c..340d01dd 100644 --- a/examples/region/Sink.cxx +++ b/examples/region/Sink.cxx @@ -31,6 +31,7 @@ void Sink::InitTask() fMaxIterations = fConfig->GetProperty("max-iterations"); fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { LOG(info) << "Region event: " << info.event + << ", managed: " << info.managed << ", id: " << info.id << ", ptr: " << info.ptr << ", size: " << info.size diff --git a/fairmq/FairMQUnmanagedRegion.h b/fairmq/FairMQUnmanagedRegion.h index b6924be0..a7048819 100644 --- a/fairmq/FairMQUnmanagedRegion.h +++ b/fairmq/FairMQUnmanagedRegion.h @@ -28,21 +28,24 @@ enum class FairMQRegionEvent : int struct FairMQRegionInfo { FairMQRegionInfo() - : id(0) + : managed(true) + , id(0) , ptr(nullptr) , size(0) , flags(0) , event(FairMQRegionEvent::created) {} - FairMQRegionInfo(uint64_t _id, void* _ptr, size_t _size, int64_t _flags, FairMQRegionEvent _event) - : id(_id) + FairMQRegionInfo(bool _managed, uint64_t _id, void* _ptr, size_t _size, int64_t _flags, FairMQRegionEvent _event) + : managed(_managed) + , id(_id) , ptr(_ptr) , size(_size) , flags(_flags) , event(_event) {} + bool managed; // managed/unmanaged uint64_t id; // id of the region void* ptr; // pointer to the start of the region size_t size; // region size diff --git a/fairmq/plugins/config/Config.cxx b/fairmq/plugins/config/Config.cxx index 5fa15b93..dae14c2a 100644 --- a/fairmq/plugins/config/Config.cxx +++ b/fairmq/plugins/config/Config.cxx @@ -61,24 +61,25 @@ Plugin::ProgOptions ConfigPluginProgramOptions() namespace po = boost::program_options; auto pluginOptions = po::options_description{"FairMQ device options"}; pluginOptions.add_options() - ("id", po::value()->default_value(""), "Device ID.") - ("io-threads", po::value()->default_value(1), "Number of I/O threads.") - ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'shmem').") - ("network-interface", po::value()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") - ("init-timeout", po::value()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") - ("max-run-time", po::value()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).") - ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") - ("shm-segment-size", po::value()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).") - ("shm-mlock-segment", po::value()->default_value(false), "Shared memory: mlock the shared memory segment after initialization.") - ("shm-zero-segment", po::value()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization.") - ("shm-throw-bad-alloc", po::value()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).") - ("shm-monitor", po::value()->default_value(true), "Shared memory: run monitor daemon.") - ("ofi-size-hint", po::value()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.") - ("rate", po::value()->default_value(0.), "Rate for conditional run loop (Hz).") - ("session", po::value()->default_value("default"), "Session name.") - ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from JSON file.") - ("mq-config", po::value(), "JSON input as file.") - ("channel-config", po::value>()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list"); + ("id", po::value()->default_value(""), "Device ID.") + ("io-threads", po::value()->default_value(1), "Number of I/O threads.") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'shmem').") + ("network-interface", po::value()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") + ("init-timeout", po::value()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") + ("max-run-time", po::value()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).") + ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") + ("shm-segment-size", po::value()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).") + ("shm-allocation", po::value()->default_value("rbtree_best_fit"), "Shared memory allocation algorithm: rbtree_best_fit/simple_seq_fit.") + ("shm-mlock-segment", po::value()->default_value(false), "Shared memory: mlock the shared memory segment after initialization.") + ("shm-zero-segment", po::value()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization.") + ("shm-throw-bad-alloc", po::value()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).") + ("shm-monitor", po::value()->default_value(true), "Shared memory: run monitor daemon.") + ("ofi-size-hint", po::value()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.") + ("rate", po::value()->default_value(0.), "Rate for conditional run loop (Hz).") + ("session", po::value()->default_value("default"), "Session name.") + ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from JSON file.") + ("mq-config", po::value(), "JSON input as file.") + ("channel-config", po::value>()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list"); return pluginOptions; } diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index d2a3bb85..22cec53a 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -19,8 +19,11 @@ #include #include #include +#include #include +#include #include +#include #include #include @@ -32,6 +35,17 @@ namespace mq namespace shmem { +struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtime_error; }; + +using SimpleSeqFitSegment = boost::interprocess::basic_managed_shared_memory, + boost::interprocess::null_index>; + // boost::interprocess::iset_index>; +using RBTreeBestFitSegment = boost::interprocess::basic_managed_shared_memory, + boost::interprocess::null_index>; + // boost::interprocess::iset_index>; + using SegmentManager = boost::interprocess::managed_shared_memory::segment_manager; using VoidAlloc = boost::interprocess::allocator; using CharAlloc = boost::interprocess::allocator; @@ -39,6 +53,12 @@ using Str = boost::interprocess::basic_string; using StrVector = boost::interprocess::vector; +enum class AllocationAlgorithm : int +{ + rbtree_best_fit, + simple_seq_fit +}; + struct RegionInfo { RegionInfo(const VoidAlloc& alloc) @@ -65,6 +85,19 @@ using Uint64RegionInfoPairAlloc = boost::interprocess::allocator, Uint64RegionInfoPairAlloc>; using Uint64RegionInfoHashMap = boost::unordered_map, std::equal_to, Uint64RegionInfoPairAlloc>; +struct SegmentInfo +{ + SegmentInfo(AllocationAlgorithm aa) + : fAllocationAlgorithm(aa) + {} + + AllocationAlgorithm fAllocationAlgorithm; +}; + +using Uint64SegmentInfoPairAlloc = boost::interprocess::allocator, SegmentManager>; +using Uint64SegmentInfoMap = boost::interprocess::map, Uint64SegmentInfoPairAlloc>; +using Uint64SegmentInfoHashMap = boost::unordered_map, std::equal_to, Uint64SegmentInfoPairAlloc>; + struct DeviceCounter { DeviceCounter(unsigned int c) @@ -153,6 +186,101 @@ inline std::string buildShmIdFromSessionIdAndUserId(const std::string& sessionId return shmId; } +struct SegmentSize : public boost::static_visitor +{ + template + size_t operator()(S& s) const { return s.get_size(); } +}; + +struct SegmentAddress : public boost::static_visitor +{ + template + void* operator()(S& s) const { return s.get_address(); } +}; + +struct SegmentMemoryZeroer : public boost::static_visitor<> +{ + template + void operator()(S& s) const { s.zero_free_memory(); } +}; + +struct SegmentFreeMemory : public boost::static_visitor +{ + template + size_t operator()(S& s) const { return s.get_free_memory(); } +}; + +struct SegmentHandleFromAddress : public boost::static_visitor +{ + SegmentHandleFromAddress(const void* _ptr) : ptr(_ptr) {} + + template + boost::interprocess::managed_shared_memory::handle_t operator()(S& s) const { return s.get_handle_from_address(ptr); } + + const void* ptr; +}; + +struct SegmentAddressFromHandle : public boost::static_visitor +{ + SegmentAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t _handle) : handle(_handle) {} + + template + void* operator()(S& s) const { return s.get_address_from_handle(handle); } + + const boost::interprocess::managed_shared_memory::handle_t handle; +}; + +struct SegmentAllocate : public boost::static_visitor +{ + SegmentAllocate(const size_t _size) : size(_size) {} + + template + void* operator()(S& s) const { return s.allocate(size); } + + const size_t size; +}; + +struct SegmentAllocateAligned : public boost::static_visitor +{ + SegmentAllocateAligned(const size_t _size, const size_t _alignment) : size(_size), alignment(_alignment) {} + + template + void* operator()(S& s) const { return s.allocate_aligned(size, alignment); } + + const size_t size; + const size_t alignment; +}; + +struct SegmentBufferShrink : public boost::static_visitor +{ + SegmentBufferShrink(const size_t _old_size, const size_t _new_size, char* _local_ptr) + : old_size(_old_size) + , new_size(_new_size) + , local_ptr(_local_ptr) + {} + + template + char* operator()(S& s) const + { + boost::interprocess::managed_shared_memory::size_type shrunk_size = new_size; + return s.template allocation_command(boost::interprocess::shrink_in_place, old_size + 128, shrunk_size, local_ptr); + } + + const size_t old_size; + const size_t new_size; + mutable char* local_ptr; +}; + +struct SegmentDeallocate : public boost::static_visitor<> +{ + SegmentDeallocate(void* _ptr) : ptr(_ptr) {} + + template + void operator()(S& s) const { return s.deallocate(ptr); } + + void* ptr; +}; + } // namespace shmem } // namespace mq } // namespace fair diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 46e17f40..46ed42e4 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -27,12 +27,10 @@ #include #include -#include #include #include #include #include -#include #include #include @@ -40,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -56,22 +55,12 @@ namespace mq namespace shmem { -struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtime_error; }; - -using SimpleSeqFitSegment = boost::interprocess::basic_managed_shared_memory, - boost::interprocess::iset_index>; -using RBTreeBestFitSegment = boost::interprocess::basic_managed_shared_memory, - boost::interprocess::iset_index>; - class Manager { public: Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config) : fShmId(std::move(shmId)) , fDeviceId(std::move(deviceId)) - // , fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size) , fSegments() , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) , fShmVoidAlloc(fManagementSegment.get_segment_manager()) @@ -79,6 +68,7 @@ class Manager , fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str()) , fRegionEventsSubscriptionActive(false) , fDeviceCounter(nullptr) + , fShmSegments(nullptr) , fShmRegions(nullptr) , fInterrupted(false) , fMsgCounter(0) @@ -95,77 +85,120 @@ class Manager bool mlockSegment = false; bool zeroSegment = false; bool autolaunchMonitor = false; + std::string allocationAlgorithm("rbtree_best_fit"); if (config) { mlockSegment = config->GetProperty("shm-mlock-segment", mlockSegment); zeroSegment = config->GetProperty("shm-zero-segment", zeroSegment); autolaunchMonitor = config->GetProperty("shm-monitor", autolaunchMonitor); fThrowOnBadAlloc = config->GetProperty("shm-throw-bad-alloc", fThrowOnBadAlloc); + allocationAlgorithm = config->GetProperty("shm-allocation", allocationAlgorithm); } else { LOG(debug) << "ProgOptions not available! Using defaults."; } + if (allocationAlgorithm != "rbtree_best_fit" && allocationAlgorithm != "simple_seq_fit") { + LOG(error) << "Provided shared memory allocation algorithm '" << allocationAlgorithm << "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'"; + throw SharedMemoryError(tools::ToString("Provided shared memory allocation algorithm '", allocationAlgorithm, "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'")); + } + if (autolaunchMonitor) { StartMonitor(fShmId); } { + std::stringstream ss; boost::interprocess::scoped_lock lock(fShmMtx); + fShmSegments = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); + + const uint64_t id = 0; + try { - fSegments.emplace(0, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_main").c_str())); - LOG(debug) << "opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegments.at(0).get_size() << " bytes. Available are " << fSegments.at(0).get_free_memory() << " bytes."; + auto it = fShmSegments->find(id); + if (it == fShmSegments->end()) { + // no segment with given id exists, creating + if (allocationAlgorithm == "rbtree_best_fit") { + fSegments.emplace(id, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_main").c_str(), size)); + fShmSegments->emplace(id, AllocationAlgorithm::rbtree_best_fit); + } else if (allocationAlgorithm == "simple_seq_fit") { + fSegments.emplace(id, SimpleSeqFitSegment(create_only, std::string("fmq_" + fShmId + "_main").c_str(), size)); + fShmSegments->emplace(id, AllocationAlgorithm::simple_seq_fit); + } + ss << "Created "; + } else { + // found segment with the given id, opening + if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { + fSegments.emplace(id, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_main").c_str())); + if (allocationAlgorithm != "rbtree_best_fit") { + LOG(warn) << "Allocation algorithm of the opened segment is rbtree_best_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting."; + allocationAlgorithm = "rbtree_best_fit"; + } + } else { + fSegments.emplace(id, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_main").c_str())); + if (allocationAlgorithm != "simple_seq_fit") { + LOG(warn) << "Allocation algorithm of the opened segment is simple_seq_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting."; + allocationAlgorithm = "simple_seq_fit"; + } + } + ss << "Opened "; + } + ss << "shared memory segment '" << "fmq_" << fShmId << "_main_" << id << "'." + << " Size: " << boost::apply_visitor(SegmentSize{}, fSegments.at(id)) << " bytes." + << " Available: " << boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(id)) << " bytes." + << " Allocation algorithm: " << allocationAlgorithm; + LOG(debug) << ss.str(); } catch(interprocess_exception&) { - fSegments.emplace(0, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_main").c_str(), size)); - LOG(debug) << "created shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegments.at(0).get_size() << " bytes. Available are " << fSegments.at(0).get_free_memory() << " bytes."; + LOG(error) << "something went wrong"; } } if (mlockSegment) { LOG(debug) << "Locking the managed segment memory pages..."; - if (mlock(fSegments.at(0).get_address(), fSegments.at(0).get_size()) == -1) { + if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(0)), boost::apply_visitor(SegmentSize{}, fSegments.at(0))) == -1) { LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno); } LOG(debug) << "Successfully locked the managed segment memory pages."; } if (zeroSegment) { LOG(debug) << "Zeroing the managed segment free memory..."; - fSegments.at(0).zero_free_memory(); + boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(0)); LOG(debug) << "Successfully zeroed the managed segment free memory."; } - boost::interprocess::scoped_lock lock(fShmMtx); + { + boost::interprocess::scoped_lock lock(fShmMtx); - fShmRegions = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); -#ifdef FAIRMQ_DEBUG_MODE - fMsgDebug = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); -#endif - // store info about the managed segment as region with id 0 - fShmRegions->emplace(0, RegionInfo("", 0, 0, fShmVoidAlloc)); - - fDeviceCounter = fManagementSegment.find(unique_instance).first; - - if (fDeviceCounter) { - LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing."; - (fDeviceCounter->fCount)++; - LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount; - } else { - LOG(debug) << "no device counter found, creating one and initializing with 1"; - fDeviceCounter = fManagementSegment.construct(unique_instance)(1); - LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount; - } + fShmRegions = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); #ifdef FAIRMQ_DEBUG_MODE - fShmMsgCounter = fManagementSegment.find(unique_instance).first; - - if (fShmMsgCounter) { - LOG(debug) << "message counter found, with value of " << fShmMsgCounter->fCount << "."; - } else { - LOG(debug) << "no message counter found, creating one and initializing with 0"; - fShmMsgCounter = fManagementSegment.construct(unique_instance)(0); - LOG(debug) << "initialized message counter with: " << fShmMsgCounter->fCount; - } + fMsgDebug = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); #endif + fDeviceCounter = fManagementSegment.find(unique_instance).first; + + if (fDeviceCounter) { + LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing."; + (fDeviceCounter->fCount)++; + LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount; + } else { + LOG(debug) << "no device counter found, creating one and initializing with 1"; + fDeviceCounter = fManagementSegment.construct(unique_instance)(1); + LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount; + } + +#ifdef FAIRMQ_DEBUG_MODE + fShmMsgCounter = fManagementSegment.find(unique_instance).first; + + if (fShmMsgCounter) { + LOG(debug) << "message counter found, with value of " << fShmMsgCounter->fCount << "."; + } else { + LOG(debug) << "no message counter found, creating one and initializing with 0"; + fShmMsgCounter = fManagementSegment.construct(unique_instance)(0); + LOG(debug) << "initialized message counter with: " << fShmMsgCounter->fCount; + } +#endif + } + fHeartbeatThread = std::thread(&Manager::SendHeartbeats, this); } @@ -336,29 +369,29 @@ class Manager for (const auto& e : *fShmRegions) { fair::mq::RegionInfo info; + info.managed = false; info.id = e.first; info.flags = e.second.fUserFlags; info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created; - if (info.id != 0) { - if (!e.second.fDestroyed) { - auto region = GetRegionUnsafe(info.id); - info.ptr = region->fRegion.get_address(); - info.size = region->fRegion.get_size(); - } else { - info.ptr = nullptr; - info.size = 0; - } - result.push_back(info); + if (!e.second.fDestroyed) { + auto region = GetRegionUnsafe(info.id); + info.ptr = region->fRegion.get_address(); + info.size = region->fRegion.get_size(); } else { - if (!e.second.fDestroyed) { - info.ptr = fSegments.at(0).get_address(); - info.size = fSegments.at(0).get_size(); - } else { - info.ptr = nullptr; - info.size = 0; - } - result.push_back(info); + info.ptr = nullptr; + info.size = 0; } + result.push_back(info); + } + + for (const auto& e : *fShmSegments) { + fair::mq::RegionInfo info; + info.managed = true; + info.id = e.first; + info.event = RegionEvent::created; + info.ptr = boost::apply_visitor(SegmentAddress{}, fSegments.at(e.first)); + info.size = boost::apply_visitor(SegmentSize{}, fSegments.at(e.first)); + result.push_back(info); } return result; @@ -462,8 +495,14 @@ class Manager bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; } - boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr) const { return fSegments.at(0).get_handle_from_address(ptr); } - void* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle) const { return fSegments.at(0).get_address_from_handle(handle); } + boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr) const + { + return boost::apply_visitor(SegmentHandleFromAddress{ptr}, fSegments.at(0)); + } + void* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle) const + { + return boost::apply_visitor(SegmentAddressFromHandle{handle}, fSegments.at(0)); + } char* Allocate(const size_t size, size_t alignment = 0) { @@ -475,19 +514,19 @@ class Manager // boost::interprocess::managed_shared_memory::size_type actualSize = size; // char* hint = 0; // unused for boost::interprocess::allocate_new // ptr = fSegments.at(0).allocation_command(boost::interprocess::allocate_new, size, actualSize, hint); - size_t segmentSize = fSegments.at(0).get_size(); + size_t segmentSize = boost::apply_visitor(SegmentSize{}, fSegments.at(0)); if (size > segmentSize) { throw MessageBadAlloc(tools::ToString("Requested message size (", size, ") exceeds segment size (", segmentSize, ")")); } if (alignment == 0) { - ptr = reinterpret_cast(fSegments.at(0).allocate(size)); + ptr = reinterpret_cast(boost::apply_visitor(SegmentAllocate{size}, fSegments.at(0))); } else { - ptr = reinterpret_cast(fSegments.at(0).allocate_aligned(size, alignment)); + ptr = reinterpret_cast(boost::apply_visitor(SegmentAllocateAligned{size, alignment}, fSegments.at(0))); } } catch (boost::interprocess::bad_alloc& ba) { // LOG(warn) << "Shared memory full..."; if (ThrowingOnBadAlloc()) { - throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", fSegments.at(0).get_free_memory())); + throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(0)))); } // rateLimiter.maybe_sleep(); std::this_thread::sleep_for(std::chrono::milliseconds(50)); @@ -509,7 +548,7 @@ class Manager void Deallocate(boost::interprocess::managed_shared_memory::handle_t handle) { - fSegments.at(0).deallocate(GetAddressFromHandle(handle)); + boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle)}, fSegments.at(0)); #ifdef FAIRMQ_DEBUG_MODE boost::interprocess::scoped_lock lock(fShmMtx); DecrementShmMsgCounter(); @@ -519,9 +558,7 @@ class Manager char* ShrinkInPlace(size_t oldSize, size_t newSize, char* localPtr) { - using namespace boost::interprocess; - managed_shared_memory::size_type shrunkSize = newSize; - return fSegments.at(0).allocation_command(shrink_in_place, oldSize + 128, shrunkSize, localPtr); + return boost::apply_visitor(SegmentBufferShrink{oldSize, newSize, localPtr}, fSegments.at(0)); } ~Manager() @@ -563,8 +600,7 @@ class Manager private: std::string fShmId; std::string fDeviceId; - // boost::interprocess::managed_shared_memory fSegment; - std::unordered_map fSegments; + std::unordered_map> fSegments; boost::interprocess::managed_shared_memory fManagementSegment; VoidAlloc fShmVoidAlloc; boost::interprocess::named_mutex fShmMtx; @@ -576,6 +612,7 @@ class Manager std::unordered_map fObservedRegionEvents; DeviceCounter* fDeviceCounter; + Uint64SegmentInfoHashMap* fShmSegments; Uint64RegionInfoHashMap* fShmRegions; std::unordered_map> fRegions; diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index a57b1200..406b5e01 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -205,10 +205,6 @@ void Monitor::Interactive() cout << "\n[q] --> quitting." << endl; fTerminating = true; break; - case 'p': - cout << "\n[p] --> active queues:" << endl; - PrintQueues(); - break; case 'x': cout << "\n[x] --> closing shared memory:" << endl; if (!fViewOnly) { @@ -254,6 +250,7 @@ void Monitor::Interactive() void Monitor::CheckSegment() { + using namespace boost::interprocess; char c = '#'; if (fInteractive) { @@ -281,8 +278,27 @@ void Monitor::CheckSegment() } try { - bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str()); - bipc::managed_shared_memory managementSegment(bipc::open_only, fManagementSegmentName.c_str()); + managed_shared_memory managementSegment(open_only, fManagementSegmentName.c_str()); + + Uint64SegmentInfoHashMap* segmentInfos = managementSegment.find(unique_instance).first; + std::unordered_map> segments; + + if (!segmentInfos) { + cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl; + return; + } + + const uint64_t id = 0; + + auto it = segmentInfos->find(id); + if (it != segmentInfos->end()) { + // found segment with the given id, opening + if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { + segments.emplace(id, RBTreeBestFitSegment(open_only, fSegmentName.c_str())); + } else { + segments.emplace(id, SimpleSeqFitSegment(open_only, fSegmentName.c_str())); + } + } fSeenOnce = true; @@ -292,12 +308,12 @@ void Monitor::CheckSegment() #endif if (fInteractive || fViewOnly) { - DeviceCounter* dc = managementSegment.find(bipc::unique_instance).first; + DeviceCounter* dc = managementSegment.find(unique_instance).first; if (dc) { numDevices = dc->fCount; } #ifdef FAIRMQ_DEBUG_MODE - MsgCounter* mc = managementSegment.find(bipc::unique_instance).first; + MsgCounter* mc = managementSegment.find(unique_instance).first; if (mc) { numMessages = mc->fCount; } @@ -319,20 +335,20 @@ void Monitor::CheckSegment() if (fInteractive) { cout << "| " - << setw(18) << fSegmentName << " | " - << setw(10) << segment.get_size() << " | " - << setw(10) << segment.get_free_memory() << " | " - << setw(8) << numDevices << " | " + << setw(18) << fSegmentName << " | " + << setw(10) << boost::apply_visitor(SegmentSize{}, segments.at(id)) << " | " + << setw(10) << boost::apply_visitor(SegmentFreeMemory{}, segments.at(id)) << " | " + << setw(8) << numDevices << " | " #ifdef FAIRMQ_DEBUG_MODE - << setw(8) << numMessages << " | " + << setw(8) << numMessages << " | " #else - << setw(8) << "nodebug" << " | " + << setw(8) << "nodebug" << " | " #endif - << setw(10) << (fViewOnly ? "view only" : to_string(duration)) << " |" + << setw(10) << (fViewOnly ? "view only" : to_string(duration)) << " |" << c << flush; } else if (fViewOnly) { - size_t free = segment.get_free_memory(); - size_t total = segment.get_size(); + size_t free = boost::apply_visitor(SegmentFreeMemory{}, segments.at(id)); + size_t total = boost::apply_visitor(SegmentSize{}, segments.at(id)); size_t used = total - free; // size_t mfree = managementSegment.get_free_memory(); // size_t mtotal = managementSegment.get_size(); @@ -459,44 +475,6 @@ vector Monitor::GetDebugInfo(const SessionId& sessionId) return GetDebugInfo(shmId); } -void Monitor::PrintQueues() -{ - cout << '\n'; - - try { - bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str()); - StrVector* queues = segment.find(string("fmq_" + fShmId + "_qs").c_str()).first; - if (queues) { - cout << "found " << queues->size() << " queue(s):" << endl; - - for (const auto& queue : *queues) { - string name(queue.c_str()); - cout << '\t' << name << " : "; - atomic* queueSize = segment.find>(name.c_str()).first; - if (queueSize) { - cout << *queueSize << " messages" << endl; - } else { - cout << "\tqueue does not have a queue size entry." << endl; - } - } - } else { - cout << "\tno queues found" << endl; - } - } catch (bie&) { - cout << "\tno queues found" << endl; - } catch (out_of_range&) { - cout << "\tno queues found" << endl; - } - - cout << "\n --> last heartbeats: " << endl << endl; - auto now = chrono::high_resolution_clock::now(); - for (const auto& h : fDeviceHeartbeats) { - cout << "\t" << h.first << " : " << chrono::duration(now - h.second).count() << "ms ago." << endl; - } - - cout << endl; -} - void Monitor::PrintHeader() { cout << "| " @@ -512,7 +490,6 @@ void Monitor::PrintHeader() void Monitor::PrintHelp() { cout << "controls: [x] close memory, " - << "[p] print queues, " << "[b] print a list of allocated messages (only available when compiled with FAIMQ_DEBUG_MODE=ON), " << "[h] help, " << "[q] quit." << endl; diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index a618b228..2e0ead4f 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -92,7 +92,6 @@ class Monitor private: void PrintHeader(); void PrintHelp(); - void PrintQueues(); void MonitorHeartbeats(); void CheckSegment(); void Interactive(); diff --git a/fairmq/zeromq/Context.h b/fairmq/zeromq/Context.h index bf061785..d90cf909 100644 --- a/fairmq/zeromq/Context.h +++ b/fairmq/zeromq/Context.h @@ -56,7 +56,7 @@ class Context throw ContextError(tools::ToString("failed configuring context, reason: ", zmq_strerror(errno))); } - fRegionEvents.emplace(0, nullptr, 0, 0, RegionEvent::local_only); + fRegionEvents.emplace(true, 0, nullptr, 0, 0, RegionEvent::local_only); } Context(const Context&) = delete; @@ -120,13 +120,13 @@ class Context return fRegionCounter; } - void AddRegion(uint64_t id, void* ptr, size_t size, int64_t userFlags, RegionEvent event) + void AddRegion(bool managed, uint64_t id, void* ptr, size_t size, int64_t userFlags, RegionEvent event) { { std::lock_guard lock(fMtx); ++fRegionCounter; - fRegionInfos.emplace_back(id, ptr, size, userFlags, event); - fRegionEvents.emplace(id, ptr, size, userFlags, event); + fRegionInfos.emplace_back(managed, id, ptr, size, userFlags, event); + fRegionEvents.emplace(managed, id, ptr, size, userFlags, event); } fRegionEventsCV.notify_one(); } diff --git a/fairmq/zeromq/TransportFactory.h b/fairmq/zeromq/TransportFactory.h index f23a9b9a..2a18d184 100644 --- a/fairmq/zeromq/TransportFactory.h +++ b/fairmq/zeromq/TransportFactory.h @@ -125,7 +125,7 @@ class TransportFactory final : public FairMQTransportFactory { UnmanagedRegionPtr ptr = tools::make_unique(*fCtx, size, userFlags, callback, bulkCallback, this); auto zPtr = static_cast(ptr.get()); - fCtx->AddRegion(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created); + fCtx->AddRegion(false, zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created); return ptr; } diff --git a/test/region/_region.cxx b/test/region/_region.cxx index 59dbaf4f..b4b32881 100644 --- a/test/region/_region.cxx +++ b/test/region/_region.cxx @@ -48,6 +48,7 @@ void RegionEventSubscriptions(const string& transport) ASSERT_EQ(factory->SubscribedToRegionEvents(), false); factory->SubscribeToRegionEvents([&](FairMQRegionInfo info) { LOG(warn) << ">>>" << info.event; + LOG(warn) << "managed: " << info.managed; LOG(warn) << "id: " << info.id; LOG(warn) << "ptr: " << info.ptr; LOG(warn) << "size: " << info.size;