mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-16 10:01:47 +00:00
Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
f6bade32bb | ||
|
ddf9bc7272 | ||
|
f79a0714b4 | ||
|
c04958e2a4 |
@@ -14,13 +14,18 @@
|
||||
|
||||
#include <fairlogger/Logger.h>
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/program_options.hpp>
|
||||
|
||||
#include <csignal>
|
||||
|
||||
#include <chrono>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
using namespace std;
|
||||
using namespace boost::program_options;
|
||||
|
||||
namespace
|
||||
{
|
||||
@@ -32,19 +37,69 @@ void signalHandler(int /* signal */)
|
||||
gStopping = 1;
|
||||
}
|
||||
|
||||
struct ShmRemover
|
||||
struct ShmManager
|
||||
{
|
||||
ShmRemover(std::string _shmId) : shmId(std::move(_shmId)) {}
|
||||
~ShmRemover()
|
||||
ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions)
|
||||
: shmId(fair::mq::shmem::makeShmIdStr(_shmId))
|
||||
{
|
||||
// This will clean all segments, regions and any other shmem objects belonging to this shmId
|
||||
for (const auto& s : _segments) {
|
||||
vector<string> segmentConf;
|
||||
boost::algorithm::split(segmentConf, s, boost::algorithm::is_any_of(","));
|
||||
if (segmentConf.size() != 2) {
|
||||
LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size>.";
|
||||
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
|
||||
throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>.");
|
||||
}
|
||||
uint16_t id = stoi(segmentConf.at(0));
|
||||
uint64_t size = stoull(segmentConf.at(1));
|
||||
auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit));
|
||||
fair::mq::shmem::Segment& segment = ret.first->second;
|
||||
LOG(info) << "Created segment " << id << " of size " << segment.GetSize() << ", starting at " << segment.GetData() << ". Locking...";
|
||||
segment.Lock();
|
||||
LOG(info) << "Done.";
|
||||
LOG(info) << "Zeroing...";
|
||||
segment.Zero();
|
||||
LOG(info) << "Done.";
|
||||
}
|
||||
|
||||
for (const auto& r : _regions) {
|
||||
vector<string> regionConf;
|
||||
boost::algorithm::split(regionConf, r, boost::algorithm::is_any_of(","));
|
||||
if (regionConf.size() != 2) {
|
||||
LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>.";
|
||||
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
|
||||
throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>.");
|
||||
}
|
||||
uint16_t id = stoi(regionConf.at(0));
|
||||
uint64_t size = stoull(regionConf.at(1));
|
||||
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size));
|
||||
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
|
||||
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() << ", starting at " << region.GetData() << ". Locking...";
|
||||
region.Lock();
|
||||
LOG(info) << "Done.";
|
||||
LOG(info) << "Zeroing...";
|
||||
region.Zero();
|
||||
LOG(info) << "Done.";
|
||||
}
|
||||
}
|
||||
|
||||
void ResetContent()
|
||||
{
|
||||
fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId});
|
||||
}
|
||||
|
||||
~ShmManager()
|
||||
{
|
||||
// clean all segments, regions and any other shmem objects belonging to this shmId
|
||||
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
|
||||
}
|
||||
|
||||
std::string shmId;
|
||||
map<uint16_t, fair::mq::shmem::Segment> segments;
|
||||
map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> regions;
|
||||
};
|
||||
|
||||
int main(int /* argc */, char** /* argv */)
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
fair::Logger::SetConsoleColor(true);
|
||||
|
||||
@@ -52,47 +107,28 @@ int main(int /* argc */, char** /* argv */)
|
||||
signal(SIGTERM, signalHandler);
|
||||
|
||||
try {
|
||||
const string session = "default"; // to_string(fair::mq::tools::UuidHash());
|
||||
// generate shmId out of session id + user id (geteuid).
|
||||
const string shmId = fair::mq::shmem::makeShmIdStr(session);
|
||||
uint64_t shmId = 0;
|
||||
vector<string> segments;
|
||||
vector<string> regions;
|
||||
|
||||
const uint16_t s1id = 0;
|
||||
const uint64_t s1size = 100000000;
|
||||
const uint16_t s2id = 1;
|
||||
const uint64_t s2size = 200000000;
|
||||
options_description desc("Options");
|
||||
desc.add_options()
|
||||
("shmid", value<uint64_t>(&shmId)->required(), "Shm id")
|
||||
("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size> <id>,<size> <id>,<size> ...")
|
||||
("regions", value<vector<string>>(®ions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size> <id>,<size> ...")
|
||||
("help,h", "Print help");
|
||||
|
||||
const uint16_t r1id = 0;
|
||||
const uint64_t r1size = 100000000;
|
||||
const uint16_t r2id = 1;
|
||||
const uint64_t r2size = 200000000;
|
||||
variables_map vm;
|
||||
store(parse_command_line(argc, argv, desc), vm);
|
||||
|
||||
// cleanup when done
|
||||
ShmRemover shmRemover(shmId);
|
||||
if (vm.count("help")) {
|
||||
LOG(info) << "ShmManager" << "\n" << desc;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// managed segments
|
||||
fair::mq::shmem::Segment segment1(shmId, s1id, s1size, fair::mq::shmem::rbTreeBestFit);
|
||||
segment1.Lock();
|
||||
segment1.Zero();
|
||||
LOG(info) << "Created segment " << s1id << " of size " << segment1.GetSize() << " starting at " << segment1.GetData();
|
||||
notify(vm);
|
||||
|
||||
fair::mq::shmem::Segment segment2(shmId, s2id, s2size, fair::mq::shmem::rbTreeBestFit);
|
||||
segment2.Lock();
|
||||
segment2.Zero();
|
||||
LOG(info) << "Created segment " << s2id << " of size " << segment2.GetSize() << " starting at " << segment2.GetData();
|
||||
|
||||
// unmanaged regions
|
||||
fair::mq::shmem::UnmanagedRegion region1(shmId, r1id, r1size);
|
||||
region1.Lock();
|
||||
region1.Zero();
|
||||
LOG(info) << "Created region " << r1id << " of size " << region1.GetSize() << " starting at " << region1.GetData();
|
||||
|
||||
fair::mq::shmem::UnmanagedRegion region2(shmId, r2id, r2size);
|
||||
region2.Lock();
|
||||
region2.Zero();
|
||||
LOG(info) << "Created region " << r2id << " of size " << region2.GetSize() << " starting at " << region2.GetData();
|
||||
|
||||
// for a "soft reset" call (shmem should not be in active use by (no messages in flight) devices during this call):
|
||||
// fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId});
|
||||
ShmManager shmManager(shmId, segments, regions);
|
||||
|
||||
while (!gStopping) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
|
@@ -135,7 +135,6 @@ class Manager
|
||||
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
|
||||
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
|
||||
, fShmMtx(fManagementSegment.find_or_construct<boost::interprocess::interprocess_mutex>(boost::interprocess::unique_instance)())
|
||||
, fRegionEventsShmCV(fManagementSegment.find_or_construct<boost::interprocess::interprocess_condition>(boost::interprocess::unique_instance)())
|
||||
, fNumObservedEvents(0)
|
||||
, fDeviceCounter(nullptr)
|
||||
, fEventCounter(nullptr)
|
||||
@@ -241,7 +240,6 @@ class Manager
|
||||
fSegments.emplace(fSegmentId, SimpleSeqFitSegment(open_or_create, segmentName.c_str(), size));
|
||||
fShmSegments->emplace(fSegmentId, AllocationAlgorithm::simple_seq_fit);
|
||||
}
|
||||
(fEventCounter->fCount)++;
|
||||
if (mlockSegmentOnCreation) {
|
||||
MlockSegment(fSegmentId);
|
||||
}
|
||||
@@ -280,6 +278,8 @@ class Manager
|
||||
ZeroSegment(fSegmentId);
|
||||
}
|
||||
|
||||
(fEventCounter->fCount)++;
|
||||
|
||||
#ifdef FAIRMQ_DEBUG_MODE
|
||||
fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc);
|
||||
fShmMsgCounters = fManagementSegment.find_or_construct<Uint16MsgCounterHashMap>(unique_instance)(fShmVoidAlloc);
|
||||
@@ -399,17 +399,6 @@ class Manager
|
||||
region.fRemote = false; // TODO: this should be more clear, refactor it.
|
||||
}
|
||||
|
||||
if (cfg.lock) {
|
||||
LOG(debug) << "Locking region " << id << "...";
|
||||
region.Lock();
|
||||
LOG(debug) << "Successfully locked region " << id << ".";
|
||||
}
|
||||
if (cfg.zero) {
|
||||
LOG(debug) << "Zeroing free memory of region " << id << "...";
|
||||
region.Zero();
|
||||
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
|
||||
}
|
||||
|
||||
// start ack receiver only if a callback has been provided.
|
||||
if (callback || bulkCallback) {
|
||||
region.SetCallbacks(callback, bulkCallback);
|
||||
@@ -421,7 +410,6 @@ class Manager
|
||||
result.second = id;
|
||||
}
|
||||
fRegionsGen += 1; // signal TL cache invalidation
|
||||
fRegionEventsShmCV->notify_all();
|
||||
|
||||
return result;
|
||||
} catch (interprocess_exception& e) {
|
||||
@@ -445,19 +433,19 @@ class Manager
|
||||
}
|
||||
}
|
||||
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||
// slow path: check invalidation
|
||||
if (lTlCacheGen != fRegionsGen) {
|
||||
fTlRegionCache.fRegionsTLCache.clear();
|
||||
}
|
||||
|
||||
auto *lRegion = GetRegionUnsafe(id);
|
||||
auto* lRegion = GetRegionUnsafe(id, shmLock);
|
||||
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
|
||||
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
|
||||
return lRegion;
|
||||
}
|
||||
|
||||
UnmanagedRegion* GetRegionUnsafe(const uint16_t id)
|
||||
UnmanagedRegion* GetRegionUnsafe(const uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
|
||||
{
|
||||
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
|
||||
auto it = fRegions.find(id);
|
||||
@@ -467,6 +455,8 @@ class Manager
|
||||
try {
|
||||
// get region info
|
||||
RegionInfo regionInfo = fShmRegions->at(id);
|
||||
// safe to unlock now - no shm container accessed after this
|
||||
lockedShmLock.unlock();
|
||||
RegionConfig cfg;
|
||||
cfg.id = id;
|
||||
cfg.creationFlags = regionInfo.fCreationFlags;
|
||||
@@ -476,6 +466,7 @@ class Manager
|
||||
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
|
||||
r.first->second->InitializeQueues();
|
||||
r.first->second->StartAckSender();
|
||||
lockedShmLock.lock();
|
||||
return r.first->second.get();
|
||||
} catch (std::out_of_range& oor) {
|
||||
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
|
||||
@@ -500,7 +491,6 @@ class Manager
|
||||
}
|
||||
fRegions.erase(id);
|
||||
}
|
||||
fRegionEventsShmCV->notify_all();
|
||||
} catch (std::out_of_range& oor) {
|
||||
LOG(debug) << "RemoveRegion() could not locate region with id '" << id << "'";
|
||||
}
|
||||
@@ -508,35 +498,9 @@ class Manager
|
||||
}
|
||||
|
||||
std::vector<fair::mq::RegionInfo> GetRegionInfo()
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
return GetRegionInfoUnsafe();
|
||||
}
|
||||
|
||||
std::vector<fair::mq::RegionInfo> GetRegionInfoUnsafe()
|
||||
{
|
||||
std::vector<fair::mq::RegionInfo> result;
|
||||
|
||||
for (const auto& e : *fShmRegions) {
|
||||
fair::mq::RegionInfo info;
|
||||
info.managed = false;
|
||||
info.id = e.first;
|
||||
info.flags = e.second.fUserFlags;
|
||||
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
|
||||
if (!e.second.fDestroyed) {
|
||||
auto region = GetRegionUnsafe(info.id);
|
||||
if (region) {
|
||||
info.ptr = region->GetData();
|
||||
info.size = region->GetSize();
|
||||
} else {
|
||||
throw std::runtime_error(tools::ToString("GetRegionInfoUnsafe() could not get region with id '", info.id, "'"));
|
||||
}
|
||||
} else {
|
||||
info.ptr = nullptr;
|
||||
info.size = 0;
|
||||
}
|
||||
result.push_back(info);
|
||||
}
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||
|
||||
for (const auto& e : *fShmSegments) {
|
||||
// make sure any segments in the session are found
|
||||
@@ -555,6 +519,27 @@ class Manager
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& e : *fShmRegions) {
|
||||
fair::mq::RegionInfo info;
|
||||
info.managed = false;
|
||||
info.id = e.first;
|
||||
info.flags = e.second.fUserFlags;
|
||||
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
|
||||
if (info.event == RegionEvent::created) {
|
||||
auto region = GetRegionUnsafe(info.id, shmLock);
|
||||
if (region) {
|
||||
info.ptr = region->GetData();
|
||||
info.size = region->GetSize();
|
||||
} else {
|
||||
throw std::runtime_error(tools::ToString("GetRegionInfo() could not get region with id '", info.id, "'"));
|
||||
}
|
||||
} else {
|
||||
info.ptr = nullptr;
|
||||
info.size = 0;
|
||||
}
|
||||
result.push_back(info);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -562,13 +547,13 @@ class Manager
|
||||
{
|
||||
if (fRegionEventThread.joinable()) {
|
||||
LOG(debug) << "Already subscribed. Overwriting previous subscription.";
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
std::unique_lock<std::mutex> lock(fRegionEventsMtx);
|
||||
fRegionEventsSubscriptionActive = false;
|
||||
lock.unlock();
|
||||
fRegionEventsShmCV->notify_all();
|
||||
fRegionEventsCV.notify_one();
|
||||
fRegionEventThread.join();
|
||||
}
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
std::lock_guard<std::mutex> lock(fRegionEventsMtx);
|
||||
fRegionEventCallback = callback;
|
||||
fRegionEventsSubscriptionActive = true;
|
||||
fRegionEventThread = std::thread(&Manager::RegionEventsSubscription, this);
|
||||
@@ -579,10 +564,10 @@ class Manager
|
||||
void UnsubscribeFromRegionEvents()
|
||||
{
|
||||
if (fRegionEventThread.joinable()) {
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
std::unique_lock<std::mutex> lock(fRegionEventsMtx);
|
||||
fRegionEventsSubscriptionActive = false;
|
||||
lock.unlock();
|
||||
fRegionEventsShmCV->notify_all();
|
||||
fRegionEventsCV.notify_one();
|
||||
fRegionEventThread.join();
|
||||
lock.lock();
|
||||
fRegionEventCallback = nullptr;
|
||||
@@ -591,33 +576,38 @@ class Manager
|
||||
|
||||
void RegionEventsSubscription()
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
||||
std::unique_lock<std::mutex> lock(fRegionEventsMtx);
|
||||
|
||||
while (fRegionEventsSubscriptionActive) {
|
||||
auto infos = GetRegionInfoUnsafe();
|
||||
for (const auto& i : infos) {
|
||||
auto el = fObservedRegionEvents.find({i.id, i.managed});
|
||||
if (el == fObservedRegionEvents.end()) { // if event id has not been observed
|
||||
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
|
||||
// if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created'
|
||||
// TODO: do we care to show 'created' events if we know region is already destroyed?
|
||||
if (i.event == RegionEvent::created) {
|
||||
fRegionEventCallback(i);
|
||||
++fNumObservedEvents;
|
||||
} else {
|
||||
fNumObservedEvents += 2;
|
||||
}
|
||||
} else { // if event id has been observed (expected - there are two events per id - created & destroyed)
|
||||
// fire a callback if we have observed 'created' event and incoming is 'destroyed'
|
||||
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
|
||||
fRegionEventCallback(i);
|
||||
el->second = i.event;
|
||||
++fNumObservedEvents;
|
||||
} else {
|
||||
// LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second;
|
||||
if (fNumObservedEvents != fEventCounter->fCount) {
|
||||
auto infos = GetRegionInfo();
|
||||
|
||||
for (const auto& i : infos) {
|
||||
auto el = fObservedRegionEvents.find({i.id, i.managed});
|
||||
if (el == fObservedRegionEvents.end()) { // if event id has not been observed
|
||||
fObservedRegionEvents.emplace(std::make_pair(i.id, i.managed), i.event);
|
||||
// if a region has been created and destroyed rapidly, we could see 'destroyed' without ever seeing 'created'
|
||||
// TODO: do we care to show 'created' events if we know region is already destroyed?
|
||||
if (i.event == RegionEvent::created) {
|
||||
fRegionEventCallback(i);
|
||||
++fNumObservedEvents;
|
||||
} else {
|
||||
fNumObservedEvents += 2;
|
||||
}
|
||||
} else { // if event id has been observed (expected - there are two events per id - created & destroyed)
|
||||
// fire a callback if we have observed 'created' event and incoming is 'destroyed'
|
||||
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
|
||||
fRegionEventCallback(i);
|
||||
el->second = i.event;
|
||||
++fNumObservedEvents;
|
||||
} else {
|
||||
// LOG(debug) << "ignoring event " << i.id << ": incoming: " << i.event << ", stored: " << el->second;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
fRegionEventsShmCV->wait(lock, [&] { return !fRegionEventsSubscriptionActive || fNumObservedEvents != fEventCounter->fCount; });
|
||||
// TODO: do better than polling here, without adding too much shmem contention
|
||||
fRegionEventsCV.wait_for(lock, std::chrono::milliseconds(50), [&] { return !fRegionEventsSubscriptionActive; });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -787,8 +777,6 @@ class Manager
|
||||
if (lastRemoved) {
|
||||
if (!fNoCleanup) {
|
||||
Monitor::Cleanup(ShmId{fShmId});
|
||||
} else {
|
||||
Monitor::RemoveObject("fmq_" + fShmId + "_mng");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -812,7 +800,8 @@ class Manager
|
||||
VoidAlloc fShmVoidAlloc;
|
||||
boost::interprocess::interprocess_mutex* fShmMtx;
|
||||
|
||||
boost::interprocess::interprocess_condition* fRegionEventsShmCV;
|
||||
std::mutex fRegionEventsMtx;
|
||||
std::condition_variable fRegionEventsCV;
|
||||
std::thread fRegionEventThread;
|
||||
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
|
||||
std::map<std::pair<uint16_t, bool>, RegionEvent> fObservedRegionEvents; // pair: <region id, managed>
|
||||
|
@@ -107,6 +107,17 @@ struct UnmanagedRegion
|
||||
}
|
||||
}
|
||||
|
||||
if (cfg.lock) {
|
||||
LOG(debug) << "Locking region " << cfg.id.value() << "...";
|
||||
Lock();
|
||||
LOG(debug) << "Successfully locked region " << cfg.id.value() << ".";
|
||||
}
|
||||
if (cfg.zero) {
|
||||
LOG(debug) << "Zeroing free memory of region " << cfg.id.value() << "...";
|
||||
Zero();
|
||||
LOG(debug) << "Successfully zeroed free memory of region " << cfg.id.value() << ".";
|
||||
}
|
||||
|
||||
if (!remote) {
|
||||
Register(shmId, cfg);
|
||||
}
|
||||
|
Reference in New Issue
Block a user