FairMQ: Introduce configurable session name per device.

The session name is passed to each device via `--session`.
It must be kept consistent across all devices of a session by a higher-level entity,
e.g. a start script or a command & control entity.
This commit is contained in:
Alexey Rybalchenko 2017-11-27 13:59:15 +01:00 committed by Mohammad Al-Turany
parent 58a312b730
commit eddfd0d1bd
10 changed files with 73 additions and 61 deletions

View File

@ -334,9 +334,9 @@ void FairMQProgOptions::FillOptionDescription(boost::program_options::options_de
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).")
("shm-segment-name", po::value<string>()->default_value("fmq_shm_main"), "shmem transport: name of the shared memory segment.")
("rate", po::value<float >()->default_value(0.), "rate for conditional run loop (Hz)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string>()->default_value("default"), "Session name.")
;
}

View File

@ -35,13 +35,13 @@ FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport:
FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const FairMQProgOptions* config)
: FairMQTransportFactory(id)
, fSessionName("default")
, fContext(nullptr)
, fHeartbeatSocket(nullptr)
, fHeartbeatThread()
, fSendHeartbeats(true)
, fShMutex(bipc::open_or_create, "fmq_shm_mutex")
, fShMutex(bipc::open_or_create, std::string("fmq_shm_" + fSessionName + "_mutex").c_str())
, fDeviceCounter(nullptr)
, fSegmentName()
, fManager(nullptr)
{
int major, minor, patch;
@ -57,12 +57,12 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
}
int numIoThreads = 1;
fSegmentName = "fmq_shm_main";
size_t segmentSize = 2000000000;
if (config)
{
numIoThreads = config->GetValue<int>("io-threads");
fSegmentName = config->GetValue<string>("shm-segment-name");
fSessionName = config->GetValue<string>("session");
// fSegmentName = "fmq_shm_" + fSessionName + "_main";
segmentSize = config->GetValue<size_t>("shm-segment-size");
}
else
@ -81,7 +81,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno);
}
fManager = fair::mq::tools::make_unique<Manager>(fSegmentName, segmentSize);
fManager = fair::mq::tools::make_unique<Manager>(fSessionName, segmentSize);
LOG(DEBUG) << "shmem: created/opened shared memory segment of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes.";
{
@ -112,7 +112,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
// }
// else
// {
// LOG(DEBUG) << "shmem: found shmmonitor in fmq_shm_management.";
// LOG(DEBUG) << "shmem: found shmmonitor.";
// }
// }
// catch (std::exception& e)
@ -163,11 +163,12 @@ void FairMQTransportFactorySHM::StartMonitor()
void FairMQTransportFactorySHM::SendHeartbeats()
{
string controlQueueName("fmq_shm_" + fSessionName + "_control_queue");
while (fSendHeartbeats)
{
try
{
bipc::message_queue mq(bipc::open_only, "fmq_shm_control_queue");
bipc::message_queue mq(bipc::open_only, controlQueueName.c_str());
bool heartbeat = true;
bpt::ptime sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100);
if (mq.timed_send(&heartbeat, sizeof(heartbeat), 0, sndTill))
@ -182,7 +183,7 @@ void FairMQTransportFactorySHM::SendHeartbeats()
catch (bipc::interprocess_exception& ie)
{
this_thread::sleep_for(chrono::milliseconds(500));
// LOG(WARN) << "no fmq_shm_control_queue found";
// LOG(WARN) << "no " << controlQueueName << " found";
}
}
}
@ -272,20 +273,20 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM()
if (fDeviceCounter->fCount == 0)
{
LOG(DEBUG) << "shmem: last " << fSegmentName << " user, removing segment.";
LOG(DEBUG) << "shmem: last segment user, removing segment.";
fManager->RemoveSegment();
lastRemoved = true;
}
else
{
LOG(DEBUG) << "shmem: other " << fSegmentName << " users present (" << fDeviceCounter->fCount << "), not removing it.";
LOG(DEBUG) << "shmem: other segment users present (" << fDeviceCounter->fCount << "), not removing it.";
}
}
if (lastRemoved)
{
boost::interprocess::named_mutex::remove("fmq_shm_mutex");
boost::interprocess::named_mutex::remove(std::string("fmq_shm_" + fSessionName + "_mutex").c_str());
}
}

View File

@ -55,13 +55,13 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory
void StartMonitor();
static FairMQ::Transport fTransportType;
std::string fSessionName;
void* fContext;
void* fHeartbeatSocket;
std::thread fHeartbeatThread;
std::atomic<bool> fSendHeartbeats;
boost::interprocess::named_mutex fShMutex;
fair::mq::shmem::DeviceCounter* fDeviceCounter;
std::string fSegmentName;
std::unique_ptr<fair::mq::shmem::Manager> fManager;
};

View File

@ -20,9 +20,11 @@ using namespace std;
namespace bipc = boost::interprocess;
Manager::Manager(const string& name, size_t size)
: fName(name)
, fSegment(bipc::open_or_create, fName.c_str(), size)
, fManagementSegment(bipc::open_or_create, "fmq_shm_management", 65536)
: fSessionName(name)
, fSegmentName("fmq_shm_" + fSessionName + "_main")
, fManagementSegmentName("fmq_shm_" + fSessionName + "_management")
, fSegment(bipc::open_or_create, fSegmentName.c_str(), size)
, fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536)
, fRegions()
{}
@ -112,22 +114,22 @@ bipc::message_queue* Manager::GetRegionQueue(const uint64_t id)
void Manager::RemoveSegment()
{
if (bipc::shared_memory_object::remove(fName.c_str()))
if (bipc::shared_memory_object::remove(fSegmentName.c_str()))
{
LOG(DEBUG) << "shmem: successfully removed " << fName << " segment after the device has stopped.";
LOG(DEBUG) << "shmem: successfully removed " << fSegmentName << " segment after the device has stopped.";
}
else
{
LOG(DEBUG) << "shmem: did not remove " << fName << " segment after the device stopped. Already removed?";
LOG(DEBUG) << "shmem: did not remove " << fSegmentName << " segment after the device stopped. Already removed?";
}
if (bipc::shared_memory_object::remove("fmq_shm_management"))
if (bipc::shared_memory_object::remove(fManagementSegmentName.c_str()))
{
LOG(DEBUG) << "shmem: successfully removed \"fmq_shm_management\" segment after the device has stopped.";
LOG(DEBUG) << "shmem: successfully removed '" << fManagementSegmentName << "' segment after the device has stopped.";
}
else
{
LOG(DEBUG) << "shmem: did not remove \"fmq_shm_management\" segment after the device stopped. Already removed?";
LOG(DEBUG) << "shmem: did not remove '" << fManagementSegmentName << "' segment after the device stopped. Already removed?";
}
}

View File

@ -68,7 +68,9 @@ class Manager
boost::interprocess::managed_shared_memory& ManagementSegment();
private:
std::string fName;
std::string fSessionName;
std::string fSegmentName;
std::string fManagementSegmentName;
boost::interprocess::managed_shared_memory fSegment;
boost::interprocess::managed_shared_memory fManagementSegment;
std::unordered_map<uint64_t, Region> fRegions;

View File

@ -51,17 +51,20 @@ void signalHandler(int signal)
gSignalStatus = signal;
}
Monitor::Monitor(const string& segmentName, bool selfDestruct, bool interactive, unsigned int timeoutInMS)
Monitor::Monitor(const string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS)
: fSelfDestruct(selfDestruct)
, fInteractive(interactive)
, fSeenOnce(false)
, fTimeoutInMS(timeoutInMS)
, fSegmentName(segmentName)
, fSessionName(sessionName)
, fSegmentName("fmq_shm_" + fSessionName + "_main")
, fManagementSegmentName("fmq_shm_" + fSessionName + "_management")
, fControlQueueName("fmq_shm_" + fSessionName + "_control_queue")
, fTerminating(false)
, fHeartbeatTriggered(false)
, fLastHeartbeat()
, fSignalThread()
, fManagementSegment(bipc::open_or_create, "fmq_shm_management", 65536)
, fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536)
{
MonitorStatus* monitorStatus = fManagementSegment.find<MonitorStatus>(bipc::unique_instance).first;
if (monitorStatus != nullptr)
@ -71,7 +74,7 @@ Monitor::Monitor(const string& segmentName, bool selfDestruct, bool interactive,
}
fManagementSegment.construct<MonitorStatus>(bipc::unique_instance)();
RemoveQueue("fmq_shm_control_queue");
RemoveQueue(fControlQueueName);
}
void Monitor::CatchSignals()
@ -124,7 +127,7 @@ void Monitor::MonitorHeartbeats()
{
try
{
bipc::message_queue mq(bipc::open_or_create, "fmq_shm_control_queue", 1000, sizeof(bool));
bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, sizeof(bool));
unsigned int priority;
bipc::message_queue::size_type recvdSize;
@ -149,7 +152,7 @@ void Monitor::MonitorHeartbeats()
cout << ie.what() << endl;
}
RemoveQueue("fmq_shm_control_queue");
RemoveQueue(fControlQueueName);
}
void Monitor::Interactive()
@ -192,7 +195,7 @@ void Monitor::Interactive()
break;
case 'x':
cout << "[x] --> closing shared memory:" << endl;
Cleanup(fSegmentName);
Cleanup(fSessionName);
break;
case 'h':
cout << "[h] --> help:" << endl << endl;
@ -283,7 +286,7 @@ void Monitor::CheckSegment()
if (fHeartbeatTriggered && duration > fTimeoutInMS)
{
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
Cleanup(fSegmentName);
Cleanup(fSessionName);
fHeartbeatTriggered = false;
if (fSelfDestruct)
{
@ -335,11 +338,12 @@ void Monitor::CheckSegment()
}
}
void Monitor::Cleanup(const string& segmentName)
void Monitor::Cleanup(const string& sessionName)
{
string managementSegmentName("fmq_shm_" + sessionName + "_management");
try
{
bipc::managed_shared_memory managementSegment(bipc::open_only, "fmq_shm_management");
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
if (rc)
{
@ -347,8 +351,8 @@ void Monitor::Cleanup(const string& segmentName)
unsigned int regionCount = rc->fCount;
for (unsigned int i = 1; i <= regionCount; ++i)
{
RemoveObject("fmq_shm_region_" + to_string(i));
RemoveQueue(std::string("fmq_shm_region_queue_" + std::to_string(i)));
RemoveObject("fmq_shm_" + sessionName + "_region_" + to_string(i));
RemoveQueue(std::string("fmq_shm_" + sessionName + "_region_queue_" + std::to_string(i)));
}
}
else
@ -356,16 +360,16 @@ void Monitor::Cleanup(const string& segmentName)
cout << "shmem: no region counter found. no regions to cleanup." << endl;
}
RemoveObject("fmq_shm_management");
RemoveObject(managementSegmentName.c_str());
}
catch (bipc::interprocess_exception& ie)
{
cout << "Did not find \"fmq_shm_management\" shared memory segment. No regions to cleanup." << endl;
cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl;
}
RemoveObject(segmentName);
RemoveObject("fmq_shm_" + sessionName + "_main");
boost::interprocess::named_mutex::remove("fmq_shm_mutex");
boost::interprocess::named_mutex::remove(std::string("fmq_shm_" + sessionName + "_mutex").c_str());
cout << endl;
}
@ -401,7 +405,7 @@ void Monitor::PrintQueues()
try
{
bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str());
StringVector* queues = segment.find<StringVector>("fmq_shm_queues").first;
StringVector* queues = segment.find<StringVector>(std::string("fmq_shm_" + fSessionName + "_queues").c_str()).first;
if (queues)
{
cout << "found " << queues->size() << " queue(s):" << endl;

View File

@ -25,7 +25,7 @@ namespace shmem
class Monitor
{
public:
Monitor(const std::string& segmentName, bool selfDestruct, bool interactive, unsigned int timeoutInMS);
Monitor(const std::string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS);
Monitor(const Monitor&) = delete;
Monitor operator=(const Monitor&) = delete;
@ -35,7 +35,7 @@ class Monitor
virtual ~Monitor();
static void Cleanup(const std::string& segmentName);
static void Cleanup(const std::string& sessionName);
static void RemoveObject(const std::string&);
static void RemoveQueue(const std::string&);
@ -52,7 +52,10 @@ class Monitor
bool fInteractive; // running in interactive mode
bool fSeenOnce; // true if the segment has been opened successfully at least once
unsigned int fTimeoutInMS;
std::string fSessionName;
std::string fSegmentName;
std::string fManagementSegmentName;
std::string fControlQueueName;
std::atomic<bool> fTerminating;
std::atomic<bool> fHeartbeatTriggered;
std::chrono::high_resolution_clock::time_point fLastHeartbeat;

View File

@ -12,7 +12,7 @@ The shared memory monitor tool, supplied with the shared memory transport can be
With default arguments the monitor will run indefinitely with no output, and clean up the shared memory segment if it is open and no heartbeats from devices arrive within a timeout period. It can be further customized with the following parameters:
`--segment-name <arg>`: customize the name of the shared memory segment (default is "fmq_shm_main").
`--session <arg>`: customize the name of the shared memory segment via the session name (default is "default").
`--cleanup`: start monitor, perform cleanup of the memory and quit.
`--self-destruct`: run until the memory segment is closed (either naturally via cleanup performed by devices or in case of a crash (no heartbeats within timeout)).
`--interactive`: run interactively, with detailed segment information and user input for various shmem operations.
@ -27,9 +27,9 @@ The FairMQShmMonitor class can also be used independently from the supplied exec
FairMQ Shared Memory currently uses the following names to register shared memory on the system:
`fmq_shm_main` - main segment name, used for user data (this name can be overridden via `--shm-segment-name`).
`fmq_shm_management` - management segment name, used for storing management data.
`fmq_shm_control_queue` - message queue for communicating between shm transport and shm monitor (exists independent of above segments).
`fmq_shm_mutex` - boost::interprocess::named_mutex for management purposes (exists independent of above segments).
`fmq_shm_region_<index>` - names of unmanaged regions.
`fmq_shm_region_queue_<index>` - names of queues for the unmanaged regions.
`fmq_shm_<sessionName>_main` - main segment name, used for user data (session name can be overridden via `--session`).
`fmq_shm_<sessionName>_management` - management segment name, used for storing management data.
`fmq_shm_<sessionName>_control_queue` - message queue for communicating between shm transport and shm monitor (exists independent of above segments).
`fmq_shm_<sessionName>_mutex` - boost::interprocess::named_mutex for management purposes (exists independent of above segments).
`fmq_shm_<sessionName>_region_<index>` - names of unmanaged regions.
`fmq_shm_<sessionName>_region_queue_<index>` - names of queues for the unmanaged regions.

View File

@ -10,7 +10,7 @@
#include "Common.h"
#include "Manager.h"
#include <boost/thread/thread_time.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <chrono>
@ -30,8 +30,8 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQ
: fManager(manager)
, fRemote(remote)
, fStop(false)
, fName("fmq_shm_region_" + to_string(id))
, fQueueName("fmq_shm_region_queue_" + to_string(id))
, fName("fmq_shm_" + fManager.fSessionName +"_region_" + to_string(id))
, fQueueName("fmq_shm_" + fManager.fSessionName +"_region_queue_" + to_string(id))
, fShmemObject()
, fQueue(nullptr)
, fWorker()

View File

@ -19,7 +19,7 @@ int main(int argc, char** argv)
{
try
{
string segmentName;
string sessionName;
bool cleanup = false;
bool selfDestruct = false;
bool interactive = false;
@ -27,7 +27,7 @@ int main(int argc, char** argv)
options_description desc("Options");
desc.add_options()
("segment-name", value<string>(&segmentName)->default_value("fmq_shm_main"), "Name of the shared memory segment")
("session", value<string>(&sessionName)->default_value("default"), "Name of the session which to monitor")
("cleanup", value<bool>(&cleanup)->implicit_value(true), "Perform cleanup and quit")
("self-destruct", value<bool>(&selfDestruct)->implicit_value(true), "Quit after first closing of the memory")
("interactive", value<bool>(&interactive)->implicit_value(true), "Interactive run")
@ -47,15 +47,15 @@ int main(int argc, char** argv)
if (cleanup)
{
cout << "Cleaning up \"" << segmentName << "\"..." << endl;
fair::mq::shmem::Monitor::Cleanup(segmentName);
fair::mq::shmem::Monitor::RemoveQueue("fmq_shm_control_queue");
cout << "Cleaning up \"" << sessionName << "\"..." << endl;
fair::mq::shmem::Monitor::Cleanup(sessionName);
fair::mq::shmem::Monitor::RemoveQueue("fmq_shm_" + sessionName + "_control_queue");
return 0;
}
cout << "Starting monitor for shared memory segment: \"" << segmentName << "\"..." << endl;
cout << "Starting shared memory monitor for session: \"" << sessionName << "\"..." << endl;
fair::mq::shmem::Monitor monitor{segmentName, selfDestruct, interactive, timeoutInMS};
fair::mq::shmem::Monitor monitor{sessionName, selfDestruct, interactive, timeoutInMS};
monitor.CatchSignals();
monitor.Run();