extend region config

This commit is contained in:
Alexey Rybalchenko
2021-12-06 23:44:38 +01:00
committed by Dennis Klein
parent eef42d2dea
commit 80ed45df63
7 changed files with 156 additions and 90 deletions

View File

@@ -38,12 +38,13 @@ namespace fair::mq::shmem
struct Region
{
Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, RegionConfig cfg)
Region(const std::string& shmId, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, RegionConfig cfg)
: fRemote(remote)
, fRemoveOnDestruction(cfg.removeOnDestruction)
, fLinger(cfg.linger)
, fStopAcks(false)
, fName("fmq_" + shmId + "_rg_" + std::to_string(id))
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id))
, fName("fmq_" + shmId + "_rg_" + std::to_string(cfg.id.value()))
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(cfg.id.value()))
, fShmemObject()
, fFile(nullptr)
, fFileMapping()
@@ -81,23 +82,46 @@ struct Region
if (fRemote) {
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
} else {
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
switch (cfg.constructionMode) {
case RegionConstruction::create:
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
break;
case RegionConstruction::open:
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
break;
case RegionConstruction::open_or_create:
try {
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
} catch (interprocess_exception&) {
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
}
break;
default:
throw TransportError(tools::ToString("Unknown RegionConstruction mode provided "));
break;
}
}
} catch (interprocess_exception& e) {
LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << id << "': " << e.what();
LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
throw;
}
try {
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, cfg.creationFlags);
} catch (interprocess_exception& e) {
LOG(error) << "Failed mapping shared_memory_object for region id '" << id << "': " << e.what();
LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
throw;
}
}
InitializeQueues();
StartSendingAcks();
// skip queues initialization if region is being created without callbacks passed
if (fRemote || (fCallback || fBulkCallback)) {
InitializeQueues();
StartAckSender();
} else {
LOG(trace) << "skipping queues creation & ack sender thread, because created region locally but no callback were provided";
}
LOG(trace) << "shmem: initialized region: " << fName << " (" << (fRemote ? "remote" : "local") << ")";
}
@@ -112,16 +136,11 @@ struct Region
void InitializeQueues()
{
using namespace boost::interprocess;
if (fRemote) {
fQueue = std::make_unique<message_queue>(open_only, fQueueName.c_str());
} else {
fQueue = std::make_unique<message_queue>(create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
}
fQueue = std::make_unique<message_queue>(open_or_create, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
LOG(trace) << "shmem: initialized region queue: " << fQueueName << " (" << (fRemote ? "remote" : "local") << ")";
}
void StartSendingAcks()
void StartAckSender()
{
fAcksSender = std::thread(&Region::SendAcks, this);
}
@@ -163,7 +182,7 @@ struct Region
<< " blocks left to send: " << blocksToSend << ").";
}
void StartReceivingAcks()
void StartAckReceiver()
{
if (!fAcksReceiver.joinable()) {
fAcksReceiver = std::thread(&Region::ReceiveAcks, this);
@@ -255,20 +274,26 @@ struct Region
fAcksReceiver.join();
}
if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
LOG(trace) << "Region '" << fName << "' destroyed.";
}
if (fRemoveOnDestruction) {
if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
LOG(trace) << "Region '" << fName << "' destroyed.";
}
if (boost::interprocess::file_mapping::remove(fName.c_str())) {
LOG(trace) << "File mapping '" << fName << "' destroyed.";
}
if (fFile) {
fclose(fFile);
if (boost::interprocess::file_mapping::remove(fName.c_str())) {
LOG(trace) << "File mapping '" << fName << "' destroyed.";
}
} else {
LOG(debug) << "Skipping removal of " << fName << " unmanaged region, because RegionConfig::removeOnDestruction is false";
}
if (boost::interprocess::message_queue::remove(fQueueName.c_str())) {
LOG(trace) << "Region queue '" << fQueueName << "' destroyed.";
} else {
LOG(debug) << "Region queue '" << fQueueName << "' not destroyed.";
}
if (fFile) {
fclose(fFile);
}
} else {
// LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
@@ -278,6 +303,7 @@ struct Region
}
bool fRemote;
bool fRemoveOnDestruction;
uint32_t fLinger;
std::atomic<bool> fStopAcks;
std::string fName;