From 70a583d08d7cd92da8a07e92a98539456b88da3b Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 7 Aug 2020 16:29:53 +0200 Subject: [PATCH] Shm: Provide segment/msg debug infos --- fairmq/shmem/Common.h | 32 +++++++++++++++++++++-- fairmq/shmem/Manager.h | 32 ++++++++++++++++++++++- fairmq/shmem/Message.h | 9 +++++++ fairmq/shmem/Monitor.cxx | 51 ++++++++++++++++++++++++++++++++++++- fairmq/shmem/Monitor.h | 2 ++ fairmq/shmem/runMonitor.cxx | 7 +++++ 6 files changed, 129 insertions(+), 4 deletions(-) diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index e304b9fa..9d67c5ac 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -12,13 +12,15 @@ #include #include +#include // std::equal_to -#include +#include #include #include #include #include -#include +#include +#include #include #include @@ -71,6 +73,15 @@ struct DeviceCounter std::atomic fCount; }; +struct MsgCounter +{ + MsgCounter(unsigned int c) + : fCount(c) + {} + + std::atomic fCount; +}; + struct RegionCounter { RegionCounter(uint64_t c) @@ -88,6 +99,23 @@ struct MetaHeader boost::interprocess::managed_shared_memory::handle_t fHandle; }; +struct MsgDebug +{ + MsgDebug(pid_t pid, size_t size, const uint64_t creationTime) + : fPid(pid) + , fSize(size) + , fCreationTime(creationTime) + {} + + pid_t fPid; + size_t fSize; + uint64_t fCreationTime; +}; + +using Uint64MsgDebugPairAlloc = boost::interprocess::allocator, SegmentManager>; +using Uint64MsgDebugHashMap = boost::unordered_map, std::equal_to, Uint64MsgDebugPairAlloc>; +using Uint64MsgDebugMap = boost::interprocess::map, Uint64MsgDebugPairAlloc>; + struct RegionBlock { RegionBlock() diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index fbfd296d..25c734ff 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -71,7 +71,7 @@ class Manager : fShmId(std::move(shmId)) , fDeviceId(std::move(deviceId)) , fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size) - , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 655360) + , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) , fShmVoidAlloc(fManagementSegment.get_segment_manager()) , fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str()) , fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str()) @@ -80,6 +80,8 @@ class Manager , fRegionInfos(nullptr) , fInterrupted(false) , fMsgCounter(0) + , fMsgDebug(nullptr) + , fShmMsgCounter(nullptr) , fHeartbeatThread() , fSendHeartbeats(true) , fThrowOnBadAlloc(true) @@ -117,6 +119,7 @@ class Manager } fRegionInfos = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); + fMsgDebug = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); // store info about the managed segment as region with id 0 fRegionInfos->emplace(0, RegionInfo("", 0, 0, fShmVoidAlloc)); @@ -134,6 +137,16 @@ class Manager LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount; } + fShmMsgCounter = fManagementSegment.find(unique_instance).first; + + if (fShmMsgCounter) { + LOG(debug) << "message counter found, with value of " << fShmMsgCounter->fCount << "."; + } else { + LOG(debug) << "no message counter found, creating one and initializing with 0"; + fShmMsgCounter = fManagementSegment.construct(unique_instance)(0); + LOG(debug) << "initialized message counter with: " << fShmMsgCounter->fCount; + } + fHeartbeatThread = std::thread(&Manager::SendHeartbeats, this); } @@ -394,6 +407,21 @@ class Manager void IncrementMsgCounter() { fMsgCounter.fetch_add(1, std::memory_order_relaxed); } void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); } + void IncrementShmMsgCounter() { ++(fShmMsgCounter->fCount); } + void DecrementShmMsgCounter() { --(fShmMsgCounter->fCount); } + + void AddMsgDebug(pid_t pid, size_t size, size_t handle, uint64_t time) + { + fMsgDebug->emplace(handle, MsgDebug(pid, size, time)); + } + + void RemoveMsgDebug(size_t handle) + { + fMsgDebug->erase(handle); + } + + boost::interprocess::named_mutex& GetMtx() { return fShmMtx; } + void SendHeartbeats() { std::string controlQueueName("fmq_" + fShmId + "_cq"); @@ -473,6 +501,8 @@ class Manager std::atomic fInterrupted; std::atomic fMsgCounter; // TODO: find a better lifetime solution instead of the counter + Uint64MsgDebugMap* fMsgDebug; + MsgCounter* fShmMsgCounter; std::thread fHeartbeatThread; bool fSendHeartbeats; diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index 6de8ff0c..37897f0c 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -22,6 +22,9 @@ #include // size_t #include +#include // getpid +#include // pid_t + namespace fair { namespace mq @@ -274,6 +277,9 @@ class Message final : public fair::mq::Message } } fMeta.fHandle = fManager.Segment().get_handle_from_address(fLocalPtr); + boost::interprocess::scoped_lock lock(fManager.GetMtx()); + fManager.IncrementShmMsgCounter(); + fManager.AddMsgDebug(getpid(), size, static_cast(fMeta.fHandle), std::chrono::system_clock::now().time_since_epoch().count()); } fMeta.fSize = size; @@ -285,6 +291,9 @@ class Message final : public fair::mq::Message if (fMeta.fHandle >= 0 && !fQueued) { if (fMeta.fRegionId == 0) { fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fMeta.fHandle)); + boost::interprocess::scoped_lock lock(fManager.GetMtx()); + fManager.DecrementShmMsgCounter(); + fManager.RemoveMsgDebug(fMeta.fHandle); fMeta.fHandle = -1; } else { if (!fRegionPtr) { diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index a648e8ad..5b62bbf2 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -225,6 +225,9 @@ void Monitor::Interactive() case '\n': cout << "\n[\\n] --> invalid input." << endl; break; + case 'b': + PrintDebug(ShmId{fShmId}); + break; default: cout << "\n[" << c << "] --> invalid input." << endl; break; @@ -284,12 +287,17 @@ void Monitor::CheckSegment() fSeenOnce = true; unsigned int numDevices = 0; + unsigned int numMessages = 0; if (fInteractive || fViewOnly) { DeviceCounter* dc = managementSegment.find(bipc::unique_instance).first; if (dc) { numDevices = dc->fCount; } + MsgCounter* mc = managementSegment.find(bipc::unique_instance).first; + if (mc) { + numMessages = mc->fCount; + } } auto now = chrono::high_resolution_clock::now(); @@ -311,17 +319,27 @@ void Monitor::CheckSegment() << setw(10) << segment.get_size() << " | " << setw(10) << segment.get_free_memory() << " | " << setw(8) << numDevices << " | " + << setw(8) << numMessages << " | " << setw(10) << (fViewOnly ? "view only" : to_string(duration)) << " |" << c << flush; } else if (fViewOnly) { size_t free = segment.get_free_memory(); size_t total = segment.get_size(); size_t used = total - free; + // size_t mfree = managementSegment.get_free_memory(); + // size_t mtotal = managementSegment.get_size(); + // size_t mused = mtotal - mfree; LOGV(info, user1) << "[" << fSegmentName << "] devices: " << numDevices << ", total: " << total + << ", msgs: " << numMessages << ", free: " << free << ", used: " << used; + // << "\n " + // << "[" << fManagementSegmentName + // << "] total: " << mtotal + // << ", free: " << mfree + // << ", used: " << mused; } } catch (bie&) { fHeartbeatTriggered = false; @@ -331,6 +349,7 @@ void Monitor::CheckSegment() << setw(10) << "-" << " | " << setw(10) << "-" << " | " << setw(8) << "-" << " | " + << setw(8) << "-" << " | " << setw(10) << "-" << " |" << c << flush; } @@ -356,6 +375,35 @@ void Monitor::CheckSegment() } } +void Monitor::PrintDebug(const ShmId& shmId) +{ + string managementSegmentName("fmq_" + shmId.shmId + "_mng"); + try { + bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); + boost::interprocess::named_mutex mtx(boost::interprocess::open_only, std::string("fmq_" + shmId.shmId + "_mtx").c_str()); + boost::interprocess::scoped_lock lock(mtx); + + Uint64MsgDebugMap* debug = managementSegment.find(bipc::unique_instance).first; + + cout << endl << "found " << debug->size() << " message(s):" << endl; + + for (const auto& e : *debug) { + using time_point = std::chrono::system_clock::time_point; + time_point tmpt{std::chrono::duration_cast(std::chrono::nanoseconds(e.second.fCreationTime))}; + std::time_t t = std::chrono::system_clock::to_time_t(tmpt); + uint64_t ms = e.second.fCreationTime % 1000000; + auto tm = localtime(&t); + cout << "offset: " << setw(12) << setfill(' ') << e.first + << ", size: " << setw(10) << setfill(' ') << e.second.fSize + << ", creator PID: " << e.second.fPid << setfill('0') + << ", at: " << setw(2) << tm->tm_hour << ":" << setw(2) << tm->tm_min << ":" << setw(2) << tm->tm_sec << "." << setw(6) << ms << endl; + } + cout << setfill(' '); + } catch (bie&) { + cout << "no segment found" << endl; + } +} + void Monitor::PrintQueues() { cout << '\n'; @@ -401,13 +449,14 @@ void Monitor::PrintHeader() << setw(10) << "size" << " | " << setw(10) << "free" << " | " << setw(8) << "devices" << " | " + << setw(8) << "msgs" << " | " << setw(10) << "last hb" << " |" << endl; } void Monitor::PrintHelp() { - cout << "controls: [x] close memory, [p] print queues, [h] help, [q] quit." << endl; + cout << "controls: [x] close memory, [p] print queues, [] print a list of allocated messages, [h] help, [q] quit." << endl; } void Monitor::RemoveObject(const string& name) diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index 22f1fd8a..88a2b2ca 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -60,6 +60,8 @@ class Monitor /// @param sessionId session id static void CleanupFull(const SessionId& sessionId); + static void PrintDebug(const ShmId& shmId); + static void RemoveObject(const std::string&); static void RemoveFileMapping(const std::string&); static void RemoveQueue(const std::string&); diff --git a/fairmq/shmem/runMonitor.cxx b/fairmq/shmem/runMonitor.cxx index 13582e5c..fa9deb1b 100644 --- a/fairmq/shmem/runMonitor.cxx +++ b/fairmq/shmem/runMonitor.cxx @@ -78,6 +78,7 @@ int main(int argc, char** argv) unsigned int timeoutInMS = 5000; unsigned int intervalInMS = 100; bool runAsDaemon = false; + bool debug = false; bool cleanOnExit = false; options_description desc("Options"); @@ -90,6 +91,7 @@ int main(int argc, char** argv) ("view,v" , value(&viewOnly)->implicit_value(true), "Run in view only mode") ("timeout,t" , value(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds") ("daemonize,d" , value(&runAsDaemon)->implicit_value(true), "Daemonize the monitor") + ("debug,b" , value(&debug)->implicit_value(true), "Debug - Print a list of messages)") ("clean-on-exit,e", value(&cleanOnExit)->implicit_value(true), "Perform cleanup on exit") ("interval" , value(&intervalInMS)->default_value(100), "Output interval for interactive/view-only mode") ("help,h", "Print help"); @@ -117,6 +119,11 @@ int main(int argc, char** argv) return 0; } + if (debug) { + Monitor::PrintDebug(ShmId{shmId}); + return 0; + } + cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl; if (viewOnly && !interactive) { cout << "running in non-interactive view-only mode, outputting with interval of " << intervalInMS << "ms. (change with --interval), press ctrl+C to exit." << endl;