Shm: initial multiple segments support

This commit is contained in:
Alexey Rybalchenko
2020-08-25 21:04:02 +02:00
parent b126ede45a
commit 266843cda5
11 changed files with 159 additions and 124 deletions

View File

@@ -60,6 +60,7 @@ class Manager
public:
Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config)
: fShmId(std::move(shmId))
, fSegmentId(0)
, fDeviceId(std::move(deviceId))
, fSegments()
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
@@ -88,6 +89,7 @@ class Manager
std::string allocationAlgorithm("rbtree_best_fit");
if (config) {
mlockSegment = config->GetProperty<bool>("shm-mlock-segment", mlockSegment);
fSegmentId = config->GetProperty<uint16_t>("shm-segment-id", fSegmentId);
zeroSegment = config->GetProperty<bool>("shm-zero-segment", zeroSegment);
autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
fThrowOnBadAlloc = config->GetProperty<bool>("shm-throw-bad-alloc", fThrowOnBadAlloc);
@@ -109,32 +111,30 @@ class Manager
std::stringstream ss;
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fShmSegments = fManagementSegment.find_or_construct<Uint64SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
const uint64_t id = 0;
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
try {
auto it = fShmSegments->find(id);
auto it = fShmSegments->find(fSegmentId);
if (it == fShmSegments->end()) {
// no segment with given id exists, creating
if (allocationAlgorithm == "rbtree_best_fit") {
fSegments.emplace(id, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_main").c_str(), size));
fShmSegments->emplace(id, AllocationAlgorithm::rbtree_best_fit);
fSegments.emplace(fSegmentId, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str(), size));
fShmSegments->emplace(fSegmentId, AllocationAlgorithm::rbtree_best_fit);
} else if (allocationAlgorithm == "simple_seq_fit") {
fSegments.emplace(id, SimpleSeqFitSegment(create_only, std::string("fmq_" + fShmId + "_main").c_str(), size));
fShmSegments->emplace(id, AllocationAlgorithm::simple_seq_fit);
fSegments.emplace(fSegmentId, SimpleSeqFitSegment(create_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str(), size));
fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit);
}
ss << "Created ";
} else {
// found segment with the given id, opening
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
fSegments.emplace(id, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_main").c_str()));
fSegments.emplace(fSegmentId, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str()));
if (allocationAlgorithm != "rbtree_best_fit") {
LOG(warn) << "Allocation algorithm of the opened segment is rbtree_best_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting.";
allocationAlgorithm = "rbtree_best_fit";
}
} else {
fSegments.emplace(id, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_main").c_str()));
fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)).c_str()));
if (allocationAlgorithm != "simple_seq_fit") {
LOG(warn) << "Allocation algorithm of the opened segment is simple_seq_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting.";
allocationAlgorithm = "simple_seq_fit";
@@ -142,33 +142,33 @@ class Manager
}
ss << "Opened ";
}
ss << "shared memory segment '" << "fmq_" << fShmId << "_main_" << id << "'."
<< " Size: " << boost::apply_visitor(SegmentSize{}, fSegments.at(id)) << " bytes."
<< " Available: " << boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(id)) << " bytes."
ss << "shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "'."
<< " Size: " << boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId)) << " bytes."
<< " Available: " << boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(fSegmentId)) << " bytes."
<< " Allocation algorithm: " << allocationAlgorithm;
LOG(debug) << ss.str();
} catch(interprocess_exception&) {
LOG(error) << "something went wrong";
} catch(interprocess_exception& bie) {
LOG(error) << "something went wrong: " << bie.what();
}
}
if (mlockSegment) {
LOG(debug) << "Locking the managed segment memory pages...";
if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(0)), boost::apply_visitor(SegmentSize{}, fSegments.at(0))) == -1) {
if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId))) == -1) {
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked the managed segment memory pages.";
}
if (zeroSegment) {
LOG(debug) << "Zeroing the managed segment free memory...";
boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(0));
boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(fSegmentId));
LOG(debug) << "Successfully zeroed the managed segment free memory.";
}
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fShmRegions = fManagementSegment.find_or_construct<Uint64RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
#ifdef FAIRMQ_DEBUG_MODE
fMsgDebug = fManagementSegment.find_or_construct<SizetMsgDebugMap>(unique_instance)(fShmVoidAlloc);
@@ -258,7 +258,7 @@ class Manager
}
bool Interrupted() { return fInterrupted.load(); }
std::pair<boost::interprocess::mapped_region*, uint64_t> CreateRegion(const size_t size,
std::pair<boost::interprocess::mapped_region*, uint16_t> CreateRegion(const size_t size,
const int64_t userFlags,
RegionCallback callback,
RegionBulkCallback bulkCallback,
@@ -267,10 +267,10 @@ class Manager
{
using namespace boost::interprocess;
try {
std::pair<mapped_region*, uint64_t> result;
std::pair<mapped_region*, uint16_t> result;
{
uint64_t id = 0;
uint16_t id = 0;
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
@@ -314,13 +314,13 @@ class Manager
}
}
Region* GetRegion(const uint64_t id)
Region* GetRegion(const uint16_t id)
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
return GetRegionUnsafe(id);
}
Region* GetRegionUnsafe(const uint64_t id)
Region* GetRegionUnsafe(const uint16_t id)
{
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
auto it = fRegions.find(id);
@@ -347,7 +347,7 @@ class Manager
}
}
void RemoveRegion(const uint64_t id)
void RemoveRegion(const uint16_t id)
{
fRegions.erase(id);
{
@@ -495,13 +495,37 @@ class Manager
bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; }
boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr) const
void GetSegment(uint16_t id)
{
return boost::apply_visitor(SegmentHandleFromAddress{ptr}, fSegments.at(0));
auto it = fSegments.find(id);
if (it == fSegments.end()) {
try {
// get region info
SegmentInfo segmentInfo = fShmSegments->at(id);
LOG(info) << "LOCATED SEGMENT WITH ID '" << id << "'";
using namespace boost::interprocess;
if (segmentInfo.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
fSegments.emplace(id, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(id)).c_str()));
} else {
fSegments.emplace(id, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_m_" + std::to_string(id)).c_str()));
}
} catch (std::out_of_range& oor) {
LOG(error) << "Could not get segment with id '" << id << "': " << oor.what();
} catch (boost::interprocess::interprocess_exception& bie) {
LOG(error) << "Could not get segment with id '" << id << "': " << bie.what();
}
}
}
void* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle) const
boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr, uint16_t segmentId) const
{
return boost::apply_visitor(SegmentAddressFromHandle{handle}, fSegments.at(0));
return boost::apply_visitor(SegmentHandleFromAddress{ptr}, fSegments.at(segmentId));
}
void* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId) const
{
return boost::apply_visitor(SegmentAddressFromHandle{handle}, fSegments.at(segmentId));
}
char* Allocate(const size_t size, size_t alignment = 0)
@@ -513,20 +537,20 @@ class Manager
try {
// boost::interprocess::managed_shared_memory::size_type actualSize = size;
// char* hint = 0; // unused for boost::interprocess::allocate_new
// ptr = fSegments.at(0).allocation_command<char>(boost::interprocess::allocate_new, size, actualSize, hint);
size_t segmentSize = boost::apply_visitor(SegmentSize{}, fSegments.at(0));
// ptr = fSegments.at(fSegmentId).allocation_command<char>(boost::interprocess::allocate_new, size, actualSize, hint);
size_t segmentSize = boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId));
if (size > segmentSize) {
throw MessageBadAlloc(tools::ToString("Requested message size (", size, ") exceeds segment size (", segmentSize, ")"));
}
if (alignment == 0) {
ptr = reinterpret_cast<char*>(boost::apply_visitor(SegmentAllocate{size}, fSegments.at(0)));
ptr = reinterpret_cast<char*>(boost::apply_visitor(SegmentAllocate{size}, fSegments.at(fSegmentId)));
} else {
ptr = reinterpret_cast<char*>(boost::apply_visitor(SegmentAllocateAligned{size, alignment}, fSegments.at(0)));
ptr = reinterpret_cast<char*>(boost::apply_visitor(SegmentAllocateAligned{size, alignment}, fSegments.at(fSegmentId)));
}
} catch (boost::interprocess::bad_alloc& ba) {
// LOG(warn) << "Shared memory full...";
if (ThrowingOnBadAlloc()) {
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(0))));
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(fSegmentId))));
}
// rateLimiter.maybe_sleep();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
@@ -546,9 +570,9 @@ class Manager
return ptr;
}
void Deallocate(boost::interprocess::managed_shared_memory::handle_t handle)
void Deallocate(boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId)
{
boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle)}, fSegments.at(0));
boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle, segmentId)}, fSegments.at(segmentId));
#ifdef FAIRMQ_DEBUG_MODE
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
DecrementShmMsgCounter();
@@ -556,11 +580,13 @@ class Manager
#endif
}
char* ShrinkInPlace(size_t oldSize, size_t newSize, char* localPtr)
char* ShrinkInPlace(size_t oldSize, size_t newSize, char* localPtr, uint16_t segmentId)
{
return boost::apply_visitor(SegmentBufferShrink{oldSize, newSize, localPtr}, fSegments.at(0));
return boost::apply_visitor(SegmentBufferShrink{oldSize, newSize, localPtr}, fSegments.at(segmentId));
}
uint16_t GetSegmentId() const { return fSegmentId; }
~Manager()
{
using namespace boost::interprocess;
@@ -599,8 +625,9 @@ class Manager
private:
std::string fShmId;
uint16_t fSegmentId;
std::string fDeviceId;
std::unordered_map<uint64_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> fSegments;
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> fSegments;
boost::interprocess::managed_shared_memory fManagementSegment;
VoidAlloc fShmVoidAlloc;
boost::interprocess::named_mutex fShmMtx;
@@ -609,12 +636,12 @@ class Manager
std::thread fRegionEventThread;
bool fRegionEventsSubscriptionActive;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
std::unordered_map<uint64_t, RegionEvent> fObservedRegionEvents;
std::unordered_map<uint16_t, RegionEvent> fObservedRegionEvents;
DeviceCounter* fDeviceCounter;
Uint64SegmentInfoHashMap* fShmSegments;
Uint64RegionInfoHashMap* fShmRegions;
std::unordered_map<uint64_t, std::unique_ptr<Region>> fRegions;
Uint16SegmentInfoHashMap* fShmSegments;
Uint16RegionInfoHashMap* fShmRegions;
std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions;
std::atomic<bool> fInterrupted;
std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter