diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index b2ceff4b..7cb90787 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -10,7 +10,6 @@ #include "Common.h" #include -#include #include #include @@ -81,8 +80,6 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool , fTimeoutInMS(timeoutInMS) , fIntervalInMS(intervalInMS) , fShmId(shmId) - , fSegmentName("fmq_" + fShmId + "_m_0") - , fManagementSegmentName("fmq_" + fShmId + "_mng") , fControlQueueName("fmq_" + fShmId + "_cq") , fTerminating(false) , fHeartbeatTriggered(false) @@ -90,18 +87,18 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool , fSignalThread() , fDeviceHeartbeats() { - if (fMonitor) { + if (!fViewOnly) { try { bipc::named_mutex monitorStatus(bipc::create_only, string("fmq_" + fShmId + "_ms").c_str()); } catch (bie&) { - cout << "fairmq-shmmonitor (in monitoring mode) for shared memory id " << fShmId << " already started or did not not properly exited. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`" << endl; - throw DaemonPresent(tools::ToString("fairmq-shmmonitor (in monitoring mode) for shared memory id ", fShmId, " already started or did not not properly exited.")); + if (fInteractive) { + cout << "fairmq-shmmonitor for shm id " << fShmId << " is already running. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`, or run in view-only mode (-v)" << endl; + } else { + cout << "fairmq-shmmonitor for shm id " << fShmId << " is already running. Try `fairmq-shmmonitor --cleanup --shmid " << fShmId << "`" << endl; + } + throw DaemonPresent(tools::ToString("fairmq-shmmonitor (in monitoring mode) for shm id ", fShmId, " is already running.")); } } - - Logger::SetConsoleColor(false); - Logger::DefineVerbosity(Verbosity::user1, VerbositySpec::Make(VerbositySpec::Info::timestamp_us)); - Logger::SetVerbosity(Verbosity::verylow); } void Monitor::CatchSignals() @@ -131,18 +128,13 @@ void Monitor::Run() thread heartbeatThread; if (!fViewOnly) { RemoveQueue(fControlQueueName); - heartbeatThread = thread(&Monitor::MonitorHeartbeats, this); + heartbeatThread = thread(&Monitor::ReceiveHeartbeats, this); } if (fInteractive) { Interactive(); - } else if (fViewOnly) { - CheckSegment(); } else { - while (!fTerminating) { - this_thread::sleep_for(chrono::milliseconds(500)); - CheckSegment(); - } + Watch(); } if (!fViewOnly) { @@ -150,7 +142,136 @@ void Monitor::Run() } } -void Monitor::MonitorHeartbeats() +void Monitor::Watch() +{ + while (!fTerminating) { + using namespace boost::interprocess; + + try { + managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + fShmId + "_mng").c_str()); + + fSeenOnce = true; + + auto now = chrono::high_resolution_clock::now(); + unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); + + if (fHeartbeatTriggered && duration > fTimeoutInMS) { + // memory is present, but no heartbeats since timeout duration + cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl; + Cleanup(ShmId{fShmId}); + fHeartbeatTriggered = false; + if (fSelfDestruct) { + cout << "self destructing (segment has been observed and cleaned up by the monitor)" << endl; + fTerminating = true; + } + } + } catch (bie&) { + fHeartbeatTriggered = false; + + if (fSelfDestruct) { + if (fSeenOnce) { + // segment has been observed at least once, can self-destruct + cout << "self destructing (segment has been observed and cleaned up orderly)" << endl; + fTerminating = true; + } else { + // if self-destruct is requested, and no segment has ever been observed, quit after double timeout duration + auto now = chrono::high_resolution_clock::now(); + unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); + + if (duration > fTimeoutInMS * 2) { + Cleanup(ShmId{fShmId}); + cout << "self destructing (no segments observed within (timeout * 2) since start)" << endl; + fTerminating = true; + } + } + } + } + + this_thread::sleep_for(chrono::milliseconds(100)); + } +} + +bool Monitor::PrintShm(const ShmId& shmId) +{ + using namespace boost::interprocess; + + try { + managed_shared_memory managementSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str()); + VoidAlloc allocInstance(managementSegment.get_segment_manager()); + + Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find(unique_instance).first; + std::unordered_map> segments; + + if (!segmentInfos) { + cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl; + return false; + } + + for (const auto& s : *segmentInfos) { + if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { + segments.emplace(s.first, RBTreeBestFitSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str())); + } else { + segments.emplace(s.first, SimpleSeqFitSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + to_string(s.first)).c_str())); + } + } + + unsigned int numDevices = 0; + int creatorId = -1; + std::string sessionName; + + DeviceCounter* deviceCounter = managementSegment.find(unique_instance).first; + if (deviceCounter) { + numDevices = deviceCounter->fCount; + } + SessionInfo* sessionInfo = managementSegment.find(unique_instance).first; + if (sessionInfo) { + creatorId = sessionInfo->fCreatorId; + sessionName = sessionInfo->fSessionName; + } +#ifdef FAIRMQ_DEBUG_MODE + Uint16MsgCounterHashMap* msgCounters = managementSegment.find(unique_instance).first; +#endif + + stringstream ss; + size_t mfree = managementSegment.get_free_memory(); + size_t mtotal = managementSegment.get_size(); + size_t mused = mtotal - mfree; + + ss << "shm id: " << shmId.shmId + << ", session: " << sessionName + << ", creator id: " << creatorId + << ", devices: " << numDevices + << ", segments:\n"; + + for (const auto& s : segments) { + size_t free = boost::apply_visitor(SegmentFreeMemory(), s.second); + size_t total = boost::apply_visitor(SegmentSize(), s.second); + size_t used = total - free; + ss << " [" << s.first + << "]: total: " << total +#ifdef FAIRMQ_DEBUG_MODE + << ", msgs: " << ( (msgCounters != nullptr) ? to_string((*msgCounters)[s.first].fCount) : "unknown") +#else + << ", msgs: NODEBUG" +#endif + << ", free: " << free + << ", used: " << used + << "\n"; + } + + ss << " [m]: " + << "total: " << mtotal + << ", free: " << mfree + << ", used: " << mused; + LOGV(info, user1) << ss.str(); + } catch (bie&) { + return false; + } + + return true; +} + +void Monitor::ReceiveHeartbeats() { try { bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, 256); @@ -235,132 +356,7 @@ void Monitor::Interactive() break; } - CheckSegment(); - - if (!fTerminating) { - cout << "\r"; - } - } -} - -void Monitor::CheckSegment() -{ - using namespace boost::interprocess; - - try { - managed_shared_memory managementSegment(open_read_only, fManagementSegmentName.c_str()); - VoidAlloc allocInstance(managementSegment.get_segment_manager()); - - Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find(unique_instance).first; - std::unordered_map> segments; - - if (!segmentInfos) { - cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl; - return; - } - - for (const auto& s : *segmentInfos) { - if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { - segments.emplace(s.first, RBTreeBestFitSegment(open_read_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str())); - } else { - segments.emplace(s.first, SimpleSeqFitSegment(open_read_only, std::string("fmq_" + fShmId + "_m_" + to_string(s.first)).c_str())); - } - } - - fSeenOnce = true; - - unsigned int numDevices = 0; - int creatorId = -1; - std::string sessionName; -#ifdef FAIRMQ_DEBUG_MODE - Uint16MsgCounterHashMap* msgCounters = nullptr; -#endif - - if (fInteractive || fViewOnly) { - DeviceCounter* dc = managementSegment.find(unique_instance).first; - if (dc) { - numDevices = dc->fCount; - } - SessionInfo* si = managementSegment.find(unique_instance).first; - if (si) { - creatorId = si->fCreatorId; - sessionName = si->fSessionName; - } -#ifdef FAIRMQ_DEBUG_MODE - msgCounters = managementSegment.find(unique_instance).first; -#endif - } - - auto now = chrono::high_resolution_clock::now(); - unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); - - if (fHeartbeatTriggered && duration > fTimeoutInMS) { - // memory is present, but no heartbeats since timeout duration - cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl; - Cleanup(ShmId{fShmId}); - fHeartbeatTriggered = false; - if (fSelfDestruct) { - cout << "self destructing (segment has been observed and cleaned up by the monitor)" << endl; - fTerminating = true; - } - } - - if (fInteractive || fViewOnly) { - stringstream ss; - size_t mfree = managementSegment.get_free_memory(); - size_t mtotal = managementSegment.get_size(); - size_t mused = mtotal - mfree; - - ss << "shm id: " << fShmId - << ", session: " << sessionName - << ", creator id: " << creatorId - << ", devices: " << numDevices - << ", segments:\n"; - for (const auto& s : segments) { - size_t free = boost::apply_visitor(SegmentFreeMemory(), s.second); - size_t total = boost::apply_visitor(SegmentSize(), s.second); - size_t used = total - free; - ss << " [" << s.first - << "]: total: " << total -#ifdef FAIRMQ_DEBUG_MODE - << ", msgs: " << ( (msgCounters != nullptr) ? to_string((*msgCounters)[s.first].fCount) : "unknown") -#else - << ", msgs: NODEBUG" -#endif - << ", free: " << free - << ", used: " << used - << "\n"; - } - ss << " [m]: " - << "total: " << mtotal - << ", free: " << mfree - << ", used: " << mused; - LOGV(info, user1) << ss.str(); - } - } catch (bie&) { - if (!fMonitor && !fInteractive) { - cout << "No segments found." << endl; - } - fHeartbeatTriggered = false; - - if (fSelfDestruct) { - if (fSeenOnce) { - // segment has been observed at least once, can self-destruct - cout << "self destructing (segment has been observed and cleaned up orderly)" << endl; - fTerminating = true; - } else { - // if self-destruct is requested, and no segment has ever been observed, quit after double timeout duration - auto now = chrono::high_resolution_clock::now(); - unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); - - if (fMonitor && duration > fTimeoutInMS * 2) { - // clean just in case any other artifacts are left. - Cleanup(ShmId{fShmId}); - cout << "self destructing (no segments observed within (timeout * 2) since start)" << endl; - fTerminating = true; - } - } - } + PrintShm(ShmId{fShmId}); } } diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index 434e7dc8..b6d8b163 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -8,6 +8,8 @@ #ifndef FAIR_MQ_SHMEM_MONITOR_H_ #define FAIR_MQ_SHMEM_MONITOR_H_ +#include + #include #include #include @@ -82,6 +84,8 @@ class Monitor static std::unordered_map> GetDebugInfo(const ShmId& shmId); static std::unordered_map> GetDebugInfo(const SessionId& shmId); + static bool PrintShm(const ShmId& shmId); + static bool RemoveObject(const std::string& name); static bool RemoveFileMapping(const std::string& name); static bool RemoveQueue(const std::string& name); @@ -92,7 +96,8 @@ class Monitor private: void PrintHelp(); - void MonitorHeartbeats(); + void Watch(); + void ReceiveHeartbeats(); void CheckSegment(); void Interactive(); void SignalMonitor(); @@ -106,8 +111,6 @@ class Monitor unsigned int fTimeoutInMS; unsigned int fIntervalInMS; std::string fShmId; - std::string fSegmentName; - std::string fManagementSegmentName; std::string fControlQueueName; std::atomic fTerminating; std::atomic fHeartbeatTriggered; diff --git a/fairmq/shmem/runMonitor.cxx b/fairmq/shmem/runMonitor.cxx index 5f305715..6f4cb601 100644 --- a/fairmq/shmem/runMonitor.cxx +++ b/fairmq/shmem/runMonitor.cxx @@ -69,6 +69,10 @@ static void daemonize() int main(int argc, char** argv) { try { + fair::Logger::SetConsoleColor(false); + fair::Logger::DefineVerbosity(fair::Verbosity::user1, fair::VerbositySpec::Make(fair::VerbositySpec::Info::timestamp_us)); + fair::Logger::SetVerbosity(fair::Verbosity::verylow); + string sessionName; string shmId; bool cleanup = false; @@ -97,7 +101,7 @@ int main(int argc, char** argv) ("monitor,m" , value(&monitor)->implicit_value(true), "Run in monitoring mode") ("debug,b" , value(&debug)->implicit_value(true), "Debug - Print a list of messages)") ("clean-on-exit,e", value(&cleanOnExit)->implicit_value(true), "Perform cleanup on exit") - ("interval" , value(&intervalInMS)->default_value(100), "Output interval for interactive/view-only mode") + ("interval" , value(&intervalInMS)->default_value(100), "Output interval for interactive mode") ("get-shmid" , value(&getShmId)->implicit_value(true), "Translate given session id and user id to a shmem id (uses current user id if none provided)") ("user-id" , value(&userId)->default_value(-1), "User id (used with --get-shmid)") ("help,h", "Print help"); @@ -141,18 +145,22 @@ int main(int argc, char** argv) return 0; } - cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl; - if (!viewOnly && !interactive && !monitor) { // if neither of the run modes are selected, use view only mode. viewOnly = true; } - Monitor shmmonitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, monitor, cleanOnExit); - - if (interactive || !viewOnly) { - shmmonitor.CatchSignals(); + if (viewOnly && !interactive) { + if (!Monitor::PrintShm(ShmId{shmId})) { + cout << "No segments found." << endl; + } + return 0; } + + cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shm id: " << shmId << ")..." << endl; + + Monitor shmmonitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, monitor, cleanOnExit); + shmmonitor.CatchSignals(); shmmonitor.Run(); } catch (Monitor::DaemonPresent& dp) { return 0;