Mirror of https://github.com/FairRootGroup/FairMQ.git (synced 2025-10-15 09:31:45 +00:00)

Shm: configurable allocation strategy
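
This commit makes the allocation algorithm of the main shared-memory segment configurable. Instead of a fixed rbtree_best_fit managed segment, the Manager now keeps each segment in a boost::variant over two boost::interprocess segment types and dispatches every segment operation (size, free memory, allocate, deallocate, ...) through small visitors. Below is a minimal sketch of that pattern, assuming boost::static_visitor-based visitors; the visitor definitions themselves are not part of this diff, so the body shown here is illustrative, not the actual FairMQ source.

#include <boost/variant.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/mem_algo/simple_seq_fit.hpp>

namespace bipc = boost::interprocess;

// the two segment types the new "shm-allocation" option selects between
using SimpleSeqFitSegment = bipc::basic_managed_shared_memory<char,
    bipc::simple_seq_fit<bipc::mutex_family>, bipc::iset_index>;
using RBTreeBestFitSegment = bipc::basic_managed_shared_memory<char,
    bipc::rbtree_best_fit<bipc::mutex_family>, bipc::iset_index>;

// illustrative visitor: query the segment size regardless of the active alternative
struct SegmentSize : boost::static_visitor<size_t>
{
    template<typename S>
    size_t operator()(S& s) const { return s.get_size(); }
};

// usage sketch:
//   boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment> segment(
//       RBTreeBestFitSegment(bipc::create_only, "fmq_<shmId>_main", 1000000));
//   size_t bytes = boost::apply_visitor(SegmentSize{}, segment);
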
@@ -27,12 +27,10 @@

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/filesystem.hpp>
#include <boost/interprocess/indexes/null_index.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/mem_algo/simple_seq_fit.hpp>
#include <boost/process.hpp>
#include <boost/variant.hpp>

@@ -40,6 +38,7 @@
#include <condition_variable>
#include <mutex>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
@@ -56,22 +55,12 @@ namespace mq
namespace shmem
{

struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtime_error; };

using SimpleSeqFitSegment = boost::interprocess::basic_managed_shared_memory<char,
boost::interprocess::simple_seq_fit<boost::interprocess::mutex_family>,
boost::interprocess::iset_index>;
using RBTreeBestFitSegment = boost::interprocess::basic_managed_shared_memory<char,
boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family>,
boost::interprocess::iset_index>;

class Manager
{
public:
Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config)
: fShmId(std::move(shmId))
, fDeviceId(std::move(deviceId))
// , fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size)
, fSegments()
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
@@ -79,6 +68,7 @@ class Manager
, fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
, fRegionEventsSubscriptionActive(false)
, fDeviceCounter(nullptr)
, fShmSegments(nullptr)
, fShmRegions(nullptr)
, fInterrupted(false)
, fMsgCounter(0)
@@ -95,77 +85,120 @@ class Manager
bool mlockSegment = false;
bool zeroSegment = false;
bool autolaunchMonitor = false;
std::string allocationAlgorithm("rbtree_best_fit");
if (config) {
mlockSegment = config->GetProperty<bool>("shm-mlock-segment", mlockSegment);
zeroSegment = config->GetProperty<bool>("shm-zero-segment", zeroSegment);
autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
fThrowOnBadAlloc = config->GetProperty<bool>("shm-throw-bad-alloc", fThrowOnBadAlloc);
allocationAlgorithm = config->GetProperty<std::string>("shm-allocation", allocationAlgorithm);
} else {
LOG(debug) << "ProgOptions not available! Using defaults.";
}

if (allocationAlgorithm != "rbtree_best_fit" && allocationAlgorithm != "simple_seq_fit") {
LOG(error) << "Provided shared memory allocation algorithm '" << allocationAlgorithm << "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'";
throw SharedMemoryError(tools::ToString("Provided shared memory allocation algorithm '", allocationAlgorithm, "' is not supported. Supported are 'rbtree_best_fit'/'simple_seq_fit'"));
}

if (autolaunchMonitor) {
StartMonitor(fShmId);
}

{
std::stringstream ss;
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);

fShmSegments = fManagementSegment.find_or_construct<Uint64SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);

const uint64_t id = 0;

try {
fSegments.emplace(0, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_main").c_str()));
LOG(debug) << "opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegments.at(0).get_size() << " bytes. Available are " << fSegments.at(0).get_free_memory() << " bytes.";
auto it = fShmSegments->find(id);
if (it == fShmSegments->end()) {
// no segment with given id exists, creating
if (allocationAlgorithm == "rbtree_best_fit") {
fSegments.emplace(id, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_main").c_str(), size));
fShmSegments->emplace(id, AllocationAlgorithm::rbtree_best_fit);
} else if (allocationAlgorithm == "simple_seq_fit") {
fSegments.emplace(id, SimpleSeqFitSegment(create_only, std::string("fmq_" + fShmId + "_main").c_str(), size));
fShmSegments->emplace(id, AllocationAlgorithm::simple_seq_fit);
}
ss << "Created ";
} else {
// found segment with the given id, opening
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
fSegments.emplace(id, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_main").c_str()));
if (allocationAlgorithm != "rbtree_best_fit") {
LOG(warn) << "Allocation algorithm of the opened segment is rbtree_best_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting.";
allocationAlgorithm = "rbtree_best_fit";
}
} else {
fSegments.emplace(id, SimpleSeqFitSegment(open_only, std::string("fmq_" + fShmId + "_main").c_str()));
if (allocationAlgorithm != "simple_seq_fit") {
LOG(warn) << "Allocation algorithm of the opened segment is simple_seq_fit, but requested is " << allocationAlgorithm << ". Ignoring requested setting.";
allocationAlgorithm = "simple_seq_fit";
}
}
ss << "Opened ";
}
ss << "shared memory segment '" << "fmq_" << fShmId << "_main_" << id << "'."
<< " Size: " << boost::apply_visitor(SegmentSize{}, fSegments.at(id)) << " bytes."
<< " Available: " << boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(id)) << " bytes."
<< " Allocation algorithm: " << allocationAlgorithm;
LOG(debug) << ss.str();
} catch(interprocess_exception&) {
fSegments.emplace(0, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_main").c_str(), size));
LOG(debug) << "created shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegments.at(0).get_size() << " bytes. Available are " << fSegments.at(0).get_free_memory() << " bytes.";
LOG(error) << "something went wrong";
}
}

if (mlockSegment) {
LOG(debug) << "Locking the managed segment memory pages...";
if (mlock(fSegments.at(0).get_address(), fSegments.at(0).get_size()) == -1) {
if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(0)), boost::apply_visitor(SegmentSize{}, fSegments.at(0))) == -1) {
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked the managed segment memory pages.";
}
if (zeroSegment) {
LOG(debug) << "Zeroing the managed segment free memory...";
fSegments.at(0).zero_free_memory();
boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(0));
LOG(debug) << "Successfully zeroed the managed segment free memory.";
}

boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);

fShmRegions = fManagementSegment.find_or_construct<Uint64RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
#ifdef FAIRMQ_DEBUG_MODE
fMsgDebug = fManagementSegment.find_or_construct<SizetMsgDebugMap>(unique_instance)(fShmVoidAlloc);
#endif
// store info about the managed segment as region with id 0
fShmRegions->emplace(0, RegionInfo("", 0, 0, fShmVoidAlloc));

fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;

if (fDeviceCounter) {
LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
(fDeviceCounter->fCount)++;
LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount;
} else {
LOG(debug) << "no device counter found, creating one and initializing with 1";
fDeviceCounter = fManagementSegment.construct<DeviceCounter>(unique_instance)(1);
LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount;
}
fShmRegions = fManagementSegment.find_or_construct<Uint64RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);

#ifdef FAIRMQ_DEBUG_MODE
fShmMsgCounter = fManagementSegment.find<MsgCounter>(unique_instance).first;

if (fShmMsgCounter) {
LOG(debug) << "message counter found, with value of " << fShmMsgCounter->fCount << ".";
} else {
LOG(debug) << "no message counter found, creating one and initializing with 0";
fShmMsgCounter = fManagementSegment.construct<MsgCounter>(unique_instance)(0);
LOG(debug) << "initialized message counter with: " << fShmMsgCounter->fCount;
}
fMsgDebug = fManagementSegment.find_or_construct<SizetMsgDebugMap>(unique_instance)(fShmVoidAlloc);
#endif

fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;

if (fDeviceCounter) {
LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
(fDeviceCounter->fCount)++;
LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount;
} else {
LOG(debug) << "no device counter found, creating one and initializing with 1";
fDeviceCounter = fManagementSegment.construct<DeviceCounter>(unique_instance)(1);
LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount;
}

#ifdef FAIRMQ_DEBUG_MODE
fShmMsgCounter = fManagementSegment.find<MsgCounter>(unique_instance).first;

if (fShmMsgCounter) {
LOG(debug) << "message counter found, with value of " << fShmMsgCounter->fCount << ".";
} else {
LOG(debug) << "no message counter found, creating one and initializing with 0";
fShmMsgCounter = fManagementSegment.construct<MsgCounter>(unique_instance)(0);
LOG(debug) << "initialized message counter with: " << fShmMsgCounter->fCount;
}
#endif
}

fHeartbeatThread = std::thread(&Manager::SendHeartbeats, this);
}

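The algorithm is chosen through the new "shm-allocation" configuration property read in the constructor above (defaulting to "rbtree_best_fit", with "simple_seq_fit" as the alternative). Whichever algorithm a segment was created with is recorded in the management segment, and a process that later opens the segment with a different setting falls back to the recorded algorithm with a warning. A hedged usage sketch follows: the property name comes from this diff, while the command-line spelling and the SetProperty call are assumptions based on how FairMQ's ProgOptions usually maps options to properties.

// assumed command-line spelling:
//   my-device --transport shmem --shm-allocation simple_seq_fit

// assumed programmatic equivalent, set before the shmem transport is created:
fair::mq::ProgOptions config;
config.SetProperty<std::string>("shm-allocation", "simple_seq_fit");
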
@@ -336,29 +369,29 @@ class Manager

for (const auto& e : *fShmRegions) {
fair::mq::RegionInfo info;
info.managed = false;
info.id = e.first;
info.flags = e.second.fUserFlags;
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (info.id != 0) {
if (!e.second.fDestroyed) {
auto region = GetRegionUnsafe(info.id);
info.ptr = region->fRegion.get_address();
info.size = region->fRegion.get_size();
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
if (!e.second.fDestroyed) {
auto region = GetRegionUnsafe(info.id);
info.ptr = region->fRegion.get_address();
info.size = region->fRegion.get_size();
} else {
if (!e.second.fDestroyed) {
info.ptr = fSegments.at(0).get_address();
info.size = fSegments.at(0).get_size();
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
}

for (const auto& e : *fShmSegments) {
fair::mq::RegionInfo info;
info.managed = true;
info.id = e.first;
info.event = RegionEvent::created;
info.ptr = boost::apply_visitor(SegmentAddress{}, fSegments.at(e.first));
info.size = boost::apply_visitor(SegmentSize{}, fSegments.at(e.first));
result.push_back(info);
}

return result;

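In the next hunk, GetHandleFromAddress and GetAddressFromHandle are rewritten to dispatch through visitors that carry their argument as state (SegmentHandleFromAddress, SegmentAddressFromHandle). Their definitions are not shown in this diff; the sketch below shows what such a stateful visitor could look like, inferred only from the call sites (the member name and body are assumptions).

struct SegmentHandleFromAddress : boost::static_visitor<boost::interprocess::managed_shared_memory::handle_t>
{
    explicit SegmentHandleFromAddress(const void* ptr) : fPtr(ptr) {}

    // works for both segment types, since both expose get_handle_from_address()
    template<typename S>
    boost::interprocess::managed_shared_memory::handle_t operator()(S& s) const
    {
        return s.get_handle_from_address(fPtr);
    }

    const void* fPtr;
};
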
@@ -462,8 +495,14 @@ class Manager

bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; }

boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr) const { return fSegments.at(0).get_handle_from_address(ptr); }
void* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle) const { return fSegments.at(0).get_address_from_handle(handle); }
boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr) const
{
return boost::apply_visitor(SegmentHandleFromAddress{ptr}, fSegments.at(0));
}
void* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle) const
{
return boost::apply_visitor(SegmentAddressFromHandle{handle}, fSegments.at(0));
}

char* Allocate(const size_t size, size_t alignment = 0)
{
@@ -475,19 +514,19 @@ class Manager
// boost::interprocess::managed_shared_memory::size_type actualSize = size;
// char* hint = 0; // unused for boost::interprocess::allocate_new
// ptr = fSegments.at(0).allocation_command<char>(boost::interprocess::allocate_new, size, actualSize, hint);
size_t segmentSize = fSegments.at(0).get_size();
size_t segmentSize = boost::apply_visitor(SegmentSize{}, fSegments.at(0));
if (size > segmentSize) {
throw MessageBadAlloc(tools::ToString("Requested message size (", size, ") exceeds segment size (", segmentSize, ")"));
}
if (alignment == 0) {
ptr = reinterpret_cast<char*>(fSegments.at(0).allocate(size));
ptr = reinterpret_cast<char*>(boost::apply_visitor(SegmentAllocate{size}, fSegments.at(0)));
} else {
ptr = reinterpret_cast<char*>(fSegments.at(0).allocate_aligned(size, alignment));
ptr = reinterpret_cast<char*>(boost::apply_visitor(SegmentAllocateAligned{size, alignment}, fSegments.at(0)));
}
} catch (boost::interprocess::bad_alloc& ba) {
// LOG(warn) << "Shared memory full...";
if (ThrowingOnBadAlloc()) {
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", fSegments.at(0).get_free_memory()));
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory{}, fSegments.at(0))));
}
// rateLimiter.maybe_sleep();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
@@ -509,7 +548,7 @@ class Manager

void Deallocate(boost::interprocess::managed_shared_memory::handle_t handle)
{
fSegments.at(0).deallocate(GetAddressFromHandle(handle));
boost::apply_visitor(SegmentDeallocate{GetAddressFromHandle(handle)}, fSegments.at(0));
#ifdef FAIRMQ_DEBUG_MODE
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
DecrementShmMsgCounter();
@@ -519,9 +558,7 @@ class Manager

char* ShrinkInPlace(size_t oldSize, size_t newSize, char* localPtr)
{
using namespace boost::interprocess;
managed_shared_memory::size_type shrunkSize = newSize;
return fSegments.at(0).allocation_command<char>(shrink_in_place, oldSize + 128, shrunkSize, localPtr);
return boost::apply_visitor(SegmentBufferShrink{oldSize, newSize, localPtr}, fSegments.at(0));
}

~Manager()
@@ -563,8 +600,7 @@ class Manager
private:
std::string fShmId;
std::string fDeviceId;
// boost::interprocess::managed_shared_memory fSegment;
std::unordered_map<uint64_t, RBTreeBestFitSegment> fSegments;
std::unordered_map<uint64_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> fSegments;
boost::interprocess::managed_shared_memory fManagementSegment;
VoidAlloc fShmVoidAlloc;
boost::interprocess::named_mutex fShmMtx;
@@ -576,6 +612,7 @@ class Manager
std::unordered_map<uint64_t, RegionEvent> fObservedRegionEvents;

DeviceCounter* fDeviceCounter;
Uint64SegmentInfoHashMap* fShmSegments;
Uint64RegionInfoHashMap* fShmRegions;
std::unordered_map<uint64_t, std::unique_ptr<Region>> fRegions;

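fShmSegments lives in the management segment and records, per segment id, the allocation algorithm the segment was created with; the constructor consults it to decide whether to create or open the main segment and with which segment type. The definitions behind Uint64SegmentInfoHashMap are not part of this diff; the minimal sketch below shows what the stored record plausibly looks like, inferred from the emplace(id, AllocationAlgorithm::...) and it->second.fAllocationAlgorithm usages above (an assumption, not the actual FairMQ definition).

// assumed shape of the per-segment record kept in the management segment
enum class AllocationAlgorithm : int
{
    rbtree_best_fit,
    simple_seq_fit,
};

struct SegmentInfo
{
    SegmentInfo(AllocationAlgorithm aa) : fAllocationAlgorithm(aa) {}

    AllocationAlgorithm fAllocationAlgorithm;
};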