shm: Ref counting for unmanaged regions in a dedicated segment

This commit is contained in:
Alexey Rybalchenko 2023-09-19 10:21:00 +02:00
parent cacf69d5f6
commit 1a0ab3a4e2
7 changed files with 159 additions and 85 deletions

View File

@ -134,6 +134,7 @@ struct RegionConfig
int creationFlags = 0; /// flags passed to the underlying transport on region creation
int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user
uint64_t size = 0; /// region size
uint64_t rcSegmentSize = 10000000; /// region size
std::string path = ""; /// file path, if the region is backed by a file
std::optional<uint16_t> id = std::nullopt; /// region id
uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events

View File

@ -13,6 +13,7 @@
#include <functional> // std::equal_to
#include <boost/functional/hash.hpp>
#include <boost/interprocess/allocators/adaptive_pool.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/map.hpp>
#include <boost/interprocess/containers/string.hpp>
@ -25,6 +26,8 @@
#include <sys/types.h>
#include <fairmq/tools/Strings.h>
namespace fair::mq::shmem
{
@ -41,6 +44,31 @@ using RBTreeBestFitSegment = boost::interprocess::basic_managed_shared_memory<ch
boost::interprocess::null_index>;
// boost::interprocess::iset_index>;
inline std::string MakeShmName(const std::string& shmId, const std::string& type, int index) {
return std::string("fmq_" + shmId + "_" + type + "_" + std::to_string(index));
}
struct RefCount
{
explicit RefCount(uint16_t c)
: count(c)
{}
uint16_t Get() { return count.load(); }
uint16_t Increment() { return count.fetch_add(1); }
uint16_t Decrement() { return count.fetch_sub(1); }
std::atomic<uint16_t> count;
};
// Number of nodes allocated at once when the allocator runs out of nodes.
static constexpr size_t numNodesPerBlock = 4096;
// Maximum number of totally free blocks that the adaptive node pool will hold.
// The rest of the totally free blocks will be deallocated with the segment manager.
static constexpr size_t maxFreeBlocks = 2;
using RefCountPool = boost::interprocess::adaptive_pool<RefCount, boost::interprocess::managed_shared_memory::segment_manager, numNodesPerBlock, maxFreeBlocks>;
using SegmentManager = boost::interprocess::managed_shared_memory::segment_manager;
using VoidAlloc = boost::interprocess::allocator<void, SegmentManager>;
using CharAlloc = boost::interprocess::allocator<char, SegmentManager>;
@ -121,6 +149,17 @@ struct ShmHeader
static void Destruct(char* ptr) { RefCountPtr(ptr).~atomic(); }
};
struct MetaHeader
{
size_t fSize; // size of the shm buffer
size_t fHint; // user-defined value, given by the user on message creation and returned to the user on "buffer no longer needed"-callbacks
boost::interprocess::managed_shared_memory::handle_t fHandle; // handle to shm buffer, convertible to shm buffer ptr
mutable boost::interprocess::managed_shared_memory::handle_t fShared; // handle to the buffer storing the ref count for shared buffers
uint16_t fRegionId; // id of the unmanaged region
mutable uint16_t fSegmentId; // id of the managed segment
bool fManaged; // true = managed segment, false = unmanaged region
};
enum class AllocationAlgorithm : int
{
rbtree_best_fit,
@ -129,26 +168,30 @@ enum class AllocationAlgorithm : int
struct RegionInfo
{
RegionInfo(const VoidAlloc& alloc)
: fPath("", alloc)
, fCreationFlags(0)
, fUserFlags(0)
, fSize(0)
, fDestroyed(false)
{}
static constexpr uint64_t DefaultRcSegmentSize = 10000000;
RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, const VoidAlloc& alloc)
RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, uint64_t rcSegmentSize, const VoidAlloc& alloc)
: fPath(path, alloc)
, fCreationFlags(flags)
, fUserFlags(userFlags)
, fSize(size)
, fRCSegmentSize(rcSegmentSize)
, fDestroyed(false)
{}
RegionInfo(const VoidAlloc& alloc)
: RegionInfo("", 0, 0, 0, DefaultRcSegmentSize, alloc)
{}
RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, const VoidAlloc& alloc)
: RegionInfo(path, flags, userFlags, size, DefaultRcSegmentSize, alloc)
{}
Str fPath;
int fCreationFlags;
uint64_t fUserFlags;
uint64_t fSize;
uint64_t fRCSegmentSize;
bool fDestroyed;
};
@ -216,17 +259,6 @@ struct RegionCounter
std::atomic<uint16_t> fCount;
};
struct MetaHeader
{
size_t fSize;
size_t fHint;
boost::interprocess::managed_shared_memory::handle_t fHandle;
mutable boost::interprocess::managed_shared_memory::handle_t fShared;
uint16_t fRegionId;
mutable uint16_t fSegmentId;
bool fManaged;
};
#ifdef FAIRMQ_DEBUG_MODE
struct MsgCounter
{
@ -310,29 +342,6 @@ struct SegmentBufferShrink
mutable char* local_ptr;
};
// struct SegmentWrapper
// {
// SegmentWrapper(boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>&& _segment)
// : segment(std::move(_segment))
// , refCountPool(nullptr)
// {}
// void InitRefCountPoolSSF()
// {
// refCountPool = std::make_unique<boost::variant<RefCountPoolRBT, RefCountPoolSSF>>(
// RefCountPoolSSF(boost::get<SimpleSeqFitSegment>(segment).get_segment_manager()));
// }
// void InitRefCountPoolRBT()
// {
// refCountPool = std::make_unique<boost::variant<RefCountPoolRBT, RefCountPoolSSF>>(
// RefCountPoolRBT(boost::get<RBTreeBestFitSegment>(segment).get_segment_manager()));
// }
// boost::variant<SimpleSeqFitSegment, RBTreeBestFitSegment> segment;
// std::unique_ptr<boost::variant<RefCountPoolRBT, RefCountPoolSSF>> refCountPool;
// };
} // namespace fair::mq::shmem
#endif /* FAIR_MQ_SHMEM_COMMON_H_ */

View File

@ -323,6 +323,7 @@ class Manager
}
const uint16_t id = cfg.id.value();
const uint64_t rcSegmentSize = cfg.rcSegmentSize;
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
@ -347,6 +348,7 @@ class Manager
// start ack receiver only if a callback has been provided.
if (callback || bulkCallback) {
region->SetCallbacks(callback, bulkCallback);
region->InitializeRefCountSegment(rcSegmentSize);
region->InitializeQueues();
region->StartAckSender();
region->StartAckReceiver();
@ -398,6 +400,7 @@ class Manager
} else {
try {
RegionConfig cfg;
const uint64_t rcSegmentSize = cfg.rcSegmentSize;
// get region info
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
@ -409,6 +412,7 @@ class Manager
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, false, std::move(cfg)));
r.first->second->InitializeRefCountSegment(rcSegmentSize);
r.first->second->InitializeQueues();
r.first->second->StartAckSender();
return r.first->second.get();
@ -499,8 +503,10 @@ class Manager
if (it != fRegions.end()) {
region = it->second.get();
} else {
const uint64_t rcSegmentSize = cfgIt->second.rcSegmentSize;
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, false, cfgIt->second));
region = r.first->second.get();
region->InitializeRefCountSegment(rcSegmentSize);
region->InitializeQueues();
region->StartAckSender();
}

View File

@ -262,52 +262,50 @@ class Message final : public fair::mq::Message
if (fMeta.fManaged) { // managed segment
fManager.GetSegment(fMeta.fSegmentId);
return ShmHeader::RefCount(fManager.GetAddressFromHandle(fMeta.fHandle, fMeta.fSegmentId));
} else { // unmanaged region
if (fMeta.fShared < 0) { // UR msg is not yet shared
return 1;
} else {
fManager.GetSegment(fMeta.fSegmentId);
return ShmHeader::RefCount(fManager.GetAddressFromHandle(fMeta.fShared, fMeta.fSegmentId));
}
}
if (fMeta.fShared < 0) { // UR msg is not yet shared
return 1;
}
fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fMeta.fRegionId));
}
return fRegionPtr->GetRefCountAddressFromHandle(fMeta.fShared)->Get();
}
void Copy(const fair::mq::Message& other) override
{
const Message& otherMsg = static_cast<const Message&>(other);
// if the other message is not initialized, close this one too and return
if (otherMsg.fMeta.fHandle < 0) {
// if the other message is not initialized, close this one too and return
CloseMessage();
return;
}
// if this msg is already initialized, close it first
if (fMeta.fHandle >= 0) {
// if this msg is already initialized, close it first
CloseMessage();
}
if (otherMsg.fMeta.fManaged) { // managed segment
fMeta = otherMsg.fMeta;
fManager.GetSegment(fMeta.fSegmentId);
ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(fMeta.fHandle, fMeta.fSegmentId));
} else { // unmanaged region
if (otherMsg.fMeta.fShared < 0) { // if UR msg is not yet shared
// TODO: minimize the size to 0 and don't create extra space for user buffer alignment
char* ptr = fManager.Allocate(2, 0);
// point the fShared in the unmanaged region message to the refCount holder
otherMsg.fMeta.fShared = fManager.GetHandleFromAddress(ptr, fMeta.fSegmentId);
// the message needs to be able to locate in which segment the refCount is stored
otherMsg.fMeta.fSegmentId = fMeta.fSegmentId;
// point this message to the same content as the unmanaged region message
fMeta = otherMsg.fMeta;
// increment the refCount
ShmHeader::IncrementRefCount(ptr);
} else { // if the UR msg is already shared
fMeta = otherMsg.fMeta;
fManager.GetSegment(fMeta.fSegmentId);
ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(fMeta.fShared, fMeta.fSegmentId));
// increment ref count
if (otherMsg.fMeta.fManaged) { // msg in managed segment
fManager.GetSegment(otherMsg.fMeta.fSegmentId);
ShmHeader::IncrementRefCount(fManager.GetAddressFromHandle(otherMsg.fMeta.fHandle, otherMsg.fMeta.fSegmentId));
} else { // msg in unmanaged region
fRegionPtr = fManager.GetRegionFromCache(otherMsg.fMeta.fRegionId);
if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", otherMsg.fMeta.fRegionId));
}
if (otherMsg.fMeta.fShared < 0) {
// UR msg not yet shared, create the reference counting object with count 2
otherMsg.fMeta.fShared = fRegionPtr->HandleFromAddress(&(fRegionPtr->MakeRefCount(2)));
} else {
fRegionPtr->GetRefCountAddressFromHandle(otherMsg.fMeta.fShared)->Increment();
}
}
// copy meta data
fMeta = otherMsg.fMeta;
}
~Message() override { CloseMessage(); }
@ -344,12 +342,13 @@ class Message final : public fair::mq::Message
}
} else { // unmanaged region
if (fMeta.fShared >= 0) {
// make sure segment is initialized in this transport
fManager.GetSegment(fMeta.fSegmentId);
// release unmanaged region block if ref count is one
uint16_t refCount = ShmHeader::DecrementRefCount(fManager.GetAddressFromHandle(fMeta.fShared, fMeta.fSegmentId));
fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
if (!fRegionPtr) {
throw TransportError(tools::ToString("Cannot get unmanaged region with id ", fMeta.fRegionId));
}
uint16_t refCount = fRegionPtr->GetRefCountAddressFromHandle(fMeta.fShared)->Decrement();
if (refCount == 1) {
fManager.Deallocate(fMeta.fShared, fMeta.fSegmentId);
fRegionPtr->RemoveRefCount(*(fRegionPtr->GetRefCountAddressFromHandle(fMeta.fShared)));
ReleaseUnmanagedRegionBlock();
}
} else {

View File

@ -235,8 +235,8 @@ bool Monitor::PrintShm(const ShmId& shmId)
<< ", managed segments:\n";
for (const auto& s : segments) {
size_t free = std::visit([](auto& s){ return s.get_free_memory(); }, s.second);
size_t total = std::visit([](auto& s){ return s.get_size(); }, s.second);
size_t free = std::visit([](auto& seg){ return seg.get_free_memory(); }, s.second);
size_t total = std::visit([](auto& seg){ return seg.get_size(); }, s.second);
size_t used = total - free;
std::string msgCount;
@ -268,12 +268,19 @@ bool Monitor::PrintShm(const ShmId& shmId)
if (shmRegions && !shmRegions->empty()) {
ss << "\n unmanaged regions:";
for (const auto& r : *shmRegions) {
ss << "\n [" << r.first << "]: " << (r.second.fDestroyed ? "destroyed" : "alive");
ss << ", size: " << r.second.fSize;
for (const auto& [id, info] : *shmRegions) {
ss << "\n [" << id << "]: " << (info.fDestroyed ? "destroyed" : "alive");
ss << ", size: " << info.fSize;
try {
managed_shared_memory rcCountSegment(open_read_only, std::string("fmq_" + shmId.shmId + "_rrc_" + to_string(id)).c_str());
ss << ", rcCountSegment size: " << rcCountSegment.get_size();
} catch (bie&) {
ss << ", rcCountSegment: not found";
}
// try {
// boost::interprocess::message_queue q(open_only, std::string("fmq_" + std::string(shmId) + "_rgq_" + to_string(r.first)).c_str());
// boost::interprocess::message_queue q(open_only, std::string("fmq_" + std::string(shmId) + "_rgq_" + to_string(id)).c_str());
// ss << ", ack queue: " << q.get_num_msg() << " messages";
// } catch (bie&) {
// ss << ", ack queue: not found";
@ -679,6 +686,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmIdT,
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId + "_rg_" + to_string(id), verbose));
}
result.emplace_back(Remove<bipc::message_queue>("fmq_" + shmId + "_rgq_" + to_string(id), verbose));
result.emplace_back(Remove<bipc::shared_memory_object>("fmq_" + shmId + "_rrc_" + to_string(id), verbose));
}
}
@ -780,6 +788,7 @@ void Monitor::ResetContent(const ShmId& shmIdT, bool verbose /* = true */)
for (const auto& region : *shmRegions) {
uint16_t id = region.first;
Remove<bipc::message_queue>("fmq_" + shmId + "_rgq_" + to_string(id), verbose);
Remove<bipc::shared_memory_object>("fmq_" + shmId + "_rrc_" + to_string(id), verbose);
}
}
} catch (bie& e) {

View File

@ -16,6 +16,7 @@ FairMQ Shared Memory currently uses the following names to register shared memor
| `fmq_<shmId>_mng` | management segment (management data) | one of the devices | devices |
| `fmq_<shmId>_rg_<index>` | unmanaged region(s) | one of the devices | devices with unmanaged regions |
| `fmq_<shmId>_rgq_<index>` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions |
| `fmq_<shmId>_rrc_<index>` | unmanaged region reference count pool(s) | one of the devices | devices with unmanaged regions |
| `fmq_<shmId>_ms` | shmmonitor status | shmmonitor | devices, shmmonitor |
The shmId is generated out of session id and user id.

View File

@ -59,8 +59,9 @@ struct UnmanagedRegion
, fRemoveOnDestruction(cfg.removeOnDestruction)
, fLinger(cfg.linger)
, fStopAcks(false)
, fName("fmq_" + shmId + "_rg_" + std::to_string(cfg.id.value()))
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(cfg.id.value()))
, fName(MakeShmName(shmId, "rg", cfg.id.value()))
, fQueueName(MakeShmName(shmId, "rgq", cfg.id.value()))
, fRefCountSegmentName(MakeShmName(shmId, "rrc", cfg.id.value()))
, fShmemObject()
, fFile(nullptr)
, fFileMapping()
@ -186,6 +187,19 @@ struct UnmanagedRegion
bool RemoveOnDestruction() { return fRemoveOnDestruction; }
RefCount& MakeRefCount(uint16_t initialCount = 1)
{
RefCount* refCount = fRefCountPool->allocate(1).get();
new (refCount) RefCount(initialCount);
return *refCount;
}
void RemoveRefCount(RefCount& refCount)
{
refCount.~RefCount();
fRefCountPool->deallocate(&refCount, 1);
}
~UnmanagedRegion()
{
LOG(debug) << "~UnmanagedRegion(): " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
@ -208,6 +222,11 @@ struct UnmanagedRegion
if (Monitor::RemoveFileMapping(fName.c_str())) {
LOG(trace) << "File mapping '" << fName << "' destroyed.";
}
if (fRefCountSegment) {
if (Monitor::RemoveObject(fRefCountSegmentName)) {
LOG(trace) << "Ref Count Segment '" << fRefCountSegmentName << "' destroyed.";
}
}
} else {
LOG(debug) << "Skipping removal of " << fName << " unmanaged region, because RegionConfig::removeOnDestruction is false";
}
@ -235,6 +254,7 @@ struct UnmanagedRegion
std::atomic<bool> fStopAcks;
std::string fName;
std::string fQueueName;
std::string fRefCountSegmentName;
boost::interprocess::shared_memory_object fShmemObject;
FILE* fFile;
boost::interprocess::file_mapping fFileMapping;
@ -245,12 +265,18 @@ struct UnmanagedRegion
std::vector<RegionBlock> fBlocksToFree;
const std::size_t fAckBunchSize = 256;
std::unique_ptr<boost::interprocess::message_queue> fQueue;
std::unique_ptr<boost::interprocess::managed_shared_memory> fRefCountSegment;
std::unique_ptr<RefCountPool> fRefCountPool;
std::thread fAcksReceiver;
std::thread fAcksSender;
RegionCallback fCallback;
RegionBulkCallback fBulkCallback;
static std::string MakeSegmentName(const std::string& shmId, std::string_view segment, int regionIndex) {
return tools::ToString("fmq_", shmId, "_", segment, "_", regionIndex);
}
static RegionConfig makeRegionConfig(uint16_t id)
{
RegionConfig regionCfg;
@ -275,7 +301,7 @@ struct UnmanagedRegion
throw TransportError(tools::ToString("Unmanaged Region with id ", cfg.id.value(), " has already been registered. Only unique IDs per session are allowed."));
}
shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, alloc));
shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, cfg.rcSegmentSize, alloc));
(eventCounter->fCount)++;
}
@ -294,6 +320,29 @@ struct UnmanagedRegion
}
}
void InitializeRefCountSegment(uint64_t size)
{
using namespace boost::interprocess;
if (!fRefCountSegment) {
fRefCountSegment = std::make_unique<managed_shared_memory>(open_or_create, fRefCountSegmentName.c_str(), size);
LOG(trace) << "shmem: initialized ref count segment: " << fRefCountSegmentName;
fRefCountPool = std::make_unique<RefCountPool>(fRefCountSegment->get_segment_manager());
}
}
RefCount* GetRefCountAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle)
{
if (fRefCountPool) {
return reinterpret_cast<RefCount*>(fRefCountSegment->get_address_from_handle(handle));
}
return nullptr;
};
boost::interprocess::managed_shared_memory::handle_t HandleFromAddress(const void* ptr)
{
return fRefCountSegment->get_handle_from_address(ptr);
}
void StartAckSender()
{
if (!fAcksSender.joinable()) {