FairMQ: Add uuid generator to tools & let tests use it for session names.

This commit is contained in:
Alexey Rybalchenko
2017-11-28 17:25:39 +01:00
committed by Mohammad Al-Turany
parent 85aab51bd4
commit 0dc4000187
14 changed files with 218 additions and 67 deletions

View File

@@ -35,12 +35,12 @@ FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport:
FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const FairMQProgOptions* config)
: FairMQTransportFactory(id)
, fSessionName("default")
, fSessionName()
, fContext(nullptr)
, fHeartbeatSocket(nullptr)
, fHeartbeatThread()
, fSendHeartbeats(true)
, fShMutex(bipc::open_or_create, std::string("fmq_shm_" + fSessionName + "_mutex").c_str())
, fShMutex(nullptr)
, fDeviceCounter(nullptr)
, fManager(nullptr)
{
@@ -62,6 +62,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
{
numIoThreads = config->GetValue<int>("io-threads");
fSessionName = config->GetValue<string>("session");
fSessionName.resize(8); // shorten the session name, to acomodate for name size limit on some systems (MacOS)
// fSegmentName = "fmq_shm_" + fSessionName + "_main";
segmentSize = config->GetValue<size_t>("shm-segment-size");
}
@@ -70,56 +71,66 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai
LOG(WARN) << "shmem: FairMQProgOptions not available! Using defaults.";
}
if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0)
try
{
LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno);
}
fShMutex = fair::mq::tools::make_unique<bipc::named_mutex>(bipc::open_or_create, std::string("fmq_shm_" + fSessionName + "_mutex").c_str());
// Set the maximum number of allowed sockets on the context.
if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0)
{
LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno);
}
fManager = fair::mq::tools::make_unique<Manager>(fSessionName, segmentSize);
LOG(DEBUG) << "shmem: created/opened shared memory segment of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes.";
{
bipc::scoped_lock<bipc::named_mutex> lock(fShMutex);
fDeviceCounter = fManager->Segment().find<DeviceCounter>(bipc::unique_instance).first;
if (fDeviceCounter)
if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0)
{
LOG(DEBUG) << "shmem: device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
(fDeviceCounter->fCount)++;
LOG(DEBUG) << "shmem: incremented device counter, now: " << fDeviceCounter->fCount;
}
else
{
LOG(DEBUG) << "shmem: no device counter found, creating one and initializing with 1";
fDeviceCounter = fManager->Segment().construct<DeviceCounter>(bipc::unique_instance)(1);
LOG(DEBUG) << "shmem: initialized device counter with: " << fDeviceCounter->fCount;
LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno);
}
// start shm monitor
// try
// {
// MonitorStatus* monitorStatus = fManagementSegment.find<MonitorStatus>(bipc::unique_instance).first;
// if (monitorStatus == nullptr)
// {
// LOG(DEBUG) << "shmem: no shmmonitor found, starting...";
// StartMonitor();
// }
// else
// {
// LOG(DEBUG) << "shmem: found shmmonitor.";
// }
// }
// catch (std::exception& e)
// {
// LOG(ERROR) << "shmem: Exception during shmmonitor initialization: " << e.what() << ", application will now exit";
// exit(EXIT_FAILURE);
// }
// Set the maximum number of allowed sockets on the context.
if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0)
{
LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno);
}
fManager = fair::mq::tools::make_unique<Manager>(fSessionName, segmentSize);
LOG(DEBUG) << "shmem: created/opened shared memory segment of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes.";
{
bipc::scoped_lock<bipc::named_mutex> lock(*fShMutex);
fDeviceCounter = fManager->Segment().find<DeviceCounter>(bipc::unique_instance).first;
if (fDeviceCounter)
{
LOG(DEBUG) << "shmem: device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
(fDeviceCounter->fCount)++;
LOG(DEBUG) << "shmem: incremented device counter, now: " << fDeviceCounter->fCount;
}
else
{
LOG(DEBUG) << "shmem: no device counter found, creating one and initializing with 1";
fDeviceCounter = fManager->Segment().construct<DeviceCounter>(bipc::unique_instance)(1);
LOG(DEBUG) << "shmem: initialized device counter with: " << fDeviceCounter->fCount;
}
// start shm monitor
// try
// {
// MonitorStatus* monitorStatus = fManagementSegment.find<MonitorStatus>(bipc::unique_instance).first;
// if (monitorStatus == nullptr)
// {
// LOG(DEBUG) << "shmem: no shmmonitor found, starting...";
// StartMonitor();
// }
// else
// {
// LOG(DEBUG) << "shmem: found shmmonitor.";
// }
// }
// catch (std::exception& e)
// {
// LOG(ERROR) << "shmem: Exception during shmmonitor initialization: " << e.what() << ", application will now exit";
// exit(EXIT_FAILURE);
// }
}
}
catch(bipc::interprocess_exception& e)
{
LOG(ERROR) << "Could not initialize shared memory transport: " << e.what();
throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified.");
}
fSendHeartbeats = true;
@@ -267,7 +278,7 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM()
bool lastRemoved = false;
{ // mutex scope
bipc::scoped_lock<bipc::named_mutex> lock(fShMutex);
bipc::scoped_lock<bipc::named_mutex> lock(*fShMutex);
(fDeviceCounter->fCount)--;

View File

@@ -60,7 +60,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory
void* fHeartbeatSocket;
std::thread fHeartbeatThread;
std::atomic<bool> fSendHeartbeats;
boost::interprocess::named_mutex fShMutex;
std::unique_ptr<boost::interprocess::named_mutex> fShMutex;
fair::mq::shmem::DeviceCounter* fDeviceCounter;
std::unique_ptr<fair::mq::shmem::Manager> fManager;
};