mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
shm: always open_or_create segment
This commit is contained in:
parent
9f9583eb55
commit
eb4620b1ec
|
@ -195,11 +195,8 @@ class Manager
|
|||
fHeartbeatThread = std::thread(&Manager::Heartbeats, this);
|
||||
|
||||
try {
|
||||
std::stringstream ss;
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
|
||||
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||
|
||||
SessionInfo* sessionInfo = fManagementSegment.find<SessionInfo>(unique_instance).first;
|
||||
if (sessionInfo) {
|
||||
LOG(debug) << "session info found, name: " << sessionInfo->fSessionName << ", creator id: " << sessionInfo->fCreatorId;
|
||||
|
@ -218,8 +215,6 @@ class Manager
|
|||
LOG(debug) << "initialized event counter with: " << fEventCounter->fCount;
|
||||
}
|
||||
|
||||
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||
|
||||
fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;
|
||||
if (fDeviceCounter) {
|
||||
LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
|
||||
|
@ -231,22 +226,21 @@ class Manager
|
|||
LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount;
|
||||
}
|
||||
|
||||
std::string op("create/open");
|
||||
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||
|
||||
try {
|
||||
std::string segmentName("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId));
|
||||
auto it = fShmSegments->find(fSegmentId);
|
||||
if (it == fShmSegments->end()) {
|
||||
op = "create";
|
||||
// no segment with given id exists, creating
|
||||
if (allocationAlgorithm == "rbtree_best_fit") {
|
||||
fSegments.emplace(fSegmentId, RBTreeBestFitSegment(create_only, segmentName.c_str(), size));
|
||||
fSegments.emplace(fSegmentId, RBTreeBestFitSegment(open_or_create, segmentName.c_str(), size));
|
||||
fShmSegments->emplace(fSegmentId, AllocationAlgorithm::rbtree_best_fit);
|
||||
} else if (allocationAlgorithm == "simple_seq_fit") {
|
||||
fSegments.emplace(fSegmentId, SimpleSeqFitSegment(create_only, segmentName.c_str(), size));
|
||||
fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_or_create, segmentName.c_str(), size));
|
||||
fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit);
|
||||
}
|
||||
ss << "Created ";
|
||||
(fEventCounter->fCount)++;
|
||||
if (mlockSegmentOnCreation) {
|
||||
MlockSegment(fSegmentId);
|
||||
|
@ -255,31 +249,28 @@ class Manager
|
|||
ZeroSegment(fSegmentId);
|
||||
}
|
||||
} else {
|
||||
op = "open";
|
||||
// found segment with the given id, opening
|
||||
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
|
||||
fSegments.emplace(fSegmentId, RBTreeBestFitSegment(open_only, segmentName.c_str()));
|
||||
fSegments.emplace(fSegmentId, RBTreeBestFitSegment(open_or_create, segmentName.c_str(), size));
|
||||
if (allocationAlgorithm != "rbtree_best_fit") {
|
||||
LOG(warn) << "Allocation algorithm of the opened segment is rbtree_best_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting.";
|
||||
allocationAlgorithm = "rbtree_best_fit";
|
||||
}
|
||||
} else {
|
||||
fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_only, segmentName.c_str()));
|
||||
fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_or_create, segmentName.c_str(), size));
|
||||
if (allocationAlgorithm != "simple_seq_fit") {
|
||||
LOG(warn) << "Allocation algorithm of the opened segment is simple_seq_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting.";
|
||||
allocationAlgorithm = "simple_seq_fit";
|
||||
}
|
||||
}
|
||||
ss << "Opened ";
|
||||
}
|
||||
ss << "shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "'."
|
||||
LOG(debug) << "Created/opened shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "'."
|
||||
<< " Size: " << boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)) << " bytes."
|
||||
<< " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes."
|
||||
<< " Allocation algorithm: " << allocationAlgorithm;
|
||||
LOG(debug) << ss.str();
|
||||
} catch (interprocess_exception& bie) {
|
||||
LOG(error) << "Failed to " << op << " shared memory segment (" << "fmq_" << fShmId << "_m_" << fSegmentId << "): " << bie.what();
|
||||
throw TransportError(tools::ToString("Failed to ", op, " shared memory segment (", "fmq_", fShmId, "_m_", fSegmentId, "): ", bie.what()));
|
||||
LOG(error) << "Failed to create/open shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "': " << bie.what();
|
||||
throw TransportError(tools::ToString("Failed to create/open shared memory segment '", "fmq_", fShmId, "_m_", fSegmentId, "': ", bie.what()));
|
||||
}
|
||||
|
||||
if (mlockSegment) {
|
||||
|
@ -817,7 +808,7 @@ class Manager
|
|||
uint64_t fShmId64;
|
||||
std::string fShmId;
|
||||
uint16_t fSegmentId;
|
||||
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> fSegments;
|
||||
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> fSegments; // TODO: can use Segment class directly here
|
||||
boost::interprocess::managed_shared_memory fManagementSegment;
|
||||
VoidAlloc fShmVoidAlloc;
|
||||
boost::interprocess::interprocess_mutex* fShmMtx;
|
||||
|
|
Loading…
Reference in New Issue
Block a user