Compare commits

...

4 Commits

Author SHA1 Message Date
Alexey Rybalchenko f6bade32bb modify keep-alive example executable a bit, make it configurable 2022-01-12 19:54:49 +01:00
Alexey Rybalchenko ddf9bc7272 shm: keep mng segment around when skipping cleanup 2022-01-12 19:54:49 +01:00
Alexey Rybalchenko f79a0714b4 shm: fix double unlock() 2022-01-12 19:54:49 +01:00
Alexey Rybalchenko c04958e2a4 shm: reduce contention on region events 2022-01-10 19:42:08 +01:00
3 changed files with 153 additions and 117 deletions

View File

@@ -14,13 +14,18 @@
#include <fairlogger/Logger.h>
#include <boost/algorithm/string.hpp>
#include <boost/program_options.hpp>
#include <csignal>
#include <chrono>
#include <map>
#include <string>
#include <thread>
using namespace std;
using namespace boost::program_options;
namespace
{
@@ -32,19 +37,69 @@ void signalHandler(int /* signal */)
gStopping = 1;
}
struct ShmRemover
struct ShmManager
{
ShmRemover(std::string _shmId) : shmId(std::move(_shmId)) {}
~ShmRemover()
ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions)
: shmId(fair::mq::shmem::makeShmIdStr(_shmId))
{
// This will clean all segments, regions and any other shmem objects belonging to this shmId
for (const auto& s : _segments) {
vector<string> segmentConf;
boost::algorithm::split(segmentConf, s, boost::algorithm::is_any_of(","));
if (segmentConf.size() != 2) {
LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>.");
}
uint16_t id = stoi(segmentConf.at(0));
uint64_t size = stoull(segmentConf.at(1));
auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit));
fair::mq::shmem::Segment& segment = ret.first->second;
LOG(info) << "Created segment " << id << " of size " << segment.GetSize() << ", starting at " << segment.GetData() << ". Locking...";
segment.Lock();
LOG(info) << "Done.";
LOG(info) << "Zeroing...";
segment.Zero();
LOG(info) << "Done.";
}
for (const auto& r : _regions) {
vector<string> regionConf;
boost::algorithm::split(regionConf, r, boost::algorithm::is_any_of(","));
if (regionConf.size() != 2) {
LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>.");
}
uint16_t id = stoi(regionConf.at(0));
uint64_t size = stoull(regionConf.at(1));
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size));
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() << ", starting at " << region.GetData() << ". Locking...";
region.Lock();
LOG(info) << "Done.";
LOG(info) << "Zeroing...";
region.Zero();
LOG(info) << "Done.";
}
}
void ResetContent()
{
fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId});
}
~ShmManager()
{
// clean all segments, regions and any other shmem objects belonging to this shmId
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
}
std::string shmId;
map<uint16_t, fair::mq::shmem::Segment> segments;
map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> regions;
};
int main(int /* argc */, char** /* argv */)
int main(int argc, char** argv)
{
fair::Logger::SetConsoleColor(true);
@@ -52,47 +107,28 @@ int main(int /* argc */, char** /* argv */)
signal(SIGTERM, signalHandler);
try {
const string session = "default"; // to_string(fair::mq::tools::UuidHash());
// generate shmId out of session id + user id (geteuid).
const string shmId = fair::mq::shmem::makeShmIdStr(session);
uint64_t shmId = 0;
vector<string> segments;
vector<string> regions;
const uint16_t s1id = 0;
const uint64_t s1size = 100000000;
const uint16_t s2id = 1;
const uint64_t s2size = 200000000;
options_description desc("Options");
desc.add_options()
("shmid", value<uint64_t>(&shmId)->required(), "Shm id")
("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size> <id>,<size> <id>,<size> ...")
("regions", value<vector<string>>(&regions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size> <id>,<size> ...")
("help,h", "Print help");
const uint16_t r1id = 0;
const uint64_t r1size = 100000000;
const uint16_t r2id = 1;
const uint64_t r2size = 200000000;
variables_map vm;
store(parse_command_line(argc, argv, desc), vm);
// cleanup when done
ShmRemover shmRemover(shmId);
if (vm.count("help")) {
LOG(info) << "ShmManager" << "\n" << desc;
return 0;
}
// managed segments
fair::mq::shmem::Segment segment1(shmId, s1id, s1size, fair::mq::shmem::rbTreeBestFit);
segment1.Lock();
segment1.Zero();
LOG(info) << "Created segment " << s1id << " of size " << segment1.GetSize() << " starting at " << segment1.GetData();
notify(vm);
fair::mq::shmem::Segment segment2(shmId, s2id, s2size, fair::mq::shmem::rbTreeBestFit);
segment2.Lock();
segment2.Zero();
LOG(info) << "Created segment " << s2id << " of size " << segment2.GetSize() << " starting at " << segment2.GetData();
// unmanaged regions
fair::mq::shmem::UnmanagedRegion region1(shmId, r1id, r1size);
region1.Lock();
region1.Zero();
LOG(info) << "Created region " << r1id << " of size " << region1.GetSize() << " starting at " << region1.GetData();
fair::mq::shmem::UnmanagedRegion region2(shmId, r2id, r2size);
region2.Lock();
region2.Zero();
LOG(info) << "Created region " << r2id << " of size " << region2.GetSize() << " starting at " << region2.GetData();
// for a "soft reset" call (shmem should not be in active use by (no messages in flight) devices during this call):
// fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId});
ShmManager shmManager(shmId, segments, regions);
while (!gStopping) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
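
The new --segments and --regions options each take a list of <id>,<size> pairs. A minimal standalone sketch of the pair parsing the example performs (the function name is illustrative, not part of the patch):

    #include <boost/algorithm/string.hpp>
    #include <cstdint>
    #include <stdexcept>
    #include <string>
    #include <utility>
    #include <vector>

    // Split a single "<id>,<size>" token into its two components.
    std::pair<uint16_t, uint64_t> parseIdSize(const std::string& token)
    {
        std::vector<std::string> parts;
        boost::algorithm::split(parts, token, boost::algorithm::is_any_of(","));
        if (parts.size() != 2) {
            throw std::runtime_error("expected <id>,<size>, got: " + token);
        }
        return {static_cast<uint16_t>(std::stoi(parts.at(0))), std::stoull(parts.at(1))};
    }

Passing, for example, --segments 0,100000000 1,200000000 --regions 0,100000000 1,200000000 (together with the now-required --shmid) reproduces the two segments and two regions that the previous version hard-coded.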

View File

@@ -135,7 +135,6 @@ class Manager
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
, fShmMtx(fManagementSegment.find_or_construct<boost::interprocess::interprocess_mutex>(boost::interprocess::unique_instance)())
, fRegionEventsShmCV(fManagementSegment.find_or_construct<boost::interprocess::interprocess_condition>(boost::interprocess::unique_instance)())
, fNumObservedEvents(0)
, fDeviceCounter(nullptr)
, fEventCounter(nullptr)
@@ -241,7 +240,6 @@ class Manager
fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_or_create, segmentName.c_str(), size));
fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit);
}
(fEventCounter->fCount)++;
if (mlockSegmentOnCreation) {
MlockSegment(fSegmentId);
}
@@ -280,6 +278,8 @@ class Manager
ZeroSegment(fSegmentId);
}
(fEventCounter->fCount)++;
#ifdef FAIRMQ_DEBUG_MODE
fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc);
fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc);
@@ -399,17 +399,6 @@ class Manager
region.fRemote = false; // TODO: this should be more clear, refactor it.
}
if (cfg.lock) {
LOG(debug) << "Locking region " << id << "...";
region.Lock();
LOG(debug) << "Successfully locked region " << id << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << id << "...";
region.Zero();
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
}
// start ack receiver only if a callback has been provided.
if (callback || bulkCallback) {
region.SetCallbacks(callback, bulkCallback);
@@ -421,7 +410,6 @@ class Manager
result.second = id;
}
fRegionsGen += 1; // signal TL cache invalidation
fRegionEventsShmCV->notify_all();
return result;
} catch (interprocess_exception& e) {
@@ -445,19 +433,19 @@ class Manager
}
}
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
// slow path: check invalidation
if (lTlCacheGen != fRegionsGen) {
fTlRegionCache.fRegionsTLCache.clear();
}
auto *lRegion = GetRegionUnsafe(id);
auto* lRegion = GetRegionUnsafe(id, shmLock);
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
return lRegion;
}
UnmanagedRegion* GetRegionUnsafe(const uint16_t id)
UnmanagedRegion* GetRegionUnsafe(const uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
{
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
auto it = fRegions.find(id);
@@ -467,6 +455,8 @@ class Manager
try {
// get region info
RegionInfo regionInfo = fShmRegions->at(id);
// safe to unlock now - no shm container accessed after this
lockedShmLock.unlock();
RegionConfig cfg;
cfg.id = id;
cfg.creationFlags = regionInfo.fCreationFlags;
@@ -476,6 +466,7 @@ class Manager
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
r.first->second->InitializeQueues();
r.first->second->StartAckSender();
lockedShmLock.lock();
return r.first->second.get();
} catch (std::out_of_range& oor) {
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
@@ -500,7 +491,6 @@ class Manager
}
fRegions.erase(id);
}
fRegionEventsShmCV->notify_all();
} catch (std::out_of_range& oor) {
LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'";
}
@@ -508,35 +498,9 @@ class Manager
}
std::vector<fair::mq::RegionInfo> GetRegionInfo()
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
return GetRegionInfoUnsafe();
}
std::vector<fair::mq::RegionInfo> GetRegionInfoUnsafe()
{
std::vector<fair::mq::RegionInfo> result;
for (const auto& e : *fShmRegions) {
fair::mq::RegionInfo info;
info.managed = false;
info.id = e.first;
info.flags = e.second.fUserFlags;
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (!e.second.fDestroyed) {
auto region = GetRegionUnsafe(info.id);
if (region) {
info.ptr = region->GetData();
info.size = region->GetSize();
} else {
throw std::runtime_error(tools::ToString("GetRegionInfoUnsafe() could not get region with id '", info.id, "'"));
}
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
}
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
for (const auto& e : *fShmSegments) {
// make sure any segments in the session are found
@@ -555,6 +519,27 @@ class Manager
}
}
for (const auto& e : *fShmRegions) {
fair::mq::RegionInfo info;
info.managed = false;
info.id = e.first;
info.flags = e.second.fUserFlags;
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (info.event == RegionEvent::created) {
auto region = GetRegionUnsafe(info.id, shmLock);
if (region) {
info.ptr = region->GetData();
info.size = region->GetSize();
} else {
throw std::runtime_error(tools::ToString("GetRegionInfo() could not get region with id '", info.id, "'"));
}
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
}
return result;
}
@@ -562,13 +547,13 @@ class Manager
{
if (fRegionEventThread.joinable()) {
LOG(debug) << "Already subscribed. Overwriting previous subscription.";
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
std::unique_lock<std::mutex> lock(fRegionEventsMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsShmCV->notify_all();
fRegionEventsCV.notify_one();
fRegionEventThread.join();
}
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
std::lock_guard<std::mutex> lock(fRegionEventsMtx);
fRegionEventCallback = callback;
fRegionEventsSubscriptionActive = true;
fRegionEventThread = std::thread(&Manager::RegionEventsSubscription, this);
@@ -579,10 +564,10 @@ class Manager
void UnsubscribeFromRegionEvents()
{
if (fRegionEventThread.joinable()) {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
std::unique_lock<std::mutex> lock(fRegionEventsMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsShmCV->notify_all();
fRegionEventsCV.notify_one();
fRegionEventThread.join();
lock.lock();
fRegionEventCallback = nullptr;
@@ -591,33 +576,38 @@ class Manager
void RegionEventsSubscription()
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
std::unique_lock<std::mutex> lock(fRegionEventsMtx);
while (fRegionEventsSubscriptionActive) {
auto infos = GetRegionInfoUnsafe();
for (const auto& i : infos) {
auto el = fObservedRegionEvents.find({i.id, i.managed});
if (el == fObservedRegionEvents.end()) { // if event id has not been observed
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
// if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created'
// TODO: do we care to show 'created' events if we know region is already destroyed?
if (i.event == RegionEvent::created) {
fRegionEventCallback(i);
++fNumObservedEvents;
} else {
fNumObservedEvents += 2;
}
} else { // if event id has been observed (expected - there are two events per id - created & destroyed)
// fire a callback if we have observed 'created' event and incoming is 'destroyed'
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
fRegionEventCallback(i);
el->second = i.event;
++fNumObservedEvents;
} else {
// LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second;
if (fNumObservedEvents != fEventCounter->fCount) {
auto infos = GetRegionInfo();
for (const auto& i : infos) {
auto el = fObservedRegionEvents.find({i.id, i.managed});
if (el == fObservedRegionEvents.end()) { // if event id has not been observed
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
// if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created'
// TODO: do we care to show 'created' events if we know region is already destroyed?
if (i.event == RegionEvent::created) {
fRegionEventCallback(i);
++fNumObservedEvents;
} else {
fNumObservedEvents += 2;
}
} else { // if event id has been observed (expected - there are two events per id - created & destroyed)
// fire a callback if we have observed 'created' event and incoming is 'destroyed'
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
fRegionEventCallback(i);
el->second = i.event;
++fNumObservedEvents;
} else {
// LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second;
}
}
}
}
fRegionEventsShmCV->wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
// TODO: do better than polling here, without adding too much shmem contention
fRegionEventsCV.wait_for(lock, std::chrono::milliseconds(50), [&] { return !fRegionEventsSubscriptionActive; });
}
}
@@ -787,8 +777,6 @@ class Manager
if (lastRemoved) {
if (!fNoCleanup) {
Monitor::Cleanup(ShmId{fShmId});
} else {
Monitor::RemoveObject("fmq_" + fShmId + "_mng");
}
}
}
@@ -812,7 +800,8 @@ class Manager
VoidAlloc fShmVoidAlloc;
boost::interprocess::interprocess_mutex* fShmMtx;
boost::interprocess::interprocess_condition* fRegionEventsShmCV;
std::mutex fRegionEventsMtx;
std::condition_variable fRegionEventsCV;
std::thread fRegionEventThread;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; // pair: <region id, managed>
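
After this change the subscription thread no longer blocks on an interprocess_condition stored in the management segment; it waits on a process-local std::condition_variable with a 50 ms timeout and only rescans the regions when the shared event counter has advanced. A condensed standalone sketch of that wait pattern (simplified; in the real code the loop lives in Manager::RegionEventsSubscription and the counter is fEventCounter->fCount in shared memory):

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <cstdint>
    #include <functional>
    #include <mutex>

    std::mutex regionEventsMtx;
    std::condition_variable regionEventsCV;
    bool subscriptionActive = true;          // protected by regionEventsMtx
    uint64_t numObservedEvents = 0;
    std::atomic<uint64_t> eventCount{0};     // stands in for the counter kept in shmem

    void regionEventsLoop(const std::function<uint64_t()>& processEvents)
    {
        std::unique_lock<std::mutex> lock(regionEventsMtx);
        while (subscriptionActive) {
            if (numObservedEvents != eventCount.load()) {
                numObservedEvents += processEvents();  // GetRegionInfo() + callback dispatch
            }
            // poll every 50 ms; only an unsubscribe needs to wake the thread up early
            regionEventsCV.wait_for(lock, std::chrono::milliseconds(50),
                                    [&] { return !subscriptionActive; });
        }
    }

With this layout, UnsubscribeFromRegionEvents() only has to flip the flag under the local mutex and notify the local condition variable, so other processes never contend on this lock.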

View File

@@ -107,6 +107,17 @@ struct UnmanagedRegion
}
}
if (cfg.lock) {
LOG(debug) << "Locking region " << cfg.id.value() << "...";
Lock();
LOG(debug) << "Successfully locked region " << cfg.id.value() << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << cfg.id.value() << "...";
Zero();
LOG(debug) << "Successfully zeroed free memory of region " << cfg.id.value() << ".";
}
if (!remote) {
Register(shmId, cfg);
}
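
With the lock/zero handling moved into the constructor, locally created and remotely opened regions go through the same code path for the RegionConfig lock and zero flags. As a rough idea of what Lock() and Zero() amount to, a standalone sketch (mlock and memset stand in for the real methods, which operate on the region's mapped memory via boost.interprocess):

    #include <sys/mman.h>   // mlock
    #include <cstdio>       // std::perror
    #include <cstring>      // std::memset

    struct InitFlags { bool lock = false; bool zero = false; };

    // Pin and/or zero a freshly mapped range at construction time.
    void initRegionMemory(void* data, std::size_t size, InitFlags flags)
    {
        if (flags.lock && mlock(data, size) != 0) {
            std::perror("mlock");
        }
        if (flags.zero) {
            std::memset(data, 0, size);
        }
    }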