Compare commits

...

4 Commits

Author SHA1 Message Date
Alexey Rybalchenko
f6bade32bb modify keep-alive example executable a bit, make it configurable 2022-01-12 19:54:49 +01:00
Alexey Rybalchenko
ddf9bc7272 shm: keep mng segment around when skipping cleanup 2022-01-12 19:54:49 +01:00
Alexey Rybalchenko
f79a0714b4 shm: fix double unlock() 2022-01-12 19:54:49 +01:00
Alexey Rybalchenko
c04958e2a4 shm: reduce contention on region events 2022-01-10 19:42:08 +01:00
3 changed files with 153 additions and 117 deletions

View File

@@ -14,13 +14,18 @@
#include <fairlogger/Logger.h> #include <fairlogger/Logger.h>
#include <boost/algorithm/string.hpp>
#include <boost/program_options.hpp>
#include <csignal> #include <csignal>
#include <chrono> #include <chrono>
#include <map>
#include <string> #include <string>
#include <thread> #include <thread>
using namespace std; using namespace std;
using namespace boost::program_options;
namespace namespace
{ {
@@ -32,19 +37,69 @@ void signalHandler(int /* signal */)
gStopping = 1; gStopping = 1;
} }
struct ShmRemover struct ShmManager
{ {
ShmRemover(std::string _shmId) : shmId(std::move(_shmId)) {} ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions)
~ShmRemover() : shmId(fair::mq::shmem::makeShmIdStr(_shmId))
{ {
// This will clean all segments, regions and any other shmem objects belonging to this shmId for (const auto& s : _segments) {
vector<string> segmentConf;
boost::algorithm::split(segmentConf, s, boost::algorithm::is_any_of(","));
if (segmentConf.size() != 2) {
LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>.");
}
uint16_t id = stoi(segmentConf.at(0));
uint64_t size = stoull(segmentConf.at(1));
auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit));
fair::mq::shmem::Segment& segment = ret.first->second;
LOG(info) << "Created segment " << id << " of size " << segment.GetSize() << ", starting at " << segment.GetData() << ". Locking...";
segment.Lock();
LOG(info) << "Done.";
LOG(info) << "Zeroing...";
segment.Zero();
LOG(info) << "Done.";
}
for (const auto& r : _regions) {
vector<string> regionConf;
boost::algorithm::split(regionConf, r, boost::algorithm::is_any_of(","));
if (regionConf.size() != 2) {
LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>.");
}
uint16_t id = stoi(regionConf.at(0));
uint64_t size = stoull(regionConf.at(1));
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size));
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() << ", starting at " << region.GetData() << ". Locking...";
region.Lock();
LOG(info) << "Done.";
LOG(info) << "Zeroing...";
region.Zero();
LOG(info) << "Done.";
}
}
void ResetContent()
{
fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId});
}
~ShmManager()
{
// clean all segments, regions and any other shmem objects belonging to this shmId
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId}); fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
} }
std::string shmId; std::string shmId;
map<uint16_t, fair::mq::shmem::Segment> segments;
map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> regions;
}; };
int main(int /* argc */, char** /* argv */) int main(int argc, char** argv)
{ {
fair::Logger::SetConsoleColor(true); fair::Logger::SetConsoleColor(true);
@@ -52,47 +107,28 @@ int main(int /* argc */, char** /* argv */)
signal(SIGTERM, signalHandler); signal(SIGTERM, signalHandler);
try { try {
const string session = "default"; // to_string(fair::mq::tools::UuidHash()); uint64_t shmId = 0;
// generate shmId out of session id + user id (geteuid). vector<string> segments;
const string shmId = fair::mq::shmem::makeShmIdStr(session); vector<string> regions;
const uint16_t s1id = 0; options_description desc("Options");
const uint64_t s1size = 100000000; desc.add_options()
const uint16_t s2id = 1; ("shmid", value<uint64_t>(&shmId)->required(), "Shm id")
const uint64_t s2size = 200000000; ("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size> <id>,<size> <id>,<size> ...")
("regions", value<vector<string>>(&regions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size> <id>,<size> ...")
("help,h", "Print help");
const uint16_t r1id = 0; variables_map vm;
const uint64_t r1size = 100000000; store(parse_command_line(argc, argv, desc), vm);
const uint16_t r2id = 1;
const uint64_t r2size = 200000000;
// cleanup when done if (vm.count("help")) {
ShmRemover shmRemover(shmId); LOG(info) << "ShmManager" << "\n" << desc;
return 0;
}
// managed segments notify(vm);
fair::mq::shmem::Segment segment1(shmId, s1id, s1size, fair::mq::shmem::rbTreeBestFit);
segment1.Lock();
segment1.Zero();
LOG(info) << "Created segment " << s1id << " of size " << segment1.GetSize() << " starting at " << segment1.GetData();
fair::mq::shmem::Segment segment2(shmId, s2id, s2size, fair::mq::shmem::rbTreeBestFit); ShmManager shmManager(shmId, segments, regions);
segment2.Lock();
segment2.Zero();
LOG(info) << "Created segment " << s2id << " of size " << segment2.GetSize() << " starting at " << segment2.GetData();
// unmanaged regions
fair::mq::shmem::UnmanagedRegion region1(shmId, r1id, r1size);
region1.Lock();
region1.Zero();
LOG(info) << "Created region " << r1id << " of size " << region1.GetSize() << " starting at " << region1.GetData();
fair::mq::shmem::UnmanagedRegion region2(shmId, r2id, r2size);
region2.Lock();
region2.Zero();
LOG(info) << "Created region " << r2id << " of size " << region2.GetSize() << " starting at " << region2.GetData();
// for a "soft reset" call (shmem should not be in active use by (no messages in flight) devices during this call):
// fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId});
while (!gStopping) { while (!gStopping) {
std::this_thread::sleep_for(std::chrono::milliseconds(50)); std::this_thread::sleep_for(std::chrono::milliseconds(50));

View File

@@ -135,7 +135,6 @@ class Manager
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
, fShmVoidAlloc(fManagementSegment.get_segment_manager()) , fShmVoidAlloc(fManagementSegment.get_segment_manager())
, fShmMtx(fManagementSegment.find_or_construct<boost::interprocess::interprocess_mutex>(boost::interprocess::unique_instance)()) , fShmMtx(fManagementSegment.find_or_construct<boost::interprocess::interprocess_mutex>(boost::interprocess::unique_instance)())
, fRegionEventsShmCV(fManagementSegment.find_or_construct<boost::interprocess::interprocess_condition>(boost::interprocess::unique_instance)())
, fNumObservedEvents(0) , fNumObservedEvents(0)
, fDeviceCounter(nullptr) , fDeviceCounter(nullptr)
, fEventCounter(nullptr) , fEventCounter(nullptr)
@@ -241,7 +240,6 @@ class Manager
fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_or_create, segmentName.c_str(), size)); fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_or_create, segmentName.c_str(), size));
fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit); fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit);
} }
(fEventCounter->fCount)++;
if (mlockSegmentOnCreation) { if (mlockSegmentOnCreation) {
MlockSegment(fSegmentId); MlockSegment(fSegmentId);
} }
@@ -280,6 +278,8 @@ class Manager
ZeroSegment(fSegmentId); ZeroSegment(fSegmentId);
} }
(fEventCounter->fCount)++;
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc); fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc);
fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc); fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc);
@@ -399,17 +399,6 @@ class Manager
region.fRemote = false; // TODO: this should be more clear, refactor it. region.fRemote = false; // TODO: this should be more clear, refactor it.
} }
if (cfg.lock) {
LOG(debug) << "Locking region " << id << "...";
region.Lock();
LOG(debug) << "Successfully locked region " << id << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << id << "...";
region.Zero();
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
}
// start ack receiver only if a callback has been provided. // start ack receiver only if a callback has been provided.
if (callback || bulkCallback) { if (callback || bulkCallback) {
region.SetCallbacks(callback, bulkCallback); region.SetCallbacks(callback, bulkCallback);
@@ -421,7 +410,6 @@ class Manager
result.second = id; result.second = id;
} }
fRegionsGen += 1; // signal TL cache invalidation fRegionsGen += 1; // signal TL cache invalidation
fRegionEventsShmCV->notify_all();
return result; return result;
} catch (interprocess_exception& e) { } catch (interprocess_exception& e) {
@@ -445,19 +433,19 @@ class Manager
} }
} }
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
// slow path: check invalidation // slow path: check invalidation
if (lTlCacheGen != fRegionsGen) { if (lTlCacheGen != fRegionsGen) {
fTlRegionCache.fRegionsTLCache.clear(); fTlRegionCache.fRegionsTLCache.clear();
} }
auto *lRegion = GetRegionUnsafe(id); auto* lRegion = GetRegionUnsafe(id, shmLock);
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64)); fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen; fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
return lRegion; return lRegion;
} }
UnmanagedRegion* GetRegionUnsafe(const uint16_t id) UnmanagedRegion* GetRegionUnsafe(const uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
{ {
// remote region could actually be a local one if a message originates from this device (has been sent out and returned) // remote region could actually be a local one if a message originates from this device (has been sent out and returned)
auto it = fRegions.find(id); auto it = fRegions.find(id);
@@ -467,6 +455,8 @@ class Manager
try { try {
// get region info // get region info
RegionInfo regionInfo = fShmRegions->at(id); RegionInfo regionInfo = fShmRegions->at(id);
// safe to unlock now - no shm container accessed after this
lockedShmLock.unlock();
RegionConfig cfg; RegionConfig cfg;
cfg.id = id; cfg.id = id;
cfg.creationFlags = regionInfo.fCreationFlags; cfg.creationFlags = regionInfo.fCreationFlags;
@@ -476,6 +466,7 @@ class Manager
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg))); auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
r.first->second->InitializeQueues(); r.first->second->InitializeQueues();
r.first->second->StartAckSender(); r.first->second->StartAckSender();
lockedShmLock.lock();
return r.first->second.get(); return r.first->second.get();
} catch (std::out_of_range& oor) { } catch (std::out_of_range& oor) {
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?"; LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
@@ -500,7 +491,6 @@ class Manager
} }
fRegions.erase(id); fRegions.erase(id);
} }
fRegionEventsShmCV->notify_all();
} catch (std::out_of_range& oor) { } catch (std::out_of_range& oor) {
LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'"; LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'";
} }
@@ -508,35 +498,9 @@ class Manager
} }
std::vector<fair::mq::RegionInfo> GetRegionInfo() std::vector<fair::mq::RegionInfo> GetRegionInfo()
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
return GetRegionInfoUnsafe();
}
std::vector<fair::mq::RegionInfo> GetRegionInfoUnsafe()
{ {
std::vector<fair::mq::RegionInfo> result; std::vector<fair::mq::RegionInfo> result;
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
for (const auto& e : *fShmRegions) {
fair::mq::RegionInfo info;
info.managed = false;
info.id = e.first;
info.flags = e.second.fUserFlags;
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (!e.second.fDestroyed) {
auto region = GetRegionUnsafe(info.id);
if (region) {
info.ptr = region->GetData();
info.size = region->GetSize();
} else {
throw std::runtime_error(tools::ToString("GetRegionInfoUnsafe() could not get region with id '", info.id, "'"));
}
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
}
for (const auto& e : *fShmSegments) { for (const auto& e : *fShmSegments) {
// make sure any segments in the session are found // make sure any segments in the session are found
@@ -555,6 +519,27 @@ class Manager
} }
} }
for (const auto& e : *fShmRegions) {
fair::mq::RegionInfo info;
info.managed = false;
info.id = e.first;
info.flags = e.second.fUserFlags;
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (info.event == RegionEvent::created) {
auto region = GetRegionUnsafe(info.id, shmLock);
if (region) {
info.ptr = region->GetData();
info.size = region->GetSize();
} else {
throw std::runtime_error(tools::ToString("GetRegionInfo() could not get region with id '", info.id, "'"));
}
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
}
return result; return result;
} }
@@ -562,13 +547,13 @@ class Manager
{ {
if (fRegionEventThread.joinable()) { if (fRegionEventThread.joinable()) {
LOG(debug) << "Already subscribed. Overwriting previous subscription."; LOG(debug) << "Already subscribed. Overwriting previous subscription.";
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); std::unique_lock<std::mutex> lock(fRegionEventsMtx);
fRegionEventsSubscriptionActive = false; fRegionEventsSubscriptionActive = false;
lock.unlock(); lock.unlock();
fRegionEventsShmCV->notify_all(); fRegionEventsCV.notify_one();
fRegionEventThread.join(); fRegionEventThread.join();
} }
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); std::lock_guard<std::mutex> lock(fRegionEventsMtx);
fRegionEventCallback = callback; fRegionEventCallback = callback;
fRegionEventsSubscriptionActive = true; fRegionEventsSubscriptionActive = true;
fRegionEventThread = std::thread(&Manager::RegionEventsSubscription, this); fRegionEventThread = std::thread(&Manager::RegionEventsSubscription, this);
@@ -579,10 +564,10 @@ class Manager
void UnsubscribeFromRegionEvents() void UnsubscribeFromRegionEvents()
{ {
if (fRegionEventThread.joinable()) { if (fRegionEventThread.joinable()) {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); std::unique_lock<std::mutex> lock(fRegionEventsMtx);
fRegionEventsSubscriptionActive = false; fRegionEventsSubscriptionActive = false;
lock.unlock(); lock.unlock();
fRegionEventsShmCV->notify_all(); fRegionEventsCV.notify_one();
fRegionEventThread.join(); fRegionEventThread.join();
lock.lock(); lock.lock();
fRegionEventCallback = nullptr; fRegionEventCallback = nullptr;
@@ -591,33 +576,38 @@ class Manager
void RegionEventsSubscription() void RegionEventsSubscription()
{ {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); std::unique_lock<std::mutex> lock(fRegionEventsMtx);
while (fRegionEventsSubscriptionActive) { while (fRegionEventsSubscriptionActive) {
auto infos = GetRegionInfoUnsafe(); if (fNumObservedEvents != fEventCounter->fCount) {
for (const auto& i : infos) { auto infos = GetRegionInfo();
auto el = fObservedRegionEvents.find({i.id, i.managed});
if (el == fObservedRegionEvents.end()) { // if event id has not been observed for (const auto& i : infos) {
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event); auto el = fObservedRegionEvents.find({i.id, i.managed});
// if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created' if (el == fObservedRegionEvents.end()) { // if event id has not been observed
// TODO: do we care to show 'created' events if we know region is already destroyed? fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
if (i.event == RegionEvent::created) { // if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created'
fRegionEventCallback(i); // TODO: do we care to show 'created' events if we know region is already destroyed?
++fNumObservedEvents; if (i.event == RegionEvent::created) {
} else { fRegionEventCallback(i);
fNumObservedEvents += 2; ++fNumObservedEvents;
} } else {
} else { // if event id has been observed (expected - there are two events per id - created & destroyed) fNumObservedEvents += 2;
// fire a callback if we have observed 'created' event and incoming is 'destroyed' }
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) { } else { // if event id has been observed (expected - there are two events per id - created & destroyed)
fRegionEventCallback(i); // fire a callback if we have observed 'created' event and incoming is 'destroyed'
el->second = i.event; if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
++fNumObservedEvents; fRegionEventCallback(i);
} else { el->second = i.event;
// LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second; ++fNumObservedEvents;
} else {
// LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second;
}
} }
} }
} }
fRegionEventsShmCV->wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; }); // TODO: do better than polling here, without adding too much shmem contention
fRegionEventsCV.wait_for(lock, std::chrono::milliseconds(50), [&] { return !fRegionEventsSubscriptionActive; });
} }
} }
@@ -787,8 +777,6 @@ class Manager
if (lastRemoved) { if (lastRemoved) {
if (!fNoCleanup) { if (!fNoCleanup) {
Monitor::Cleanup(ShmId{fShmId}); Monitor::Cleanup(ShmId{fShmId});
} else {
Monitor::RemoveObject("fmq_" + fShmId + "_mng");
} }
} }
} }
@@ -812,7 +800,8 @@ class Manager
VoidAlloc fShmVoidAlloc; VoidAlloc fShmVoidAlloc;
boost::interprocess::interprocess_mutex* fShmMtx; boost::interprocess::interprocess_mutex* fShmMtx;
boost::interprocess::interprocess_condition* fRegionEventsShmCV; std::mutex fRegionEventsMtx;
std::condition_variable fRegionEventsCV;
std::thread fRegionEventThread; std::thread fRegionEventThread;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback; std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; // pair: <region id, managed> std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; // pair: <region id, managed>

View File

@@ -107,6 +107,17 @@ struct UnmanagedRegion
} }
} }
if (cfg.lock) {
LOG(debug) << "Locking region " << cfg.id.value() << "...";
Lock();
LOG(debug) << "Successfully locked region " << cfg.id.value() << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << cfg.id.value() << "...";
Zero();
LOG(debug) << "Successfully zeroed free memory of region " << cfg.id.value() << ".";
}
if (!remote) { if (!remote) {
Register(shmId, cfg); Register(shmId, cfg);
} }