mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
d105960444 | ||
|
3aae5bae58 | ||
|
9031029d2c | ||
|
d478e050ba | ||
|
06b2b9b01f | ||
|
b3fa4f6e7e | ||
|
da5cb34416 | ||
|
226733c653 | ||
|
b06efc401e |
86
.zenodo.json
Normal file
86
.zenodo.json
Normal file
@@ -0,0 +1,86 @@
|
||||
{
|
||||
"creators": [
|
||||
{
|
||||
"name": "Al-Turany, Mohammad"
|
||||
},
|
||||
{
|
||||
"orcid": "0000-0003-3787-1910",
|
||||
"name": "Klein, Dennis"
|
||||
},
|
||||
{
|
||||
"name": "Kollegger, Thorsten"
|
||||
},
|
||||
{
|
||||
"name": "Rybalchenko, Alexey"
|
||||
},
|
||||
{
|
||||
"name": "Winckler, Nicolas"
|
||||
}
|
||||
],
|
||||
"contributors": [
|
||||
{
|
||||
"type": "Other",
|
||||
"name": "Aphecetche, Laurent"
|
||||
},
|
||||
{
|
||||
"type": "Other",
|
||||
"name": "Binet, Sebastien"
|
||||
},
|
||||
{
|
||||
"type": "Other",
|
||||
"name": "Eulisse, Giulio"
|
||||
},
|
||||
{
|
||||
"type": "Other",
|
||||
"name": "Karabowicz, Radoslaw"
|
||||
},
|
||||
{
|
||||
"type": "Other",
|
||||
"name": "Kretz, Matthias"
|
||||
},
|
||||
{
|
||||
"type": "Other",
|
||||
"name": "Krzewicki, Mikolaj"
|
||||
},
|
||||
{
|
||||
"type": "Other",
|
||||
"name": "Lebedev, Andrey"
|
||||
},
|
||||
{
|
||||
"type": "Other",
|
||||
"name": "Mrnjavac, Teo"
|
||||
},
|
||||
{
|
||||
"type": "Other",
|
||||
"name": "Neskovic, Gvozden"
|
||||
},
|
||||
{
|
||||
"type": "Other",
|
||||
"name": "Richter, Matthias"
|
||||
},
|
||||
{
|
||||
"type": "Other",
|
||||
"orcid": "0000-0002-5321-8404",
|
||||
"name": "Tacke, Christian"
|
||||
},
|
||||
{
|
||||
"type": "Other",
|
||||
"name": "Uhlig, Florian"
|
||||
},
|
||||
{
|
||||
"type": "Other",
|
||||
"name": "Wenzel, Sandro"
|
||||
}
|
||||
],
|
||||
"description": "<p>C++ Message Queuing Library and Framework</p>",
|
||||
"related_identifiers": [
|
||||
{
|
||||
"identifier": "https://github.com/FairRootGroup/FairMQ/",
|
||||
"relation": "isSupplementTo",
|
||||
"resource_type": "software",
|
||||
"scheme": "url"
|
||||
}
|
||||
],
|
||||
"title": "FairMQ",
|
||||
"license": "LGPL-3.0-only"
|
||||
}
|
2
AUTHORS
2
AUTHORS
@@ -1,5 +1,5 @@
|
||||
Al-Turany, Mohammad
|
||||
Klein, Dennis
|
||||
Klein, Dennis [https://orcid.org/0000-0003-3787-1910]
|
||||
Kollegger, Thorsten
|
||||
Rybalchenko, Alexey
|
||||
Winckler, Nicolas
|
||||
|
@@ -3,11 +3,11 @@ Binet, Sebastien
|
||||
Eulisse, Giulio
|
||||
Karabowicz, Radoslaw
|
||||
Kretz, Matthias <kretz@kde.org>
|
||||
Krzewicki, Mikolaj
|
||||
Krzewicki, Mikolaj
|
||||
Lebedev, Andrey
|
||||
Mrnjavac, Teo <teo.m@cern.ch>
|
||||
Neskovic, Gvozden
|
||||
Richter, Matthias
|
||||
Tacke, Christian
|
||||
Tacke, Christian [https://orcid.org/0000-0002-5321-8404]
|
||||
Uhlig, Florian
|
||||
Wenzel, Sandro
|
||||
|
@@ -18,7 +18,8 @@
|
||||
{
|
||||
"@type": "Person",
|
||||
"givenName": "Dennis",
|
||||
"familyName": "Klein"
|
||||
"familyName": "Klein",
|
||||
"@id": "https://orcid.org/0000-0003-3787-1910"
|
||||
},
|
||||
{
|
||||
"@type": "Person",
|
||||
@@ -92,7 +93,8 @@
|
||||
{
|
||||
"@type": "Person",
|
||||
"givenName": "Christian",
|
||||
"familyName": "Tacke"
|
||||
"familyName": "Tacke",
|
||||
"@id": "https://orcid.org/0000-0002-5321-8404"
|
||||
},
|
||||
{
|
||||
"@type": "Person",
|
||||
|
@@ -6,10 +6,9 @@
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
#include <fairmq/shmem/Common.h>
|
||||
#include <fairmq/shmem/UnmanagedRegion.h>
|
||||
#include <fairmq/shmem/Segment.h>
|
||||
#include <fairmq/shmem/Monitor.h>
|
||||
|
||||
#include <fairmq/shmem/Segment.h>
|
||||
#include <fairmq/shmem/UnmanagedRegion.h>
|
||||
#include <fairmq/tools/Unique.h>
|
||||
|
||||
#include <fairlogger/Logger.h>
|
||||
@@ -17,9 +16,8 @@
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <boost/program_options.hpp>
|
||||
|
||||
#include <csignal>
|
||||
|
||||
#include <chrono>
|
||||
#include <csignal>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
@@ -27,65 +25,117 @@
|
||||
using namespace std;
|
||||
using namespace boost::program_options;
|
||||
|
||||
namespace
|
||||
{
|
||||
volatile sig_atomic_t gStopping = 0;
|
||||
}
|
||||
namespace {
|
||||
volatile sig_atomic_t gStopping = 0;
|
||||
volatile sig_atomic_t gResetContent = 0;
|
||||
} // namespace
|
||||
|
||||
void signalHandler(int /* signal */)
|
||||
{
|
||||
gStopping = 1;
|
||||
}
|
||||
void signalHandler(int /* signal */) { gStopping = 1; }
|
||||
|
||||
void resetContentHandler(int /* signal */) { gResetContent = 1; }
|
||||
|
||||
struct ShmManager
|
||||
{
|
||||
ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions)
|
||||
ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions, bool zero = true)
|
||||
: shmId(fair::mq::shmem::makeShmIdStr(_shmId))
|
||||
{
|
||||
LOG(info) << "Starting ShmManager for shmId: " << shmId;
|
||||
LOG(info) << "Performing full reset...";
|
||||
FullReset();
|
||||
LOG(info) << "Done.";
|
||||
LOG(info) << "Adding managed segments...";
|
||||
AddSegments(_segments, zero);
|
||||
LOG(info) << "Done.";
|
||||
LOG(info) << "Adding unmanaged regions...";
|
||||
AddRegions(_regions, zero);
|
||||
LOG(info) << "Done.";
|
||||
LOG(info) << "Shared memory is ready for use.";
|
||||
}
|
||||
|
||||
void AddSegments(const vector<string>& _segments, bool zero)
|
||||
{
|
||||
for (const auto& s : _segments) {
|
||||
vector<string> segmentConf;
|
||||
boost::algorithm::split(segmentConf, s, boost::algorithm::is_any_of(","));
|
||||
if (segmentConf.size() != 2) {
|
||||
LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size>.";
|
||||
vector<string> conf;
|
||||
boost::algorithm::split(conf, s, boost::algorithm::is_any_of(","));
|
||||
if (conf.size() != 3) {
|
||||
LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size><numaid>.";
|
||||
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
|
||||
throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>.");
|
||||
throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>,<numaid>.");
|
||||
}
|
||||
uint16_t id = stoi(segmentConf.at(0));
|
||||
uint64_t size = stoull(segmentConf.at(1));
|
||||
uint16_t id = stoi(conf.at(0));
|
||||
uint64_t size = stoull(conf.at(1));
|
||||
segmentCfgs.emplace_back(fair::mq::shmem::SegmentConfig{id, size, "rbtree_best_fit"});
|
||||
|
||||
auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit));
|
||||
fair::mq::shmem::Segment& segment = ret.first->second;
|
||||
LOG(info) << "Created segment " << id << " of size " << segment.GetSize() << ", starting at " << segment.GetData() << ". Locking...";
|
||||
LOG(info) << "Created segment " << id << " of size " << segment.GetSize()
|
||||
<< ", starting at " << segment.GetData() << ". Locking...";
|
||||
segment.Lock();
|
||||
LOG(info) << "Done.";
|
||||
LOG(info) << "Zeroing...";
|
||||
segment.Zero();
|
||||
LOG(info) << "Done.";
|
||||
}
|
||||
|
||||
for (const auto& r : _regions) {
|
||||
vector<string> regionConf;
|
||||
boost::algorithm::split(regionConf, r, boost::algorithm::is_any_of(","));
|
||||
if (regionConf.size() != 2) {
|
||||
LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>.";
|
||||
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
|
||||
throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>.");
|
||||
if (zero) {
|
||||
LOG(info) << "Zeroing...";
|
||||
segment.Zero();
|
||||
LOG(info) << "Done.";
|
||||
}
|
||||
uint16_t id = stoi(regionConf.at(0));
|
||||
uint64_t size = stoull(regionConf.at(1));
|
||||
}
|
||||
}
|
||||
|
||||
void AddRegions(const vector<string>& _regions, bool zero)
|
||||
{
|
||||
for (const auto& r : _regions) {
|
||||
vector<string> conf;
|
||||
boost::algorithm::split(conf, r, boost::algorithm::is_any_of(","));
|
||||
if (conf.size() != 3) {
|
||||
LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>,<numaid>.";
|
||||
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
|
||||
throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>,<numaid>.");
|
||||
}
|
||||
uint16_t id = stoi(conf.at(0));
|
||||
uint64_t size = stoull(conf.at(1));
|
||||
fair::mq::RegionConfig cfg;
|
||||
cfg.id = id;
|
||||
cfg.size = size;
|
||||
regionCfgs.push_back(cfg);
|
||||
|
||||
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size));
|
||||
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
|
||||
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() << ", starting at " << region.GetData() << ". Locking...";
|
||||
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize()
|
||||
<< ", starting at " << region.GetData() << ". Locking...";
|
||||
region.Lock();
|
||||
LOG(info) << "Done.";
|
||||
LOG(info) << "Zeroing...";
|
||||
region.Zero();
|
||||
LOG(info) << "Done.";
|
||||
if (zero) {
|
||||
LOG(info) << "Zeroing...";
|
||||
region.Zero();
|
||||
LOG(info) << "Done.";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool CheckPresence()
|
||||
{
|
||||
for (const auto& sc : segmentCfgs) {
|
||||
if (!(fair::mq::shmem::Monitor::SegmentIsPresent(fair::mq::shmem::ShmId{shmId}, sc.id))) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
for (const auto& rc : regionCfgs) {
|
||||
if (!(fair::mq::shmem::Monitor::RegionIsPresent(fair::mq::shmem::ShmId{shmId}, rc.id.value()))) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void ResetContent()
|
||||
{
|
||||
fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId});
|
||||
fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId}, segmentCfgs, regionCfgs);
|
||||
}
|
||||
|
||||
void FullReset()
|
||||
{
|
||||
segments.clear();
|
||||
regions.clear();
|
||||
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
|
||||
}
|
||||
|
||||
~ShmManager()
|
||||
@@ -97,6 +147,8 @@ struct ShmManager
|
||||
std::string shmId;
|
||||
map<uint16_t, fair::mq::shmem::Segment> segments;
|
||||
map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> regions;
|
||||
std::vector<fair::mq::shmem::SegmentConfig> segmentCfgs;
|
||||
std::vector<fair::mq::RegionConfig> regionCfgs;
|
||||
};
|
||||
|
||||
int main(int argc, char** argv)
|
||||
@@ -105,8 +157,11 @@ int main(int argc, char** argv)
|
||||
|
||||
signal(SIGINT, signalHandler);
|
||||
signal(SIGTERM, signalHandler);
|
||||
signal(SIGUSR1, resetContentHandler);
|
||||
|
||||
try {
|
||||
bool nozero = false;
|
||||
bool checkPresence = true;
|
||||
uint64_t shmId = 0;
|
||||
vector<string> segments;
|
||||
vector<string> regions;
|
||||
@@ -114,8 +169,10 @@ int main(int argc, char** argv)
|
||||
options_description desc("Options");
|
||||
desc.add_options()
|
||||
("shmid", value<uint64_t>(&shmId)->required(), "Shm id")
|
||||
("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size> <id>,<size> <id>,<size> ...")
|
||||
("regions", value<vector<string>>(®ions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size> <id>,<size> ...")
|
||||
("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size>,<numaid> <id>,<size>,<numaid> <id>,<size>,<numaid> ... (numaid: -2 disabled, -1 interleave, >=0 node)")
|
||||
("regions", value<vector<string>>(®ions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size>,<numaid> <id>,<size>,<numaid> ...")
|
||||
("nozero", value<bool>(&nozero)->default_value(false)->implicit_value(true), "Do not zero segments after initialization")
|
||||
("check-presence", value<bool>(&checkPresence)->default_value(true)->implicit_value(true), "Check periodically if configured segments/regions are still present, and cleanup and leave if they are not")
|
||||
("help,h", "Print help");
|
||||
|
||||
variables_map vm;
|
||||
@@ -128,15 +185,35 @@ int main(int argc, char** argv)
|
||||
|
||||
notify(vm);
|
||||
|
||||
ShmManager shmManager(shmId, segments, regions);
|
||||
ShmManager shmManager(shmId, segments, regions, !nozero);
|
||||
|
||||
while (!gStopping) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
std::thread resetContentThread([&shmManager]() {
|
||||
while (!gStopping) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
if (gResetContent == 1) {
|
||||
LOG(info) << "Resetting content for shmId " << shmManager.shmId;
|
||||
shmManager.ResetContent();
|
||||
gResetContent = 0;
|
||||
LOG(info) << "Done resetting content for shmId " << shmManager.shmId;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (checkPresence) {
|
||||
while (!gStopping) {
|
||||
if (shmManager.CheckPresence() == false) {
|
||||
LOG(error) << "Failed to find segments, exiting.";
|
||||
gStopping = true;
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
}
|
||||
}
|
||||
|
||||
resetContentThread.join();
|
||||
|
||||
LOG(info) << "stopping.";
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit";
|
||||
LOG(error) << "Exception reached the top of main: " << e.what() << ", exiting";
|
||||
return 2;
|
||||
}
|
||||
|
||||
|
@@ -76,6 +76,8 @@ struct Sampler : fair::mq::Device
|
||||
|
||||
void ResetTask() override
|
||||
{
|
||||
// give some time for acks to be received
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(250));
|
||||
fRegion.reset();
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(fMtx);
|
||||
|
@@ -40,7 +40,7 @@ Config::Config(const string& name, Plugin::Version version, const string& mainta
|
||||
LOG(debug) << "channel-config: Parsing channel configuration";
|
||||
SetProperties(SuboptParser(GetProperty<vector<string>>("channel-config"), idForParser));
|
||||
} else {
|
||||
LOG(warn) << "fair::mq::plugins::Config: no channels configuration provided via --mq-config or --channel-config";
|
||||
LOG(info) << "fair::mq::plugins::Config: no channels configuration provided via --mq-config or --channel-config";
|
||||
}
|
||||
} catch (exception& e) {
|
||||
LOG(error) << e.what();
|
||||
|
@@ -397,12 +397,10 @@ class Manager
|
||||
|
||||
UnmanagedRegion* region = nullptr;
|
||||
bool newRegionCreated = false;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
|
||||
newRegionCreated = res.second;
|
||||
region = res.first->second.get();
|
||||
}
|
||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
|
||||
newRegionCreated = res.second;
|
||||
region = res.first->second.get();
|
||||
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||
|
||||
if (!newRegionCreated) {
|
||||
@@ -429,7 +427,7 @@ class Manager
|
||||
}
|
||||
}
|
||||
|
||||
UnmanagedRegion* GetRegion(uint16_t id)
|
||||
UnmanagedRegion* GetRegionFromCache(uint16_t id)
|
||||
{
|
||||
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
|
||||
const auto &lTlCache = fTlRegionCache;
|
||||
@@ -443,41 +441,40 @@ class Manager
|
||||
}
|
||||
}
|
||||
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||
// slow path: check invalidation
|
||||
if (lTlCacheGen != fRegionsGen) {
|
||||
fTlRegionCache.fRegionsTLCache.clear();
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||
auto* lRegion = GetRegionUnsafe(id, shmLock);
|
||||
auto* lRegion = GetRegion(id);
|
||||
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
|
||||
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
|
||||
return lRegion;
|
||||
}
|
||||
|
||||
UnmanagedRegion* GetRegionUnsafe(uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
|
||||
UnmanagedRegion* GetRegion(uint16_t id)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
|
||||
auto it = fRegions.find(id);
|
||||
if (it != fRegions.end()) {
|
||||
return it->second.get();
|
||||
} else {
|
||||
try {
|
||||
// get region info
|
||||
RegionInfo regionInfo = fShmRegions->at(id);
|
||||
// safe to unlock now - no shm container accessed after this
|
||||
lockedShmLock.unlock();
|
||||
RegionConfig cfg;
|
||||
cfg.id = id;
|
||||
cfg.creationFlags = regionInfo.fCreationFlags;
|
||||
cfg.path = regionInfo.fPath.c_str();
|
||||
// get region info
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||
RegionInfo regionInfo = fShmRegions->at(id);
|
||||
cfg.id = id;
|
||||
cfg.creationFlags = regionInfo.fCreationFlags;
|
||||
cfg.path = regionInfo.fPath.c_str();
|
||||
}
|
||||
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||
|
||||
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
|
||||
r.first->second->InitializeQueues();
|
||||
r.first->second->StartAckSender();
|
||||
lockedShmLock.lock();
|
||||
return r.first->second.get();
|
||||
} catch (std::out_of_range& oor) {
|
||||
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
|
||||
@@ -493,10 +490,10 @@ class Manager
|
||||
void RemoveRegion(uint16_t id)
|
||||
{
|
||||
try {
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||
fRegions.at(id)->StopAcks();
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||
if (fRegions.at(id)->RemoveOnDestruction()) {
|
||||
fShmRegions->at(id).fDestroyed = true;
|
||||
(fEventCounter->fCount)++;
|
||||
@@ -512,44 +509,73 @@ class Manager
|
||||
std::vector<fair::mq::RegionInfo> GetRegionInfo()
|
||||
{
|
||||
std::vector<fair::mq::RegionInfo> result;
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||
std::map<uint64_t, RegionConfig> regionCfgs;
|
||||
|
||||
for (const auto& e : *fShmSegments) {
|
||||
// make sure any segments in the session are found
|
||||
GetSegment(e.first);
|
||||
try {
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||
|
||||
for (const auto& [segmentId, segmentInfo] : *fShmSegments) {
|
||||
// make sure any segments in the session are found
|
||||
GetSegment(segmentId);
|
||||
try {
|
||||
fair::mq::RegionInfo info;
|
||||
info.managed = true;
|
||||
info.id = segmentId;
|
||||
info.event = RegionEvent::created;
|
||||
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(segmentId));
|
||||
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(segmentId));
|
||||
result.push_back(info);
|
||||
} catch (const std::out_of_range& oor) {
|
||||
LOG(error) << "could not find segment with id " << segmentId;
|
||||
LOG(error) << oor.what();
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& [regionId, regionInfo] : *fShmRegions) {
|
||||
fair::mq::RegionInfo info;
|
||||
info.managed = true;
|
||||
info.id = e.first;
|
||||
info.event = RegionEvent::created;
|
||||
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(e.first));
|
||||
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(e.first));
|
||||
info.managed = false;
|
||||
info.id = regionId;
|
||||
info.flags = regionInfo.fUserFlags;
|
||||
info.event = regionInfo.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
|
||||
if (info.event == RegionEvent::created) {
|
||||
RegionConfig cfg;
|
||||
cfg.id = info.id;
|
||||
cfg.creationFlags = regionInfo.fCreationFlags;
|
||||
cfg.path = regionInfo.fPath.c_str();
|
||||
regionCfgs.emplace(info.id, cfg);
|
||||
// fill the ptr+size info after shmLock is released, to avoid constructing local region under it
|
||||
} else {
|
||||
info.ptr = nullptr;
|
||||
info.size = 0;
|
||||
}
|
||||
result.push_back(info);
|
||||
} catch (const std::out_of_range& oor) {
|
||||
LOG(error) << "could not find segment with id " << e.first;
|
||||
LOG(error) << oor.what();
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& e : *fShmRegions) {
|
||||
fair::mq::RegionInfo info;
|
||||
info.managed = false;
|
||||
info.id = e.first;
|
||||
info.flags = e.second.fUserFlags;
|
||||
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
|
||||
if (info.event == RegionEvent::created) {
|
||||
auto region = GetRegionUnsafe(info.id, shmLock);
|
||||
if (region) {
|
||||
// do another iteration outside of shm lock, to fill ptr+size of unmanaged regions
|
||||
for (auto& info : result) {
|
||||
if (!info.managed && info.event == RegionEvent::created) {
|
||||
auto cfgIt = regionCfgs.find(info.id);
|
||||
if (cfgIt != regionCfgs.end()) {
|
||||
UnmanagedRegion* region = nullptr;
|
||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||
auto it = fRegions.find(info.id);
|
||||
if (it != fRegions.end()) {
|
||||
region = it->second.get();
|
||||
} else {
|
||||
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, true, cfgIt->second));
|
||||
region = r.first->second.get();
|
||||
region->InitializeQueues();
|
||||
region->StartAckSender();
|
||||
}
|
||||
|
||||
info.ptr = region->GetData();
|
||||
info.size = region->GetSize();
|
||||
} else {
|
||||
throw std::runtime_error(tools::ToString("GetRegionInfo() could not get region with id '", info.id, "'"));
|
||||
info.ptr = nullptr;
|
||||
info.size = 0;
|
||||
}
|
||||
} else {
|
||||
info.ptr = nullptr;
|
||||
info.size = 0;
|
||||
}
|
||||
result.push_back(info);
|
||||
}
|
||||
|
||||
return result;
|
||||
|
@@ -195,7 +195,7 @@ class Message final : public fair::mq::Message
|
||||
fLocalPtr = nullptr;
|
||||
}
|
||||
} else {
|
||||
fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
|
||||
fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
|
||||
if (fRegionPtr) {
|
||||
fLocalPtr = reinterpret_cast<char*>(fRegionPtr->GetData()) + fMeta.fHandle;
|
||||
} else {
|
||||
@@ -365,7 +365,7 @@ class Message final : public fair::mq::Message
|
||||
void ReleaseUnmanagedRegionBlock()
|
||||
{
|
||||
if (!fRegionPtr) {
|
||||
fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
|
||||
fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
|
||||
}
|
||||
|
||||
if (fRegionPtr) {
|
||||
|
@@ -23,6 +23,7 @@
|
||||
#include <boost/interprocess/ipc/message_queue.hpp>
|
||||
|
||||
#include <csignal>
|
||||
#include <cstdio>
|
||||
#include <iostream>
|
||||
#include <iomanip>
|
||||
#include <chrono>
|
||||
@@ -533,6 +534,88 @@ unsigned long Monitor::GetFreeMemory(const SessionId& sessionId, uint16_t segmen
|
||||
return GetFreeMemory(shmId, segmentId);
|
||||
}
|
||||
|
||||
bool Monitor::SegmentIsPresent(const ShmId& shmId, uint16_t segmentId)
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
try {
|
||||
bipc::managed_shared_memory managementSegment(bipc::open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());
|
||||
Uint16SegmentInfoHashMap* shmSegments = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
|
||||
|
||||
if (!shmSegments) {
|
||||
LOG(error) << "Found management segment, but could not locate segment info";
|
||||
return false;
|
||||
}
|
||||
|
||||
auto it = shmSegments->find(segmentId);
|
||||
if (it != shmSegments->end()) {
|
||||
try {
|
||||
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
|
||||
RBTreeBestFitSegment segment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + std::to_string(segmentId)).c_str());
|
||||
} else {
|
||||
SimpleSeqFitSegment segment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + std::to_string(segmentId)).c_str());
|
||||
}
|
||||
} catch (bie&) {
|
||||
LOG(error) << "Could not find segment with id '" << segmentId << "' for shmId '" << shmId.shmId << "'";
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "Could not find segment info for segment id '" << segmentId << "' for shmId '" << shmId.shmId << "'";
|
||||
return false;
|
||||
}
|
||||
} catch (bie&) {
|
||||
LOG(error) << "Could not find management segment for shmid '" << shmId.shmId << "'";
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
bool Monitor::SegmentIsPresent(const SessionId& sessionId, uint16_t segmentId)
|
||||
{
|
||||
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
|
||||
return SegmentIsPresent(shmId, segmentId);
|
||||
}
|
||||
|
||||
bool Monitor::RegionIsPresent(const ShmId& shmId, uint16_t regionId)
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
try {
|
||||
bipc::managed_shared_memory managementSegment(bipc::open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());
|
||||
Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first;
|
||||
|
||||
if (!shmRegions) {
|
||||
LOG(error) << "Found management segment, but could not locate region info";
|
||||
return false;
|
||||
}
|
||||
|
||||
std::string regionFileName("fmq_" + shmId.shmId + "_rg_" + to_string(regionId));
|
||||
|
||||
auto it = shmRegions->find(regionId);
|
||||
if (it != shmRegions->end()) {
|
||||
try {
|
||||
if (it->second.fPath.empty()) {
|
||||
shared_memory_object object(open_only, regionFileName.c_str(), read_only);
|
||||
}
|
||||
} catch (bie&) {
|
||||
LOG(error) << "Could not find region with id '" << regionId << "' for shmId '" << shmId.shmId << "'";
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "Could not find region info for region id '" << regionId << "' for shmId '" << shmId.shmId << "'";
|
||||
return false;
|
||||
}
|
||||
} catch (bie&) {
|
||||
LOG(error) << "Could not find management segment for shmid '" << shmId.shmId << "'";
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
bool Monitor::RegionIsPresent(const SessionId& sessionId, uint16_t regionId)
|
||||
{
|
||||
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
|
||||
return RegionIsPresent(shmId, regionId);
|
||||
}
|
||||
|
||||
void Monitor::PrintHelp()
|
||||
{
|
||||
LOG(info) << "controls: [x] close memory, "
|
||||
|
@@ -119,7 +119,7 @@ class Monitor
|
||||
/// @param sessionId session id
|
||||
static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const SessionId& sessionId);
|
||||
/// @brief Returns the amount of free memory in the specified segment
|
||||
/// @param sessionId shmem id
|
||||
/// @param shmId shmem id
|
||||
/// @param segmentId segment id
|
||||
/// @throws MonitorError
|
||||
static unsigned long GetFreeMemory(const ShmId& shmId, uint16_t segmentId);
|
||||
@@ -128,6 +128,23 @@ class Monitor
|
||||
/// @param segmentId segment id
|
||||
/// @throws MonitorError
|
||||
static unsigned long GetFreeMemory(const SessionId& sessionId, uint16_t segmentId);
|
||||
/// @brief Checks if a given segment can be opened
|
||||
/// @param shmId shmem id
|
||||
/// @param segmentId segment id
|
||||
static bool SegmentIsPresent(const ShmId& shmId, uint16_t segmentId);
|
||||
/// @brief Checks if a given segment can be opened
|
||||
/// @param sessionId session id
|
||||
/// @param segmentId segment id
|
||||
static bool SegmentIsPresent(const SessionId& sessionId, uint16_t segmentId);
|
||||
/// @brief Checks if a given region can be opened
|
||||
/// @param shmId shmem id
|
||||
/// @param regionId region id
|
||||
static bool RegionIsPresent(const ShmId& shmId, uint16_t regionId);
|
||||
/// @brief Checks if a given region can be opened
|
||||
/// @param sessionId session id
|
||||
/// @param regionId region id
|
||||
static bool RegionIsPresent(const SessionId& sessionId, uint16_t regionId);
|
||||
|
||||
|
||||
static bool PrintShm(const ShmId& shmId);
|
||||
static void ListAll(const std::string& path);
|
||||
|
Reference in New Issue
Block a user