mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-16 01:51:45 +00:00
shm: add APIs for implementing keep-alive process
This commit is contained in:
committed by
Dennis Klein
parent
eb4620b1ec
commit
692576a5b1
@@ -11,7 +11,7 @@
|
||||
|
||||
#include "Common.h"
|
||||
#include "Monitor.h"
|
||||
#include "Region.h"
|
||||
#include "UnmanagedRegion.h"
|
||||
#include <fairmq/Message.h>
|
||||
#include <fairmq/ProgOptions.h>
|
||||
#include <fairmq/tools/Strings.h>
|
||||
@@ -129,8 +129,8 @@ class Manager
|
||||
{
|
||||
public:
|
||||
Manager(const std::string& sessionName, size_t size, const ProgOptions* config)
|
||||
: fShmId64(makeShmIdUint64(sessionName))
|
||||
, fShmId(makeShmIdStr(sessionName))
|
||||
: fShmId64(config ? config->GetProperty<uint64_t>("shmid", makeShmIdUint64(sessionName)) : makeShmIdUint64(sessionName))
|
||||
, fShmId(makeShmIdStr(fShmId64))
|
||||
, fSegmentId(config ? config->GetProperty<uint16_t>("shm-segment-id", 0) : 0)
|
||||
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
|
||||
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
|
||||
@@ -360,14 +360,14 @@ class Manager
|
||||
}
|
||||
bool Interrupted() { return fInterrupted.load(); }
|
||||
|
||||
std::pair<boost::interprocess::mapped_region*, uint16_t> CreateRegion(const size_t size,
|
||||
RegionCallback callback,
|
||||
RegionBulkCallback bulkCallback,
|
||||
RegionConfig cfg)
|
||||
std::pair<UnmanagedRegion*, uint16_t> CreateRegion(const size_t size,
|
||||
RegionCallback callback,
|
||||
RegionBulkCallback bulkCallback,
|
||||
RegionConfig cfg)
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
try {
|
||||
std::pair<mapped_region*, uint16_t> result;
|
||||
std::pair<UnmanagedRegion*, uint16_t> result;
|
||||
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
@@ -386,46 +386,39 @@ class Manager
|
||||
}
|
||||
|
||||
cfg.id = rc->fCount;
|
||||
} else if (cfg.id.value() == 0) {
|
||||
LOG(error) << "User-given UnmanagedRegion ID must not be 0.";
|
||||
throw TransportError("User-given UnmanagedRegion ID must not be 0.");
|
||||
}
|
||||
|
||||
auto it = fRegions.find(cfg.id.value());
|
||||
if (it != fRegions.end()) {
|
||||
LOG(error) << "Trying to open/create a UnmanagedRegion that already exists (id: " << cfg.id.value() << ")";
|
||||
throw TransportError(tools::ToString("Trying to open/create a UnmanagedRegion that already exists (id: ", cfg.id.value(), ")"));
|
||||
}
|
||||
const uint16_t id = cfg.id.value();
|
||||
|
||||
Region& region = *(fRegions.emplace(cfg.id.value(), std::make_unique<Region>(fShmId, size, false, callback, bulkCallback, cfg)).first->second);
|
||||
// LOG(debug) << "Created region with id '" << cfg.id.value() << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
|
||||
bool newRegionCreated = res.second;
|
||||
UnmanagedRegion& region = *(res.first->second);
|
||||
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||
|
||||
if (!newRegionCreated) {
|
||||
region.fRemote = false; // TODO: this should be more clear, refactor it.
|
||||
}
|
||||
|
||||
if (cfg.lock) {
|
||||
LOG(debug) << "Locking region " << cfg.id.value() << "...";
|
||||
if (mlock(region.fRegion.get_address(), region.fRegion.get_size()) == -1) {
|
||||
LOG(error) << "Could not lock region " << cfg.id.value() << ". Code: " << errno << ", reason: " << strerror(errno);
|
||||
throw TransportError(tools::ToString("Could not lock region ", cfg.id.value(), ": ", strerror(errno)));
|
||||
}
|
||||
LOG(debug) << "Successfully locked region " << cfg.id.value() << ".";
|
||||
LOG(debug) << "Locking region " << id << "...";
|
||||
region.Lock();
|
||||
LOG(debug) << "Successfully locked region " << id << ".";
|
||||
}
|
||||
if (cfg.zero) {
|
||||
LOG(debug) << "Zeroing free memory of region " << cfg.id.value() << "...";
|
||||
memset(region.fRegion.get_address(), 0x00, region.fRegion.get_size());
|
||||
LOG(debug) << "Successfully zeroed free memory of region " << cfg.id.value() << ".";
|
||||
LOG(debug) << "Zeroing free memory of region " << id << "...";
|
||||
region.Zero();
|
||||
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
|
||||
}
|
||||
|
||||
bool newRegionCreated = fShmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, fShmVoidAlloc)).second;
|
||||
|
||||
// start ack receiver only if a callback has been provided.
|
||||
if (callback || bulkCallback) {
|
||||
region.SetCallbacks(callback, bulkCallback);
|
||||
region.InitializeQueues();
|
||||
region.StartAckSender();
|
||||
region.StartAckReceiver();
|
||||
}
|
||||
result.first = &(region.fRegion);
|
||||
result.second = cfg.id.value();
|
||||
|
||||
if (newRegionCreated) {
|
||||
(fEventCounter->fCount)++;
|
||||
}
|
||||
result.first = &(region);
|
||||
result.second = id;
|
||||
}
|
||||
fRegionsGen += 1; // signal TL cache invalidation
|
||||
fRegionEventsShmCV->notify_all();
|
||||
@@ -438,7 +431,7 @@ class Manager
|
||||
}
|
||||
}
|
||||
|
||||
Region* GetRegion(const uint16_t id)
|
||||
UnmanagedRegion* GetRegion(const uint16_t id)
|
||||
{
|
||||
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
|
||||
const auto &lTlCache = fTlRegionCache;
|
||||
@@ -464,7 +457,7 @@ class Manager
|
||||
return lRegion;
|
||||
}
|
||||
|
||||
Region* GetRegionUnsafe(const uint16_t id)
|
||||
UnmanagedRegion* GetRegionUnsafe(const uint16_t id)
|
||||
{
|
||||
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
|
||||
auto it = fRegions.find(id);
|
||||
@@ -480,7 +473,9 @@ class Manager
|
||||
cfg.path = regionInfo.fPath.c_str();
|
||||
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||
|
||||
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, 0, true, nullptr, nullptr, std::move(cfg)));
|
||||
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
|
||||
r.first->second->InitializeQueues();
|
||||
r.first->second->StartAckSender();
|
||||
return r.first->second.get();
|
||||
} catch (std::out_of_range& oor) {
|
||||
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
|
||||
@@ -499,7 +494,7 @@ class Manager
|
||||
fRegions.at(id)->StopAcks();
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
if (fRegions.at(id)->fRemoveOnDestruction) {
|
||||
if (fRegions.at(id)->RemoveOnDestruction()) {
|
||||
fShmRegions->at(id).fDestroyed = true;
|
||||
(fEventCounter->fCount)++;
|
||||
}
|
||||
@@ -531,8 +526,8 @@ class Manager
|
||||
if (!e.second.fDestroyed) {
|
||||
auto region = GetRegionUnsafe(info.id);
|
||||
if (region) {
|
||||
info.ptr = region->fRegion.get_address();
|
||||
info.size = region->fRegion.get_size();
|
||||
info.ptr = region->GetData();
|
||||
info.size = region->GetSize();
|
||||
} else {
|
||||
throw std::runtime_error(tools::ToString("GetRegionInfoUnsafe() could not get region with id '", info.id, "'"));
|
||||
}
|
||||
@@ -789,8 +784,12 @@ class Manager
|
||||
LOG(error) << "Manager could not acquire lock: " << e.what();
|
||||
}
|
||||
|
||||
if (lastRemoved && !fNoCleanup) {
|
||||
Monitor::Cleanup(ShmId{fShmId});
|
||||
if (lastRemoved) {
|
||||
if (!fNoCleanup) {
|
||||
Monitor::Cleanup(ShmId{fShmId});
|
||||
} else {
|
||||
Monitor::RemoveObject("fmq_" + fShmId + "_mng");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -808,8 +807,8 @@ class Manager
|
||||
uint64_t fShmId64;
|
||||
std::string fShmId;
|
||||
uint16_t fSegmentId;
|
||||
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> fSegments; // TODO: can use Segment class directly here
|
||||
boost::interprocess::managed_shared_memory fManagementSegment;
|
||||
std::unordered_map<uint16_t, boost::variant<RBTreeBestFitSegment, SimpleSeqFitSegment>> fSegments; // TODO: refactor to use Segment class
|
||||
boost::interprocess::managed_shared_memory fManagementSegment; // TODO: refactor to use ManagementSegment class
|
||||
VoidAlloc fShmVoidAlloc;
|
||||
boost::interprocess::interprocess_mutex* fShmMtx;
|
||||
|
||||
@@ -823,11 +822,11 @@ class Manager
|
||||
EventCounter* fEventCounter;
|
||||
Uint16SegmentInfoHashMap* fShmSegments;
|
||||
Uint16RegionInfoHashMap* fShmRegions;
|
||||
std::unordered_map<uint16_t, std::unique_ptr<Region>> fRegions;
|
||||
std::unordered_map<uint16_t, std::unique_ptr<UnmanagedRegion>> fRegions;
|
||||
inline static std::atomic<unsigned long> fRegionsGen = 0ul;
|
||||
inline static thread_local struct ManagerTLCache {
|
||||
unsigned long fRegionsTLCacheGen;
|
||||
std::vector<std::tuple<Region*, uint16_t, uint64_t>> fRegionsTLCache;
|
||||
std::vector<std::tuple<UnmanagedRegion*, uint16_t, uint64_t>> fRegionsTLCache;
|
||||
} fTlRegionCache;
|
||||
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
|
Reference in New Issue
Block a user