From 021c1b1c4d0fbd7d238b2e34d62c2bba43649c82 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 14 May 2021 13:36:09 +0200 Subject: [PATCH] shm: add monitor method to retrieve free segment memory --- fairmq/shmem/Monitor.cxx | 39 +++++++++++++++++++++++++++++++++++ fairmq/shmem/Monitor.h | 23 +++++++++++++++++++-- test/CMakeLists.txt | 1 + test/transport/_shmem.cxx | 43 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 104 insertions(+), 2 deletions(-) create mode 100644 test/transport/_shmem.cxx diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 27b087ce..624ad26f 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -479,6 +479,45 @@ unordered_map> Monitor::GetDebugInfo(cons return GetDebugInfo(shmId); } +unsigned long Monitor::GetFreeMemory(const ShmId& shmId, uint16_t segmentId) +{ + using namespace boost::interprocess; + try { + bipc::managed_shared_memory managementSegment(bipc::open_only, std::string("fmq_" + shmId.shmId + "_mng").c_str()); + boost::interprocess::named_mutex mtx(boost::interprocess::open_only, std::string("fmq_" + shmId.shmId + "_mtx").c_str()); + boost::interprocess::scoped_lock lock(mtx); + + Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find(unique_instance).first; + + if (!segmentInfos) { + LOG(error) << "Found management segment, but could not locate segment info"; + throw MonitorError("Found management segment, but could not locate segment info"); + } + + auto it = segmentInfos->find(segmentId); + if (it != segmentInfos->end()) { + if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { + RBTreeBestFitSegment segment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + std::to_string(segmentId)).c_str()); + return segment.get_free_memory(); + } else { + SimpleSeqFitSegment segment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + std::to_string(segmentId)).c_str()); + return segment.get_free_memory(); + } + } else { + LOG(error) << "Could not find segment id '" << segmentId << "'"; + throw MonitorError(tools::ToString("Could not find segment id '", segmentId, "'")); + } + } catch (bie&) { + LOG(error) << "Could not find management segment for shmid '" << shmId.shmId << "'"; + throw MonitorError(tools::ToString("Could not find management segment for shmid '", shmId.shmId, "'")); + } +} +unsigned long Monitor::GetFreeMemory(const SessionId& sessionId, uint16_t segmentId) +{ + ShmId shmId{makeShmIdStr(sessionId.sessionId)}; + return GetFreeMemory(shmId, segmentId); +} + void Monitor::PrintHelp() { LOG(info) << "controls: [x] close memory, " diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index 3d571705..65ac3a7e 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -79,10 +79,28 @@ class Monitor /// @param verbose output cleanup results to stdout static std::vector> CleanupFull(const SessionId& sessionId, bool verbose = true); + /// @brief Outputs list of messages in shmem (if compiled with FAIRMQ_DEBUG_MODE=ON) + /// @param shmId shmem id static void PrintDebugInfo(const ShmId& shmId); - static void PrintDebugInfo(const SessionId& shmId); + /// @brief Outputs list of messages in shmem (if compiled with FAIRMQ_DEBUG_MODE=ON) + /// @param sessionId session id + static void PrintDebugInfo(const SessionId& sessionId); + /// @brief Returns a list of messages in shmem (if compiled with FAIRMQ_DEBUG_MODE=ON) + /// @param shmId shmem id static std::unordered_map> GetDebugInfo(const ShmId& shmId); - static std::unordered_map> GetDebugInfo(const SessionId& shmId); + /// @brief Returns a list of messages in shmem (if compiled with FAIRMQ_DEBUG_MODE=ON) + /// @param sessionId session id + static std::unordered_map> GetDebugInfo(const SessionId& sessionId); + /// @brief Returns the amount of free memory in the specified segment + /// @param sessionId shmem id + /// @param segmentId segment id + /// @throws MonitorError + static unsigned long GetFreeMemory(const ShmId& shmId, uint16_t segmentId); + /// @brief Returns the amount of free memory in the specified segment + /// @param sessionId session id + /// @param segmentId segment id + /// @throws MonitorError + static unsigned long GetFreeMemory(const SessionId& sessionId, uint16_t segmentId); static bool PrintShm(const ShmId& shmId); static void ListAll(const std::string& path); @@ -94,6 +112,7 @@ class Monitor static bool RemoveCondition(const std::string& name); struct DaemonPresent : std::runtime_error { using std::runtime_error::runtime_error; }; + struct MonitorError : std::runtime_error { using std::runtime_error::runtime_error; }; private: void PrintHelp(); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d0adae70..958cb813 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -263,6 +263,7 @@ add_testsuite(Transport ${CMAKE_CURRENT_BINARY_DIR}/runner.cxx transport/_transfer_timeout.cxx transport/_options.cxx + transport/_shmem.cxx LINKS FairMQ INCLUDES ${CMAKE_CURRENT_SOURCE_DIR} diff --git a/test/transport/_shmem.cxx b/test/transport/_shmem.cxx new file mode 100644 index 00000000..fdfa4cfc --- /dev/null +++ b/test/transport/_shmem.cxx @@ -0,0 +1,43 @@ +/******************************************************************************** + * Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include +#include +#include + +#include + +#include + +namespace +{ + +using namespace std; +using namespace fair::mq; + +void GetFreeMemory() +{ + ProgOptions config; + string sessionId(to_string(tools::UuidHash())); + config.SetProperty("session", sessionId); + + ASSERT_THROW(shmem::Monitor::GetFreeMemory(shmem::SessionId{sessionId}, 0), shmem::Monitor::MonitorError); + + auto factory = FairMQTransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config); + + ASSERT_NO_THROW(shmem::Monitor::GetFreeMemory(shmem::SessionId{sessionId}, 0)); + ASSERT_THROW(shmem::Monitor::GetFreeMemory(shmem::SessionId{sessionId}, 1), shmem::Monitor::MonitorError); +} + +TEST(Monitor, GetFreeMemory) +{ + GetFreeMemory(); +} + +} // namespace