diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index b93b121c..c9fba500 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -334,9 +334,9 @@ void FairMQProgOptions::FillOptionDescription(boost::program_options::options_de ("port-range-max", po::value()->default_value(32000), "End of the port range for dynamic initialization.") ("log-to-file", po::value()->default_value(""), "Log output to a file.") ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") - ("shm-segment-size", po::value()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).") - ("shm-segment-name", po::value()->default_value("fmq_shm_main"), "shmem transport: name of the shared memory segment.") - ("rate", po::value()->default_value(0.), "rate for conditional run loop (Hz)") + ("shm-segment-size", po::value()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).") + ("rate", po::value()->default_value(0.), "Rate for conditional run loop (Hz).") + ("session", po::value()->default_value("default"), "Session name.") ; } diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index 2bc1d784..2829437e 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -35,13 +35,13 @@ FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport: FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const FairMQProgOptions* config) : FairMQTransportFactory(id) + , fSessionName("default") , fContext(nullptr) , fHeartbeatSocket(nullptr) , fHeartbeatThread() , fSendHeartbeats(true) - , fShMutex(bipc::open_or_create, "fmq_shm_mutex") + , fShMutex(bipc::open_or_create, std::string("fmq_shm_" + fSessionName + "_mutex").c_str()) , fDeviceCounter(nullptr) - , fSegmentName() , fManager(nullptr) { int major, minor, patch; @@ -57,12 +57,12 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai } int numIoThreads = 1; - fSegmentName = "fmq_shm_main"; size_t segmentSize = 2000000000; if (config) { numIoThreads = config->GetValue("io-threads"); - fSegmentName = config->GetValue("shm-segment-name"); + fSessionName = config->GetValue("session"); + // fSegmentName = "fmq_shm_" + fSessionName + "_main"; segmentSize = config->GetValue("shm-segment-size"); } else @@ -81,7 +81,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno); } - fManager = fair::mq::tools::make_unique(fSegmentName, segmentSize); + fManager = fair::mq::tools::make_unique(fSessionName, segmentSize); LOG(DEBUG) << "shmem: created/opened shared memory segment of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes."; { @@ -112,7 +112,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai // } // else // { - // LOG(DEBUG) << "shmem: found shmmonitor in fmq_shm_management."; + // LOG(DEBUG) << "shmem: found shmmonitor."; // } // } // catch (std::exception& e) @@ -163,11 +163,12 @@ void FairMQTransportFactorySHM::StartMonitor() void FairMQTransportFactorySHM::SendHeartbeats() { + string controlQueueName("fmq_shm_" + fSessionName + "_control_queue"); while (fSendHeartbeats) { try { - bipc::message_queue mq(bipc::open_only, "fmq_shm_control_queue"); + bipc::message_queue mq(bipc::open_only, controlQueueName.c_str()); bool heartbeat = true; bpt::ptime sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100); if (mq.timed_send(&heartbeat, sizeof(heartbeat), 0, sndTill)) @@ -182,7 +183,7 @@ void FairMQTransportFactorySHM::SendHeartbeats() catch (bipc::interprocess_exception& ie) { this_thread::sleep_for(chrono::milliseconds(500)); - // LOG(WARN) << "no fmq_shm_control_queue found"; + // LOG(WARN) << "no " << controlQueueName << " found"; } } } @@ -272,20 +273,20 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM() if (fDeviceCounter->fCount == 0) { - LOG(DEBUG) << "shmem: last " << fSegmentName << " user, removing segment."; + LOG(DEBUG) << "shmem: last segment user, removing segment."; fManager->RemoveSegment(); lastRemoved = true; } else { - LOG(DEBUG) << "shmem: other " << fSegmentName << " users present (" << fDeviceCounter->fCount << "), not removing it."; + LOG(DEBUG) << "shmem: other segment users present (" << fDeviceCounter->fCount << "), not removing it."; } } if (lastRemoved) { - boost::interprocess::named_mutex::remove("fmq_shm_mutex"); + boost::interprocess::named_mutex::remove(std::string("fmq_shm_" + fSessionName + "_mutex").c_str()); } } diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index 14c6c0af..26b1509d 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -55,13 +55,13 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory void StartMonitor(); static FairMQ::Transport fTransportType; + std::string fSessionName; void* fContext; void* fHeartbeatSocket; std::thread fHeartbeatThread; std::atomic fSendHeartbeats; boost::interprocess::named_mutex fShMutex; fair::mq::shmem::DeviceCounter* fDeviceCounter; - std::string fSegmentName; std::unique_ptr fManager; }; diff --git a/fairmq/shmem/Manager.cxx b/fairmq/shmem/Manager.cxx index 6b6c6230..cfb49fe5 100644 --- a/fairmq/shmem/Manager.cxx +++ b/fairmq/shmem/Manager.cxx @@ -20,9 +20,11 @@ using namespace std; namespace bipc = boost::interprocess; Manager::Manager(const string& name, size_t size) - : fName(name) - , fSegment(bipc::open_or_create, fName.c_str(), size) - , fManagementSegment(bipc::open_or_create, "fmq_shm_management", 65536) + : fSessionName(name) + , fSegmentName("fmq_shm_" + fSessionName + "_main") + , fManagementSegmentName("fmq_shm_" + fSessionName + "_management") + , fSegment(bipc::open_or_create, fSegmentName.c_str(), size) + , fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536) , fRegions() {} @@ -112,22 +114,22 @@ bipc::message_queue* Manager::GetRegionQueue(const uint64_t id) void Manager::RemoveSegment() { - if (bipc::shared_memory_object::remove(fName.c_str())) + if (bipc::shared_memory_object::remove(fSegmentName.c_str())) { - LOG(DEBUG) << "shmem: successfully removed " << fName << " segment after the device has stopped."; + LOG(DEBUG) << "shmem: successfully removed " << fSegmentName << " segment after the device has stopped."; } else { - LOG(DEBUG) << "shmem: did not remove " << fName << " segment after the device stopped. Already removed?"; + LOG(DEBUG) << "shmem: did not remove " << fSegmentName << " segment after the device stopped. Already removed?"; } - if (bipc::shared_memory_object::remove("fmq_shm_management")) + if (bipc::shared_memory_object::remove(fManagementSegmentName.c_str())) { - LOG(DEBUG) << "shmem: successfully removed \"fmq_shm_management\" segment after the device has stopped."; + LOG(DEBUG) << "shmem: successfully removed '" << fManagementSegmentName << "' segment after the device has stopped."; } else { - LOG(DEBUG) << "shmem: did not remove \"fmq_shm_management\" segment after the device stopped. Already removed?"; + LOG(DEBUG) << "shmem: did not remove '" << fManagementSegmentName << "' segment after the device stopped. Already removed?"; } } diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 82eb5f84..919f5277 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -68,7 +68,9 @@ class Manager boost::interprocess::managed_shared_memory& ManagementSegment(); private: - std::string fName; + std::string fSessionName; + std::string fSegmentName; + std::string fManagementSegmentName; boost::interprocess::managed_shared_memory fSegment; boost::interprocess::managed_shared_memory fManagementSegment; std::unordered_map fRegions; diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 72bb0b62..1ce072a0 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -51,17 +51,20 @@ void signalHandler(int signal) gSignalStatus = signal; } -Monitor::Monitor(const string& segmentName, bool selfDestruct, bool interactive, unsigned int timeoutInMS) +Monitor::Monitor(const string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS) : fSelfDestruct(selfDestruct) , fInteractive(interactive) , fSeenOnce(false) , fTimeoutInMS(timeoutInMS) - , fSegmentName(segmentName) + , fSessionName(sessionName) + , fSegmentName("fmq_shm_" + fSessionName + "_main") + , fManagementSegmentName("fmq_shm_" + fSessionName + "_management") + , fControlQueueName("fmq_shm_" + fSessionName + "_control_queue") , fTerminating(false) , fHeartbeatTriggered(false) , fLastHeartbeat() , fSignalThread() - , fManagementSegment(bipc::open_or_create, "fmq_shm_management", 65536) + , fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536) { MonitorStatus* monitorStatus = fManagementSegment.find(bipc::unique_instance).first; if (monitorStatus != nullptr) @@ -71,7 +74,7 @@ Monitor::Monitor(const string& segmentName, bool selfDestruct, bool interactive, } fManagementSegment.construct(bipc::unique_instance)(); - RemoveQueue("fmq_shm_control_queue"); + RemoveQueue(fControlQueueName); } void Monitor::CatchSignals() @@ -124,7 +127,7 @@ void Monitor::MonitorHeartbeats() { try { - bipc::message_queue mq(bipc::open_or_create, "fmq_shm_control_queue", 1000, sizeof(bool)); + bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, sizeof(bool)); unsigned int priority; bipc::message_queue::size_type recvdSize; @@ -149,7 +152,7 @@ void Monitor::MonitorHeartbeats() cout << ie.what() << endl; } - RemoveQueue("fmq_shm_control_queue"); + RemoveQueue(fControlQueueName); } void Monitor::Interactive() @@ -192,7 +195,7 @@ void Monitor::Interactive() break; case 'x': cout << "[x] --> closing shared memory:" << endl; - Cleanup(fSegmentName); + Cleanup(fSessionName); break; case 'h': cout << "[h] --> help:" << endl << endl; @@ -283,7 +286,7 @@ void Monitor::CheckSegment() if (fHeartbeatTriggered && duration > fTimeoutInMS) { cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl; - Cleanup(fSegmentName); + Cleanup(fSessionName); fHeartbeatTriggered = false; if (fSelfDestruct) { @@ -335,11 +338,12 @@ void Monitor::CheckSegment() } } -void Monitor::Cleanup(const string& segmentName) +void Monitor::Cleanup(const string& sessionName) { + string managementSegmentName("fmq_shm_" + sessionName + "_management"); try { - bipc::managed_shared_memory managementSegment(bipc::open_only, "fmq_shm_management"); + bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); RegionCounter* rc = managementSegment.find(bipc::unique_instance).first; if (rc) { @@ -347,8 +351,8 @@ void Monitor::Cleanup(const string& segmentName) unsigned int regionCount = rc->fCount; for (unsigned int i = 1; i <= regionCount; ++i) { - RemoveObject("fmq_shm_region_" + to_string(i)); - RemoveQueue(std::string("fmq_shm_region_queue_" + std::to_string(i))); + RemoveObject("fmq_shm_" + sessionName + "_region_" + to_string(i)); + RemoveQueue(std::string("fmq_shm_" + sessionName + "_region_queue_" + std::to_string(i))); } } else @@ -356,16 +360,16 @@ void Monitor::Cleanup(const string& segmentName) cout << "shmem: no region counter found. no regions to cleanup." << endl; } - RemoveObject("fmq_shm_management"); + RemoveObject(managementSegmentName.c_str()); } catch (bipc::interprocess_exception& ie) { - cout << "Did not find \"fmq_shm_management\" shared memory segment. No regions to cleanup." << endl; + cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl; } - RemoveObject(segmentName); + RemoveObject("fmq_shm_" + sessionName + "_main"); - boost::interprocess::named_mutex::remove("fmq_shm_mutex"); + boost::interprocess::named_mutex::remove(std::string("fmq_shm_" + sessionName + "_mutex").c_str()); cout << endl; } @@ -401,7 +405,7 @@ void Monitor::PrintQueues() try { bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str()); - StringVector* queues = segment.find("fmq_shm_queues").first; + StringVector* queues = segment.find(std::string("fmq_shm_" + fSessionName + "_queues").c_str()).first; if (queues) { cout << "found " << queues->size() << " queue(s):" << endl; diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index 59746391..3301509c 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -25,7 +25,7 @@ namespace shmem class Monitor { public: - Monitor(const std::string& segmentName, bool selfDestruct, bool interactive, unsigned int timeoutInMS); + Monitor(const std::string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS); Monitor(const Monitor&) = delete; Monitor operator=(const Monitor&) = delete; @@ -35,7 +35,7 @@ class Monitor virtual ~Monitor(); - static void Cleanup(const std::string& segmentName); + static void Cleanup(const std::string& sessionName); static void RemoveObject(const std::string&); static void RemoveQueue(const std::string&); @@ -52,7 +52,10 @@ class Monitor bool fInteractive; // running in interactive mode bool fSeenOnce; // true is segment has been opened successfully at least once unsigned int fTimeoutInMS; + std::string fSessionName; std::string fSegmentName; + std::string fManagementSegmentName; + std::string fControlQueueName; std::atomic fTerminating; std::atomic fHeartbeatTriggered; std::chrono::high_resolution_clock::time_point fLastHeartbeat; diff --git a/fairmq/shmem/README.md b/fairmq/shmem/README.md index 689082b3..10573a8a 100644 --- a/fairmq/shmem/README.md +++ b/fairmq/shmem/README.md @@ -12,7 +12,7 @@ The shared memory monitor tool, supplied with the shared memory transport can be With default arguments the monitor will run indefinitely with no output, and clean up shared memory segment if it is open and no heartbeats from devices arrive within a timeout period. It can be further customized with following parameters: - `--segment-name `: customize the name of the shared memory segment (default is "fmq_shm_main"). + `--session `: customize the name of the shared memory segment via the session name (default is "default"). `--cleanup`: start monitor, perform cleanup of the memory and quit. `--self-destruct`: run until the memory segment is closed (either naturally via cleanup performed by devices or in case of a crash (no heartbeats within timeout)). `--interactive`: run interactively, with detailed segment details and user input for various shmem operations. @@ -27,9 +27,9 @@ The FairMQShmMonitor class can also be used independently from the supplied exec FairMQ Shared Memory currently uses following names to register shared memory on the system: -`fmq_shm_main` - main segment name, used for user data (this name can be overridden via `--shm-segment-name`). -`fmq_shm_management` - management segment name, used for storing management data. -`fmq_shm_control_queue` - message queue for communicating between shm transport and shm monitor (exists independent of above segments). -`fmq_shm_mutex` - boost::interprocess::named_mutex for management purposes (exists independent of above segments). -`fmq_shm_region_` - names of unmanaged regions. -`fmq_shm_region_queue_` - names of queues for the unmanaged regions. +`fmq_shm__main` - main segment name, used for user data (session name can be overridden via `--session`). +`fmq_shm__management` - management segment name, used for storing management data. +`fmq_shm__control_queue` - message queue for communicating between shm transport and shm monitor (exists independent of above segments). +`fmq_shm__mutex` - boost::interprocess::named_mutex for management purposes (exists independent of above segments). +`fmq_shm__region_` - names of unmanaged regions. +`fmq_shm__region_queue_` - names of queues for the unmanaged regions. diff --git a/fairmq/shmem/Region.cxx b/fairmq/shmem/Region.cxx index 58350f04..39ea30b4 100644 --- a/fairmq/shmem/Region.cxx +++ b/fairmq/shmem/Region.cxx @@ -10,7 +10,7 @@ #include "Common.h" #include "Manager.h" -#include +#include #include @@ -30,8 +30,8 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQ : fManager(manager) , fRemote(remote) , fStop(false) - , fName("fmq_shm_region_" + to_string(id)) - , fQueueName("fmq_shm_region_queue_" + to_string(id)) + , fName("fmq_shm_" + fManager.fSessionName +"_region_" + to_string(id)) + , fQueueName("fmq_shm_" + fManager.fSessionName +"_region_queue_" + to_string(id)) , fShmemObject() , fQueue(nullptr) , fWorker() diff --git a/fairmq/shmem/runMonitor.cxx b/fairmq/shmem/runMonitor.cxx index c619ab76..41f9b338 100644 --- a/fairmq/shmem/runMonitor.cxx +++ b/fairmq/shmem/runMonitor.cxx @@ -19,7 +19,7 @@ int main(int argc, char** argv) { try { - string segmentName; + string sessionName; bool cleanup = false; bool selfDestruct = false; bool interactive = false; @@ -27,7 +27,7 @@ int main(int argc, char** argv) options_description desc("Options"); desc.add_options() - ("segment-name", value(&segmentName)->default_value("fmq_shm_main"), "Name of the shared memory segment") + ("session", value(&sessionName)->default_value("default"), "Name of the session which to monitor") ("cleanup", value(&cleanup)->implicit_value(true), "Perform cleanup and quit") ("self-destruct", value(&selfDestruct)->implicit_value(true), "Quit after first closing of the memory") ("interactive", value(&interactive)->implicit_value(true), "Interactive run") @@ -47,15 +47,15 @@ int main(int argc, char** argv) if (cleanup) { - cout << "Cleaning up \"" << segmentName << "\"..." << endl; - fair::mq::shmem::Monitor::Cleanup(segmentName); - fair::mq::shmem::Monitor::RemoveQueue("fmq_shm_control_queue"); + cout << "Cleaning up \"" << sessionName << "\"..." << endl; + fair::mq::shmem::Monitor::Cleanup(sessionName); + fair::mq::shmem::Monitor::RemoveQueue("fmq_shm_" + sessionName + "_control_queue"); return 0; } - cout << "Starting monitor for shared memory segment: \"" << segmentName << "\"..." << endl; + cout << "Starting shared memory monitor for session: \"" << sessionName << "\"..." << endl; - fair::mq::shmem::Monitor monitor{segmentName, selfDestruct, interactive, timeoutInMS}; + fair::mq::shmem::Monitor monitor{sessionName, selfDestruct, interactive, timeoutInMS}; monitor.CatchSignals(); monitor.Run();