Compare commits

..

25 Commits

Author SHA1 Message Date
Alexey Rybalchenko
16275db125 Add test for externally (outside the session) created shmem region 2023-01-19 16:12:58 +01:00
Alexey Rybalchenko
42ce691f57 shm: error on duplicate region IDs 2023-01-19 16:12:31 +01:00
Alexey Rybalchenko
58aa2b4f88 shm: refactor UnamangedRegion: rename fRemote to fController 2023-01-19 16:12:22 +01:00
Alexey Rybalchenko
c3b273cec0 shm: Improve debug output a bit 2023-01-19 16:10:59 +01:00
Alexey Rybalchenko
a982d60ed7 example: fix incorrect config 2023-01-19 16:10:44 +01:00
Dennis Klein
d16e473b91 docs: Update fair-software.eu compliance badge
And link to the GH workflow page instead of fair-software.eu
2023-01-16 13:27:13 +01:00
Dennis Klein
1881986cca docs: Add fair-software.eu compliance badge 2023-01-16 13:17:09 +01:00
Dennis Klein
adf91d053d docs: Add OpenSSF Best Practices Badge 2023-01-16 13:16:56 +01:00
Dennis Klein
d3be9af9b6 docs: Add our DOI badge 2023-01-16 13:16:42 +01:00
Dennis Klein
4104636456 build: Add fair-software.eu compliance checker 2023-01-16 13:15:33 +01:00
Alexey Rybalchenko
af0d668951 Shm: fix region init with external regions 2022-09-09 15:40:33 +02:00
Alexey Rybalchenko
072d7cb744 shm: add some debug output 2022-09-09 15:40:33 +02:00
Alexey Rybalchenko
f5c46ce018 region example: add options for testing with externally-created regions 2022-09-09 15:40:33 +02:00
Alexey Rybalchenko
d105960444 fix(shm): Fix incorrect parameters when mapping regions 2022-09-06 08:09:47 +02:00
Dennis Klein
3aae5bae58 build: Add ORCID for Christian Tacke 2022-09-06 08:08:42 +02:00
Dennis Klein
9031029d2c build: Add ORCID for Dennis Klein 2022-09-06 08:08:34 +02:00
Dennis Klein
d478e050ba build: HTML-Format desc field in zenodo.org config 2022-09-06 08:08:14 +02:00
Dennis Klein
06b2b9b01f build: Add license hint to zenodo.org config 2022-09-06 08:08:05 +02:00
Dennis Klein
b3fa4f6e7e build: Add config for zenodo.org import 2022-09-06 08:07:46 +02:00
Alexey Rybalchenko
da5cb34416 fix(shm): race/deadlock in region locks 2022-08-21 18:32:24 +02:00
Alexey Rybalchenko
226733c653 Reduce severity of the missing channel cfg on command line
It is a valid use case to create the config programmatically at a later stage.
2022-06-22 14:04:43 +02:00
Alexey Rybalchenko
b06efc401e shm: Monitor: Add region/segment presence check function 2022-06-22 13:31:51 +02:00
Alexey Rybalchenko
2500771689 shm: ResetContent(): reset data after recreating the metadata 2022-05-28 14:46:21 +02:00
Alexey Rybalchenko
d2aa3b6bb0 shm: open managament data as read only during cleanup 2022-05-28 14:46:21 +02:00
Alexey Rybalchenko
00df117c7c Shm::Monitor: add nullptr check for segment info 2022-05-28 14:46:21 +02:00
17 changed files with 669 additions and 204 deletions

15
.github/workflows/fair-software.yml vendored Normal file
View File

@@ -0,0 +1,15 @@
name: fair-software
on: push
jobs:
verify:
name: "fair-software"
runs-on: ubuntu-latest
steps:
- uses: fair-software/howfairis-github-action@0.2.1
name: Measure compliance with fair-software.eu recommendations
env:
PYCHARM_HOSTED: "Trick colorama into displaying colored output"
with:
MY_REPO_URL: "https://github.com/${{ github.repository }}"

86
.zenodo.json Normal file
View File

@@ -0,0 +1,86 @@
{
"creators": [
{
"name": "Al-Turany, Mohammad"
},
{
"orcid": "0000-0003-3787-1910",
"name": "Klein, Dennis"
},
{
"name": "Kollegger, Thorsten"
},
{
"name": "Rybalchenko, Alexey"
},
{
"name": "Winckler, Nicolas"
}
],
"contributors": [
{
"type": "Other",
"name": "Aphecetche, Laurent"
},
{
"type": "Other",
"name": "Binet, Sebastien"
},
{
"type": "Other",
"name": "Eulisse, Giulio"
},
{
"type": "Other",
"name": "Karabowicz, Radoslaw"
},
{
"type": "Other",
"name": "Kretz, Matthias"
},
{
"type": "Other",
"name": "Krzewicki, Mikolaj"
},
{
"type": "Other",
"name": "Lebedev, Andrey"
},
{
"type": "Other",
"name": "Mrnjavac, Teo"
},
{
"type": "Other",
"name": "Neskovic, Gvozden"
},
{
"type": "Other",
"name": "Richter, Matthias"
},
{
"type": "Other",
"orcid": "0000-0002-5321-8404",
"name": "Tacke, Christian"
},
{
"type": "Other",
"name": "Uhlig, Florian"
},
{
"type": "Other",
"name": "Wenzel, Sandro"
}
],
"description": "<p>C++ Message Queuing Library and Framework</p>",
"related_identifiers": [
{
"identifier": "https://github.com/FairRootGroup/FairMQ/",
"relation": "isSupplementTo",
"resource_type": "software",
"scheme": "url"
}
],
"title": "FairMQ",
"license": "LGPL-3.0-only"
}

View File

@@ -1,5 +1,5 @@
Al-Turany, Mohammad Al-Turany, Mohammad
Klein, Dennis Klein, Dennis [https://orcid.org/0000-0003-3787-1910]
Kollegger, Thorsten Kollegger, Thorsten
Rybalchenko, Alexey Rybalchenko, Alexey
Winckler, Nicolas Winckler, Nicolas

View File

@@ -3,11 +3,11 @@ Binet, Sebastien
Eulisse, Giulio Eulisse, Giulio
Karabowicz, Radoslaw Karabowicz, Radoslaw
Kretz, Matthias <kretz@kde.org> Kretz, Matthias <kretz@kde.org>
Krzewicki, Mikolaj Krzewicki, Mikolaj
Lebedev, Andrey Lebedev, Andrey
Mrnjavac, Teo <teo.m@cern.ch> Mrnjavac, Teo <teo.m@cern.ch>
Neskovic, Gvozden Neskovic, Gvozden
Richter, Matthias Richter, Matthias
Tacke, Christian Tacke, Christian [https://orcid.org/0000-0002-5321-8404]
Uhlig, Florian Uhlig, Florian
Wenzel, Sandro Wenzel, Sandro

View File

@@ -1,5 +1,10 @@
<!-- {#mainpage} --> <!-- {#mainpage} -->
# FairMQ [![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT) # FairMQ
[![license](https://alfa-ci.gsi.de/shields/badge/license-LGPL--3.0-orange.svg)](COPYRIGHT)
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.1689985.svg)](https://doi.org/10.5281/zenodo.1689985)
[![OpenSSF Best Practices](https://bestpractices.coreinfrastructure.org/projects/6915/badge)](https://bestpractices.coreinfrastructure.org/projects/6915)
[![fair-software.eu](https://img.shields.io/badge/fair--software.eu-%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8B%20%20%E2%97%8F%20%20%E2%97%8F-yellow)](https://github.com/FairRootGroup/FairMQ/actions/workflows/fair-software.yml)
C++ Message Queuing Library and Framework C++ Message Queuing Library and Framework

View File

@@ -18,7 +18,8 @@
{ {
"@type": "Person", "@type": "Person",
"givenName": "Dennis", "givenName": "Dennis",
"familyName": "Klein" "familyName": "Klein",
"@id": "https://orcid.org/0000-0003-3787-1910"
}, },
{ {
"@type": "Person", "@type": "Person",
@@ -92,7 +93,8 @@
{ {
"@type": "Person", "@type": "Person",
"givenName": "Christian", "givenName": "Christian",
"familyName": "Tacke" "familyName": "Tacke",
"@id": "https://orcid.org/0000-0002-5321-8404"
}, },
{ {
"@type": "Person", "@type": "Person",

View File

@@ -19,6 +19,10 @@ SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10" # SAMPLER+=" --rate 10"
SAMPLER+=" --transport $transport" SAMPLER+=" --transport $transport"
# SAMPLER+=" --external-region true"
# SAMPLER+=" --shm-no-cleaup true"
# SAMPLER+=" --shm-monitor false"
# SAMPLER+=" --shmid 1"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992" SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
@@ -26,5 +30,8 @@ SINK="fairmq-ex-region-sink"
SINK+=" --id sink1" SINK+=" --id sink1"
SINK+=" --severity debug" SINK+=" --severity debug"
SINK+=" --transport $transport" SINK+=" --transport $transport"
# SINK+=" --shm-no-cleaup true"
# SINK+=" --shm-monitor false"
# SINK+=" --shmid 1"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992" SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK & xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@@ -6,10 +6,9 @@
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
#include <fairmq/shmem/Common.h> #include <fairmq/shmem/Common.h>
#include <fairmq/shmem/UnmanagedRegion.h>
#include <fairmq/shmem/Segment.h>
#include <fairmq/shmem/Monitor.h> #include <fairmq/shmem/Monitor.h>
#include <fairmq/shmem/Segment.h>
#include <fairmq/shmem/UnmanagedRegion.h>
#include <fairmq/tools/Unique.h> #include <fairmq/tools/Unique.h>
#include <fairlogger/Logger.h> #include <fairlogger/Logger.h>
@@ -17,9 +16,8 @@
#include <boost/algorithm/string.hpp> #include <boost/algorithm/string.hpp>
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
#include <csignal>
#include <chrono> #include <chrono>
#include <csignal>
#include <map> #include <map>
#include <string> #include <string>
#include <thread> #include <thread>
@@ -27,65 +25,117 @@
using namespace std; using namespace std;
using namespace boost::program_options; using namespace boost::program_options;
namespace namespace {
{ volatile sig_atomic_t gStopping = 0;
volatile sig_atomic_t gStopping = 0; volatile sig_atomic_t gResetContent = 0;
} } // namespace
void signalHandler(int /* signal */) void signalHandler(int /* signal */) { gStopping = 1; }
{
gStopping = 1; void resetContentHandler(int /* signal */) { gResetContent = 1; }
}
struct ShmManager struct ShmManager
{ {
ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions) ShmManager(uint64_t _shmId, const vector<string>& _segments, const vector<string>& _regions, bool zero = true)
: shmId(fair::mq::shmem::makeShmIdStr(_shmId)) : shmId(fair::mq::shmem::makeShmIdStr(_shmId))
{
LOG(info) << "Starting ShmManager for shmId: " << shmId;
LOG(info) << "Performing full reset...";
FullReset();
LOG(info) << "Done.";
LOG(info) << "Adding managed segments...";
AddSegments(_segments, zero);
LOG(info) << "Done.";
LOG(info) << "Adding unmanaged regions...";
AddRegions(_regions, zero);
LOG(info) << "Done.";
LOG(info) << "Shared memory is ready for use.";
}
void AddSegments(const vector<string>& _segments, bool zero)
{ {
for (const auto& s : _segments) { for (const auto& s : _segments) {
vector<string> segmentConf; vector<string> conf;
boost::algorithm::split(segmentConf, s, boost::algorithm::is_any_of(",")); boost::algorithm::split(conf, s, boost::algorithm::is_any_of(","));
if (segmentConf.size() != 2) { if (conf.size() != 3) {
LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size>."; LOG(error) << "incorrect format for --segments. Expecting pairs of <id>,<size><numaid>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId}); fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>."); throw runtime_error("incorrect format for --segments. Expecting pairs of <id>,<size>,<numaid>.");
} }
uint16_t id = stoi(segmentConf.at(0)); uint16_t id = stoi(conf.at(0));
uint64_t size = stoull(segmentConf.at(1)); uint64_t size = stoull(conf.at(1));
segmentCfgs.emplace_back(fair::mq::shmem::SegmentConfig{id, size, "rbtree_best_fit"});
auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit)); auto ret = segments.emplace(id, fair::mq::shmem::Segment(shmId, id, size, fair::mq::shmem::rbTreeBestFit));
fair::mq::shmem::Segment& segment = ret.first->second; fair::mq::shmem::Segment& segment = ret.first->second;
LOG(info) << "Created segment " << id << " of size " << segment.GetSize() << ", starting at " << segment.GetData() << ". Locking..."; LOG(info) << "Created segment " << id << " of size " << segment.GetSize()
<< ", starting at " << segment.GetData() << ". Locking...";
segment.Lock(); segment.Lock();
LOG(info) << "Done."; LOG(info) << "Done.";
LOG(info) << "Zeroing..."; if (zero) {
segment.Zero(); LOG(info) << "Zeroing...";
LOG(info) << "Done."; segment.Zero();
} LOG(info) << "Done.";
for (const auto& r : _regions) {
vector<string> regionConf;
boost::algorithm::split(regionConf, r, boost::algorithm::is_any_of(","));
if (regionConf.size() != 2) {
LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>.");
} }
uint16_t id = stoi(regionConf.at(0)); }
uint64_t size = stoull(regionConf.at(1)); }
void AddRegions(const vector<string>& _regions, bool zero)
{
for (const auto& r : _regions) {
vector<string> conf;
boost::algorithm::split(conf, r, boost::algorithm::is_any_of(","));
if (conf.size() != 3) {
LOG(error) << "incorrect format for --regions. Expecting pairs of <id>,<size>,<numaid>.";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
throw runtime_error("incorrect format for --regions. Expecting pairs of <id>,<size>,<numaid>.");
}
uint16_t id = stoi(conf.at(0));
uint64_t size = stoull(conf.at(1));
fair::mq::RegionConfig cfg;
cfg.id = id;
cfg.size = size;
regionCfgs.push_back(cfg);
auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size)); auto ret = regions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(shmId, id, size));
fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second); fair::mq::shmem::UnmanagedRegion& region = *(ret.first->second);
LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize() << ", starting at " << region.GetData() << ". Locking..."; LOG(info) << "Created unamanged region " << id << " of size " << region.GetSize()
<< ", starting at " << region.GetData() << ". Locking...";
region.Lock(); region.Lock();
LOG(info) << "Done."; LOG(info) << "Done.";
LOG(info) << "Zeroing..."; if (zero) {
region.Zero(); LOG(info) << "Zeroing...";
LOG(info) << "Done."; region.Zero();
LOG(info) << "Done.";
}
} }
} }
bool CheckPresence()
{
for (const auto& sc : segmentCfgs) {
if (!(fair::mq::shmem::Monitor::SegmentIsPresent(fair::mq::shmem::ShmId{shmId}, sc.id))) {
return false;
}
}
for (const auto& rc : regionCfgs) {
if (!(fair::mq::shmem::Monitor::RegionIsPresent(fair::mq::shmem::ShmId{shmId}, rc.id.value()))) {
return false;
}
}
return true;
}
void ResetContent() void ResetContent()
{ {
fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId}); fair::mq::shmem::Monitor::ResetContent(fair::mq::shmem::ShmId{shmId}, segmentCfgs, regionCfgs);
}
void FullReset()
{
segments.clear();
regions.clear();
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{shmId});
} }
~ShmManager() ~ShmManager()
@@ -97,6 +147,8 @@ struct ShmManager
std::string shmId; std::string shmId;
map<uint16_t, fair::mq::shmem::Segment> segments; map<uint16_t, fair::mq::shmem::Segment> segments;
map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> regions; map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> regions;
std::vector<fair::mq::shmem::SegmentConfig> segmentCfgs;
std::vector<fair::mq::RegionConfig> regionCfgs;
}; };
int main(int argc, char** argv) int main(int argc, char** argv)
@@ -105,8 +157,11 @@ int main(int argc, char** argv)
signal(SIGINT, signalHandler); signal(SIGINT, signalHandler);
signal(SIGTERM, signalHandler); signal(SIGTERM, signalHandler);
signal(SIGUSR1, resetContentHandler);
try { try {
bool nozero = false;
bool checkPresence = true;
uint64_t shmId = 0; uint64_t shmId = 0;
vector<string> segments; vector<string> segments;
vector<string> regions; vector<string> regions;
@@ -114,8 +169,10 @@ int main(int argc, char** argv)
options_description desc("Options"); options_description desc("Options");
desc.add_options() desc.add_options()
("shmid", value<uint64_t>(&shmId)->required(), "Shm id") ("shmid", value<uint64_t>(&shmId)->required(), "Shm id")
("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size> <id>,<size> <id>,<size> ...") ("segments", value<vector<string>>(&segments)->multitoken()->composing(), "Segments, as <id>,<size>,<numaid> <id>,<size>,<numaid> <id>,<size>,<numaid> ... (numaid: -2 disabled, -1 interleave, >=0 node)")
("regions", value<vector<string>>(&regions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size> <id>,<size> ...") ("regions", value<vector<string>>(&regions)->multitoken()->composing(), "Regions, as <id>,<size> <id>,<size>,<numaid> <id>,<size>,<numaid> ...")
("nozero", value<bool>(&nozero)->default_value(false)->implicit_value(true), "Do not zero segments after initialization")
("check-presence", value<bool>(&checkPresence)->default_value(true)->implicit_value(true), "Check periodically if configured segments/regions are still present, and cleanup and leave if they are not")
("help,h", "Print help"); ("help,h", "Print help");
variables_map vm; variables_map vm;
@@ -128,15 +185,35 @@ int main(int argc, char** argv)
notify(vm); notify(vm);
ShmManager shmManager(shmId, segments, regions); ShmManager shmManager(shmId, segments, regions, !nozero);
while (!gStopping) { std::thread resetContentThread([&shmManager]() {
std::this_thread::sleep_for(std::chrono::milliseconds(50)); while (!gStopping) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if (gResetContent == 1) {
LOG(info) << "Resetting content for shmId " << shmManager.shmId;
shmManager.ResetContent();
gResetContent = 0;
LOG(info) << "Done resetting content for shmId " << shmManager.shmId;
}
}
});
if (checkPresence) {
while (!gStopping) {
if (shmManager.CheckPresence() == false) {
LOG(error) << "Failed to find segments, exiting.";
gStopping = true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
} }
resetContentThread.join();
LOG(info) << "stopping."; LOG(info) << "stopping.";
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit"; LOG(error) << "Exception reached the top of main: " << e.what() << ", exiting";
return 2; return 2;
} }

View File

@@ -19,6 +19,7 @@ struct Sampler : fair::mq::Device
{ {
void InitTask() override void InitTask() override
{ {
fExternalRegion = fConfig->GetProperty<bool>("external-region");
fMsgSize = fConfig->GetProperty<int>("msg-size"); fMsgSize = fConfig->GetProperty<int>("msg-size");
fLinger = fConfig->GetProperty<uint32_t>("region-linger"); fLinger = fConfig->GetProperty<uint32_t>("region-linger");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
@@ -34,18 +35,26 @@ struct Sampler : fair::mq::Device
fair::mq::RegionConfig regionCfg; fair::mq::RegionConfig regionCfg;
regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events
regionCfg.lock = true; // mlock region after creation // options for testing with an externally-created -region
regionCfg.zero = true; // zero region content after creation if (fExternalRegion) {
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel... regionCfg.id = 1;
0, // ... and this sub-channel regionCfg.removeOnDestruction = false;
10000000, // region size }
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport regionCfg.lock = !fExternalRegion; // mlock region after creation
std::lock_guard<std::mutex> lock(fMtx); regionCfg.zero = !fExternalRegion; // zero region content after creation
fNumUnackedMsgs -= blocks.size(); fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor(
if (fMaxIterations > 0) { "data", // region is created using the transport of this channel...
LOG(info) << "Received " << blocks.size() << " acks"; 0, // ... and this sub-channel
} 10000000, // region size
}, regionCfg)); [this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
std::lock_guard<std::mutex> lock(fMtx);
fNumUnackedMsgs -= blocks.size();
if (fMaxIterations > 0) {
LOG(info) << "Received " << blocks.size() << " acks";
}
},
regionCfg
));
} }
bool ConditionalRun() override bool ConditionalRun() override
@@ -76,6 +85,8 @@ struct Sampler : fair::mq::Device
void ResetTask() override void ResetTask() override
{ {
// give some time for acks to be received
std::this_thread::sleep_for(std::chrono::milliseconds(250));
fRegion.reset(); fRegion.reset();
{ {
std::lock_guard<std::mutex> lock(fMtx); std::lock_guard<std::mutex> lock(fMtx);
@@ -89,6 +100,7 @@ struct Sampler : fair::mq::Device
} }
private: private:
int fExternalRegion = false;
int fMsgSize = 10000; int fMsgSize = 10000;
uint32_t fLinger = 100; uint32_t fLinger = 100;
uint64_t fMaxIterations = 0; uint64_t fMaxIterations = 0;
@@ -103,7 +115,8 @@ void addCustomOptions(bpo::options_description& options)
options.add_options() options.add_options()
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes") ("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions") ("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); ("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)")
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process");
} }
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/) std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)

View File

@@ -40,7 +40,7 @@ Config::Config(const string& name, Plugin::Version version, const string& mainta
LOG(debug) << "channel-config: Parsing channel configuration"; LOG(debug) << "channel-config: Parsing channel configuration";
SetProperties(SuboptParser(GetProperty<vector<string>>("channel-config"), idForParser)); SetProperties(SuboptParser(GetProperty<vector<string>>("channel-config"), idForParser));
} else { } else {
LOG(warn) << "fair::mq::plugins::Config: no channels configuration provided via --mq-config or --channel-config"; LOG(info) << "fair::mq::plugins::Config: no channels configuration provided via --mq-config or --channel-config";
} }
} catch (exception& e) { } catch (exception& e) {
LOG(error) << e.what(); LOG(error) << e.what();

View File

@@ -207,22 +207,22 @@ class Manager
fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first; fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first;
if (fEventCounter) { if (fEventCounter) {
LOG(debug) << "event counter found: " << fEventCounter->fCount; LOG(trace) << "event counter found: " << fEventCounter->fCount;
} else { } else {
LOG(debug) << "no event counter found, creating one and initializing with 0"; LOG(trace) << "no event counter found, creating one and initializing with 0";
fEventCounter = fManagementSegment.construct<EventCounter>(unique_instance)(0); fEventCounter = fManagementSegment.construct<EventCounter>(unique_instance)(0);
LOG(debug) << "initialized event counter with: " << fEventCounter->fCount; LOG(trace) << "initialized event counter with: " << fEventCounter->fCount;
} }
fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first; fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;
if (fDeviceCounter) { if (fDeviceCounter) {
LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing."; LOG(trace) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
(fDeviceCounter->fCount)++; (fDeviceCounter->fCount)++;
LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount; LOG(trace) << "incremented device counter, now: " << fDeviceCounter->fCount;
} else { } else {
LOG(debug) << "no device counter found, creating one and initializing with 1"; LOG(trace) << "no device counter found, creating one and initializing with 1";
fDeviceCounter = fManagementSegment.construct<DeviceCounter>(unique_instance)(1); fDeviceCounter = fManagementSegment.construct<DeviceCounter>(unique_instance)(1);
LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount; LOG(trace) << "initialized device counter with: " << fDeviceCounter->fCount;
} }
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc); fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
@@ -265,10 +265,10 @@ class Manager
} }
} }
} }
LOG(debug) << "Created/opened shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "'." LOG(debug) << (createdSegment ? "Created" : "Opened") << " managed shared memory segment " << "fmq_" << fShmId << "_m_" << fSegmentId
<< " Size: " << boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)) << " bytes." << ". Size: " << boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)) << " bytes."
<< " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes." << " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes."
<< " Allocation algorithm: " << allocationAlgorithm; << " Allocation algorithm: " << allocationAlgorithm;
} catch (interprocess_exception& bie) { } catch (interprocess_exception& bie) {
LOG(error) << "Failed to create/open shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "': " << bie.what(); LOG(error) << "Failed to create/open shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "': " << bie.what();
throw TransportError(tools::ToString("Failed to create/open shared memory segment '", "fmq_", fShmId, "_m_", fSegmentId, "': ", bie.what())); throw TransportError(tools::ToString("Failed to create/open shared memory segment '", "fmq_", fShmId, "_m_", fSegmentId, "': ", bie.what()));
@@ -395,20 +395,26 @@ class Manager
const uint16_t id = cfg.id.value(); const uint16_t id = cfg.id.value();
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
UnmanagedRegion* region = nullptr; UnmanagedRegion* region = nullptr;
bool newRegionCreated = false;
{ auto it = fRegions.find(id);
std::lock_guard<std::mutex> lock(fLocalRegionsMtx); if (it != fRegions.end()) {
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg)); region = it->second.get();
newRegionCreated = res.second; if (region->fControlling) {
LOG(error) << "Unmanaged Region with id " << id << " already exists. Only unique IDs per session are allowed.";
throw TransportError(tools::ToString("Unmanaged Region with id ", id, " already exists. Only unique IDs per session are allowed."));
}
LOG(debug) << "Unmanaged region (view) already present, promoting to controller";
region->BecomeController(cfg);
} else {
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, true, cfg));
region = res.first->second.get(); region = res.first->second.get();
} }
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; // LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
if (!newRegionCreated) {
region->fRemote = false; // TODO: this should be more clear, refactor it.
}
// start ack receiver only if a callback has been provided. // start ack receiver only if a callback has been provided.
if (callback || bulkCallback) { if (callback || bulkCallback) {
region->SetCallbacks(callback, bulkCallback); region->SetCallbacks(callback, bulkCallback);
@@ -429,7 +435,7 @@ class Manager
} }
} }
UnmanagedRegion* GetRegion(uint16_t id) UnmanagedRegion* GetRegionFromCache(uint16_t id)
{ {
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path // NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
const auto &lTlCache = fTlRegionCache; const auto &lTlCache = fTlRegionCache;
@@ -443,41 +449,39 @@ class Manager
} }
} }
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
// slow path: check invalidation // slow path: check invalidation
if (lTlCacheGen != fRegionsGen) { if (lTlCacheGen != fRegionsGen) {
fTlRegionCache.fRegionsTLCache.clear(); fTlRegionCache.fRegionsTLCache.clear();
} }
std::lock_guard<std::mutex> lock(fLocalRegionsMtx); auto* lRegion = GetRegion(id);
auto* lRegion = GetRegionUnsafe(id, shmLock);
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64)); fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen; fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
return lRegion; return lRegion;
} }
UnmanagedRegion* GetRegionUnsafe(uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock) UnmanagedRegion* GetRegion(uint16_t id)
{ {
// remote region could actually be a local one if a message originates from this device (has been sent out and returned) std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
auto it = fRegions.find(id); auto it = fRegions.find(id);
if (it != fRegions.end()) { if (it != fRegions.end()) {
return it->second.get(); return it->second.get();
} else { } else {
try { try {
// get region info
RegionInfo regionInfo = fShmRegions->at(id);
// safe to unlock now - no shm container accessed after this
lockedShmLock.unlock();
RegionConfig cfg; RegionConfig cfg;
cfg.id = id; // get region info
cfg.creationFlags = regionInfo.fCreationFlags; {
cfg.path = regionInfo.fPath.c_str(); boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
RegionInfo regionInfo = fShmRegions->at(id);
cfg.id = id;
cfg.creationFlags = regionInfo.fCreationFlags;
cfg.path = regionInfo.fPath.c_str();
}
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; // LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg))); auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, false, std::move(cfg)));
r.first->second->InitializeQueues(); r.first->second->InitializeQueues();
r.first->second->StartAckSender(); r.first->second->StartAckSender();
lockedShmLock.lock();
return r.first->second.get(); return r.first->second.get();
} catch (std::out_of_range& oor) { } catch (std::out_of_range& oor) {
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?"; LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
@@ -493,10 +497,10 @@ class Manager
void RemoveRegion(uint16_t id) void RemoveRegion(uint16_t id)
{ {
try { try {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
std::lock_guard<std::mutex> lock(fLocalRegionsMtx); std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
fRegions.at(id)->StopAcks(); fRegions.at(id)->StopAcks();
{ {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
if (fRegions.at(id)->RemoveOnDestruction()) { if (fRegions.at(id)->RemoveOnDestruction()) {
fShmRegions->at(id).fDestroyed = true; fShmRegions->at(id).fDestroyed = true;
(fEventCounter->fCount)++; (fEventCounter->fCount)++;
@@ -512,44 +516,73 @@ class Manager
std::vector<fair::mq::RegionInfo> GetRegionInfo() std::vector<fair::mq::RegionInfo> GetRegionInfo()
{ {
std::vector<fair::mq::RegionInfo> result; std::vector<fair::mq::RegionInfo> result;
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx); std::map<uint64_t, RegionConfig> regionCfgs;
for (const auto& e : *fShmSegments) { {
// make sure any segments in the session are found boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
GetSegment(e.first);
try { for (const auto& [segmentId, segmentInfo] : *fShmSegments) {
// make sure any segments in the session are found
GetSegment(segmentId);
try {
fair::mq::RegionInfo info;
info.managed = true;
info.id = segmentId;
info.event = RegionEvent::created;
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(segmentId));
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(segmentId));
result.push_back(info);
} catch (const std::out_of_range& oor) {
LOG(error) << "could not find segment with id " << segmentId;
LOG(error) << oor.what();
}
}
for (const auto& [regionId, regionInfo] : *fShmRegions) {
fair::mq::RegionInfo info; fair::mq::RegionInfo info;
info.managed = true; info.managed = false;
info.id = e.first; info.id = regionId;
info.event = RegionEvent::created; info.flags = regionInfo.fUserFlags;
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(e.first)); info.event = regionInfo.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(e.first)); if (info.event == RegionEvent::created) {
RegionConfig cfg;
cfg.id = info.id;
cfg.creationFlags = regionInfo.fCreationFlags;
cfg.path = regionInfo.fPath.c_str();
regionCfgs.emplace(info.id, cfg);
// fill the ptr+size info after shmLock is released, to avoid constructing local region under it
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info); result.push_back(info);
} catch (const std::out_of_range& oor) {
LOG(error) << "could not find segment with id " << e.first;
LOG(error) << oor.what();
} }
} }
for (const auto& e : *fShmRegions) { // do another iteration outside of shm lock, to fill ptr+size of unmanaged regions
fair::mq::RegionInfo info; for (auto& info : result) {
info.managed = false; if (!info.managed && info.event == RegionEvent::created) {
info.id = e.first; auto cfgIt = regionCfgs.find(info.id);
info.flags = e.second.fUserFlags; if (cfgIt != regionCfgs.end()) {
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created; UnmanagedRegion* region = nullptr;
if (info.event == RegionEvent::created) { std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
auto region = GetRegionUnsafe(info.id, shmLock); auto it = fRegions.find(info.id);
if (region) { if (it != fRegions.end()) {
region = it->second.get();
} else {
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, false, cfgIt->second));
region = r.first->second.get();
region->InitializeQueues();
region->StartAckSender();
}
info.ptr = region->GetData(); info.ptr = region->GetData();
info.size = region->GetSize(); info.size = region->GetSize();
} else { } else {
throw std::runtime_error(tools::ToString("GetRegionInfo() could not get region with id '", info.id, "'")); info.ptr = nullptr;
info.size = 0;
} }
} else {
info.ptr = nullptr;
info.size = 0;
} }
result.push_back(info);
} }
return result; return result;

View File

@@ -195,7 +195,7 @@ class Message final : public fair::mq::Message
fLocalPtr = nullptr; fLocalPtr = nullptr;
} }
} else { } else {
fRegionPtr = fManager.GetRegion(fMeta.fRegionId); fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
if (fRegionPtr) { if (fRegionPtr) {
fLocalPtr = reinterpret_cast<char*>(fRegionPtr->GetData()) + fMeta.fHandle; fLocalPtr = reinterpret_cast<char*>(fRegionPtr->GetData()) + fMeta.fHandle;
} else { } else {
@@ -365,7 +365,7 @@ class Message final : public fair::mq::Message
void ReleaseUnmanagedRegionBlock() void ReleaseUnmanagedRegionBlock()
{ {
if (!fRegionPtr) { if (!fRegionPtr) {
fRegionPtr = fManager.GetRegion(fMeta.fRegionId); fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
} }
if (fRegionPtr) { if (fRegionPtr) {

View File

@@ -23,6 +23,7 @@
#include <boost/interprocess/ipc/message_queue.hpp> #include <boost/interprocess/ipc/message_queue.hpp>
#include <csignal> #include <csignal>
#include <cstdio>
#include <iostream> #include <iostream>
#include <iomanip> #include <iomanip>
#include <chrono> #include <chrono>
@@ -533,6 +534,88 @@ unsigned long Monitor::GetFreeMemory(const SessionId& sessionId, uint16_t segmen
return GetFreeMemory(shmId, segmentId); return GetFreeMemory(shmId, segmentId);
} }
bool Monitor::SegmentIsPresent(const ShmId& shmId, uint16_t segmentId)
{
using namespace boost::interprocess;
try {
bipc::managed_shared_memory managementSegment(bipc::open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());
Uint16SegmentInfoHashMap* shmSegments = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
if (!shmSegments) {
LOG(error) << "Found management segment, but could not locate segment info";
return false;
}
auto it = shmSegments->find(segmentId);
if (it != shmSegments->end()) {
try {
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
RBTreeBestFitSegment segment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + std::to_string(segmentId)).c_str());
} else {
SimpleSeqFitSegment segment(open_read_only, std::string("fmq_" + shmId.shmId + "_m_" + std::to_string(segmentId)).c_str());
}
} catch (bie&) {
LOG(error) << "Could not find segment with id '" << segmentId << "' for shmId '" << shmId.shmId << "'";
return false;
}
} else {
LOG(error) << "Could not find segment info for segment id '" << segmentId << "' for shmId '" << shmId.shmId << "'";
return false;
}
} catch (bie&) {
LOG(error) << "Could not find management segment for shmid '" << shmId.shmId << "'";
return false;
}
return true;
}
bool Monitor::SegmentIsPresent(const SessionId& sessionId, uint16_t segmentId)
{
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
return SegmentIsPresent(shmId, segmentId);
}
bool Monitor::RegionIsPresent(const ShmId& shmId, uint16_t regionId)
{
using namespace boost::interprocess;
try {
bipc::managed_shared_memory managementSegment(bipc::open_read_only, std::string("fmq_" + shmId.shmId + "_mng").c_str());
Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first;
if (!shmRegions) {
LOG(error) << "Found management segment, but could not locate region info";
return false;
}
std::string regionFileName("fmq_" + shmId.shmId + "_rg_" + to_string(regionId));
auto it = shmRegions->find(regionId);
if (it != shmRegions->end()) {
try {
if (it->second.fPath.empty()) {
shared_memory_object object(open_only, regionFileName.c_str(), read_only);
}
} catch (bie&) {
LOG(error) << "Could not find region with id '" << regionId << "' for shmId '" << shmId.shmId << "'";
return false;
}
} else {
LOG(error) << "Could not find region info for region id '" << regionId << "' for shmId '" << shmId.shmId << "'";
return false;
}
} catch (bie&) {
LOG(error) << "Could not find management segment for shmid '" << shmId.shmId << "'";
return false;
}
return true;
}
bool Monitor::RegionIsPresent(const SessionId& sessionId, uint16_t regionId)
{
ShmId shmId{makeShmIdStr(sessionId.sessionId)};
return RegionIsPresent(shmId, regionId);
}
void Monitor::PrintHelp() void Monitor::PrintHelp()
{ {
LOG(info) << "controls: [x] close memory, " LOG(info) << "controls: [x] close memory, "
@@ -574,7 +657,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmIdT,
string managementSegmentName("fmq_" + shmId + "_mng"); string managementSegmentName("fmq_" + shmId + "_mng");
try { try {
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); bipc::managed_shared_memory managementSegment(bipc::open_read_only, managementSegmentName.c_str());
Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first; Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first;
if (shmRegions) { if (shmRegions) {
@@ -587,7 +670,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmIdT,
string path = info.fPath.c_str(); string path = info.fPath.c_str();
int flags = info.fCreationFlags; int flags = info.fCreationFlags;
if (verbose) { if (verbose) {
LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << info.fDestroyed << "."; LOG(info) << "Found UnmanagedRegion with id: " << id << ", path: '" << path << "', flags: " << flags << ", fDestroyed: " << info.fDestroyed << ".";
} }
if (!path.empty()) { if (!path.empty()) {
result.emplace_back(Remove<bipc::file_mapping>(path + "fmq_" + shmId + "_rg_" + to_string(id), verbose)); result.emplace_back(Remove<bipc::file_mapping>(path + "fmq_" + shmId + "_rg_" + to_string(id), verbose));
@@ -660,27 +743,35 @@ void Monitor::ResetContent(const ShmId& shmIdT, bool verbose /* = true */)
managed_shared_memory managementSegment(open_only, managementSegmentName.c_str()); managed_shared_memory managementSegment(open_only, managementSegmentName.c_str());
Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first; Uint16SegmentInfoHashMap* segmentInfos = managementSegment.find<Uint16SegmentInfoHashMap>(unique_instance).first;
for (const auto& s : *segmentInfos) { if (segmentInfos) {
if (verbose) { cout << "Found info for " << segmentInfos->size() << " managed segments" << endl;
cout << "Resetting content of segment '" << "fmq_" << shmId << "_m_" << s.first << "'..." << endl; for (const auto& s : *segmentInfos) {
}
try {
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, rbtree_best_fit<mutex_family, offset_ptr<void>>, null_index>(size);
} else {
SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, simple_seq_fit<mutex_family, offset_ptr<void>>, null_index>(size);
}
} catch (bie& e) {
if (verbose) { if (verbose) {
cout << "Error resetting content of segment '" << std::string("fmq_" + shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl; cout << "Resetting content of segment '" << "fmq_" << shmId << "_m_" << s.first << "'..." << endl;
}
try {
if (s.second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
RBTreeBestFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, rbtree_best_fit<mutex_family, offset_ptr<void>>, null_index>(size);
} else {
SimpleSeqFitSegment segment(open_only, std::string("fmq_" + shmId + "_m_" + to_string(s.first)).c_str());
void* ptr = segment.get_segment_manager();
size_t size = segment.get_segment_manager()->get_size();
new(ptr) segment_manager<char, simple_seq_fit<mutex_family, offset_ptr<void>>, null_index>(size);
}
if (verbose) {
cout << "Done." << endl;
}
} catch (bie& e) {
if (verbose) {
cout << "Error resetting content of segment '" << std::string("fmq_" + shmId + "_m_" + to_string(s.first)) << "': " << e.what() << endl;
}
} }
} }
} else {
cout << "Found management segment, but cannot locate segment info, something went wrong..." << endl;
} }
Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first; Uint16RegionInfoHashMap* shmRegions = managementSegment.find<Uint16RegionInfoHashMap>(bipc::unique_instance).first;
@@ -717,13 +808,15 @@ void Monitor::ResetContent(const ShmId& shmIdT, const std::vector<SegmentConfig>
std::string shmId = shmIdT.shmId; std::string shmId = shmIdT.shmId;
std::string managementSegmentName("fmq_" + shmId + "_mng"); std::string managementSegmentName("fmq_" + shmId + "_mng");
// reset managed segments
ResetContent(shmIdT, verbose);
// delete management segment // delete management segment
cout << "deleting management segment" << endl;
Remove<bipc::shared_memory_object>(managementSegmentName, verbose); Remove<bipc::shared_memory_object>(managementSegmentName, verbose);
// recreate management segment // recreate management segment
cout << "recreating management segment..." << endl;
managed_shared_memory mngSegment(create_only, managementSegmentName.c_str(), kManagementSegmentSize); managed_shared_memory mngSegment(create_only, managementSegmentName.c_str(), kManagementSegmentSize);
cout << "done." << endl;
// fill management segment with segment & region infos // fill management segment with segment & region infos
cout << "filling management segment with managed segment configs..." << endl;
for (const auto& s : segmentCfgs) { for (const auto& s : segmentCfgs) {
if (s.allocationAlgorithm == "rbtree_best_fit") { if (s.allocationAlgorithm == "rbtree_best_fit") {
Segment::Register(shmId, s.id, AllocationAlgorithm::rbtree_best_fit); Segment::Register(shmId, s.id, AllocationAlgorithm::rbtree_best_fit);
@@ -734,9 +827,14 @@ void Monitor::ResetContent(const ShmId& shmIdT, const std::vector<SegmentConfig>
throw MonitorError("Unknown allocation algorithm provided: " + s.allocationAlgorithm); throw MonitorError("Unknown allocation algorithm provided: " + s.allocationAlgorithm);
} }
} }
cout << "done." << endl;
cout << "filling management segment with unmanaged region configs..." << endl;
for (const auto& r : regionCfgs) { for (const auto& r : regionCfgs) {
fair::mq::shmem::UnmanagedRegion::Register(shmId, r); fair::mq::shmem::UnmanagedRegion::Register(shmId, r);
} }
cout << "done." << endl;
// reset managed segments
ResetContent(shmIdT, verbose);
} }
void Monitor::ResetContent(const SessionId& sessionId, const std::vector<SegmentConfig>& segmentCfgs, const std::vector<RegionConfig>& regionCfgs, bool verbose /* = true */) void Monitor::ResetContent(const SessionId& sessionId, const std::vector<SegmentConfig>& segmentCfgs, const std::vector<RegionConfig>& regionCfgs, bool verbose /* = true */)

View File

@@ -119,7 +119,7 @@ class Monitor
/// @param sessionId session id /// @param sessionId session id
static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const SessionId& sessionId); static std::unordered_map<uint16_t, std::vector<BufferDebugInfo>> GetDebugInfo(const SessionId& sessionId);
/// @brief Returns the amount of free memory in the specified segment /// @brief Returns the amount of free memory in the specified segment
/// @param sessionId shmem id /// @param shmId shmem id
/// @param segmentId segment id /// @param segmentId segment id
/// @throws MonitorError /// @throws MonitorError
static unsigned long GetFreeMemory(const ShmId& shmId, uint16_t segmentId); static unsigned long GetFreeMemory(const ShmId& shmId, uint16_t segmentId);
@@ -128,6 +128,23 @@ class Monitor
/// @param segmentId segment id /// @param segmentId segment id
/// @throws MonitorError /// @throws MonitorError
static unsigned long GetFreeMemory(const SessionId& sessionId, uint16_t segmentId); static unsigned long GetFreeMemory(const SessionId& sessionId, uint16_t segmentId);
/// @brief Checks if a given segment can be opened
/// @param shmId shmem id
/// @param segmentId segment id
static bool SegmentIsPresent(const ShmId& shmId, uint16_t segmentId);
/// @brief Checks if a given segment can be opened
/// @param sessionId session id
/// @param segmentId segment id
static bool SegmentIsPresent(const SessionId& sessionId, uint16_t segmentId);
/// @brief Checks if a given region can be opened
/// @param shmId shmem id
/// @param regionId region id
static bool RegionIsPresent(const ShmId& shmId, uint16_t regionId);
/// @brief Checks if a given region can be opened
/// @param sessionId session id
/// @param regionId region id
static bool RegionIsPresent(const SessionId& sessionId, uint16_t regionId);
static bool PrintShm(const ShmId& shmId); static bool PrintShm(const ShmId& shmId);
static void ListAll(const std::string& path); static void ListAll(const std::string& path);

View File

@@ -44,19 +44,19 @@ struct UnmanagedRegion
friend class Monitor; friend class Monitor;
UnmanagedRegion(const std::string& shmId, uint16_t id, uint64_t size) UnmanagedRegion(const std::string& shmId, uint16_t id, uint64_t size)
: UnmanagedRegion(shmId, size, false, makeRegionConfig(id)) : UnmanagedRegion(shmId, size, true, makeRegionConfig(id))
{} {}
UnmanagedRegion(const std::string& shmId, uint64_t size, RegionConfig cfg) UnmanagedRegion(const std::string& shmId, uint64_t size, RegionConfig cfg)
: UnmanagedRegion(shmId, size, false, std::move(cfg)) : UnmanagedRegion(shmId, size, true, std::move(cfg))
{} {}
UnmanagedRegion(const std::string& shmId, RegionConfig cfg) UnmanagedRegion(const std::string& shmId, RegionConfig cfg)
: UnmanagedRegion(shmId, cfg.size, false, std::move(cfg)) : UnmanagedRegion(shmId, cfg.size, true, std::move(cfg))
{} {}
UnmanagedRegion(const std::string& shmId, uint64_t size, bool remote, RegionConfig cfg) UnmanagedRegion(const std::string& shmId, uint64_t size, bool controlling, RegionConfig cfg)
: fRemote(remote) : fControlling(controlling)
, fRemoveOnDestruction(cfg.removeOnDestruction) , fRemoveOnDestruction(cfg.removeOnDestruction)
, fLinger(cfg.linger) , fLinger(cfg.linger)
, fStopAcks(false) , fStopAcks(false)
@@ -73,11 +73,15 @@ struct UnmanagedRegion
// TODO: refactor this // TODO: refactor this
cfg.size = size; cfg.size = size;
const uint16_t id = cfg.id.value();
bool created = false;
LOG(debug) << "UnmanagedRegion(): " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
if (!cfg.path.empty()) { if (!cfg.path.empty()) {
fName = std::string(cfg.path + fName); fName = std::string(cfg.path + fName);
if (!fRemote) { if (fControlling) {
// create a file // create a file
std::filebuf fbuf; std::filebuf fbuf;
if (fbuf.open(fName, std::ios_base::in | std::ios_base::out | std::ios_base::trunc | std::ios_base::binary)) { if (fbuf.open(fName, std::ios_base::in | std::ios_base::out | std::ios_base::trunc | std::ios_base::binary)) {
@@ -92,23 +96,30 @@ struct UnmanagedRegion
if (!fFile) { if (!fFile) {
LOG(error) << "Failed to initialize file: " << fName; LOG(error) << "Failed to initialize file: " << fName;
LOG(error) << "errno: " << errno << ": " << strerror(errno); LOG(error) << "errno: " << errno << ": " << strerror(errno);
throw std::runtime_error(tools::ToString("Failed to initialize file for shared memory region: ", strerror(errno))); throw TransportError(tools::ToString("Failed to initialize file for shared memory region: ", strerror(errno)));
} }
fFileMapping = file_mapping(fName.c_str(), read_write); fFileMapping = file_mapping(fName.c_str(), read_write);
LOG(debug) << "shmem: initialized file: " << fName; LOG(debug) << "UnmanagedRegion(): initialized file: " << fName;
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags); fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags);
} else { } else {
try { try {
// if opening fails, create // if opening fails, create
try { try {
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write); fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
created = false;
} catch (interprocess_exception& e) { } catch (interprocess_exception& e) {
LOG(debug) << "Could not open " << (remote ? "remote" : "local") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what() << ", creating..."; if (fControlling) {
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write); LOG(debug) << "Could not open controlling shared_memory_object for region " << id << ": " << e.what() << ", creating...";
fShmemObject.truncate(size); fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
created = true;
} else {
LOG(error) << "Could not open view for shared_memory_object for region " << id << ": " << e.what();
throw TransportError(tools::ToString("Could not open view for shared_memory_object for region ", id, ": ", e.what()));
}
} }
} catch (interprocess_exception& e) { } catch (interprocess_exception& e) {
LOG(error) << "Failed " << (remote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what(); LOG(error) << "Failed initializing shared_memory_object for region id " << id << ": " << e.what();
throw; throw;
} }
@@ -119,27 +130,27 @@ struct UnmanagedRegion
throw TransportError(tools::ToString("Created/opened region size (", fRegion.get_size(), ") does not match configured size (", size, ")")); throw TransportError(tools::ToString("Created/opened region size (", fRegion.get_size(), ") does not match configured size (", size, ")"));
} }
} catch (interprocess_exception& e) { } catch (interprocess_exception& e) {
LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what(); LOG(error) << "Failed mapping shared_memory_object for region id " << id << ": " << e.what();
throw; throw;
} }
} }
if (cfg.lock) { if (cfg.lock) {
LOG(debug) << "Locking region " << cfg.id.value() << "..."; LOG(debug) << "Locking region " << id << "...";
Lock(); Lock();
LOG(debug) << "Successfully locked region " << cfg.id.value() << "."; LOG(debug) << "Successfully locked region " << id << ".";
} }
if (cfg.zero) { if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << cfg.id.value() << "..."; LOG(debug) << "Zeroing free memory of region " << id << "...";
Zero(); Zero();
LOG(debug) << "Successfully zeroed free memory of region " << cfg.id.value() << "."; LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
} }
if (!remote) { if (fControlling && created) {
Register(shmId, cfg); Register(shmId, cfg);
} }
LOG(trace) << "shmem: initialized region: " << fName << " (" << (remote ? "remote" : "local") << ")"; LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
} }
UnmanagedRegion() = delete; UnmanagedRegion() = delete;
@@ -149,6 +160,13 @@ struct UnmanagedRegion
UnmanagedRegion& operator=(const UnmanagedRegion&) = delete; UnmanagedRegion& operator=(const UnmanagedRegion&) = delete;
UnmanagedRegion& operator=(UnmanagedRegion&&) = delete; UnmanagedRegion& operator=(UnmanagedRegion&&) = delete;
void BecomeController(RegionConfig& cfg)
{
fControlling = true;
fLinger = cfg.linger;
fRemoveOnDestruction = cfg.removeOnDestruction;
}
void Zero() void Zero()
{ {
memset(fRegion.get_address(), 0x00, fRegion.get_size()); memset(fRegion.get_address(), 0x00, fRegion.get_size());
@@ -171,6 +189,7 @@ struct UnmanagedRegion
~UnmanagedRegion() ~UnmanagedRegion()
{ {
LOG(debug) << "~UnmanagedRegion(): " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
fStopAcks = true; fStopAcks = true;
if (fAcksSender.joinable()) { if (fAcksSender.joinable()) {
@@ -178,7 +197,7 @@ struct UnmanagedRegion
fAcksSender.join(); fAcksSender.join();
} }
if (!fRemote) { if (fControlling) {
if (fAcksReceiver.joinable()) { if (fAcksReceiver.joinable()) {
fAcksReceiver.join(); fAcksReceiver.join();
} }
@@ -204,14 +223,14 @@ struct UnmanagedRegion
fclose(fFile); fclose(fFile);
} }
} else { } else {
// LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary"; // LOG(debug) << "Region queue '" << fQueueName << "' is viewer, no cleanup necessary";
} }
// LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed."; // LOG(debug) << "Region '" << fName << "' (" << (fControlling ? "controller" : "viewer") << ") destructed.";
} }
private: private:
bool fRemote; bool fControlling;
bool fRemoveOnDestruction; bool fRemoveOnDestruction;
uint32_t fLinger; uint32_t fLinger;
std::atomic<bool> fStopAcks; std::atomic<bool> fStopAcks;
@@ -243,6 +262,7 @@ struct UnmanagedRegion
static void Register(const std::string& shmId, const RegionConfig& cfg) static void Register(const std::string& shmId, const RegionConfig& cfg)
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
LOG(debug) << "Registering unmanaged shared memory region with id " << cfg.id.value();
managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), kManagementSegmentSize); managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), kManagementSegmentSize);
VoidAlloc alloc(mngSegment.get_segment_manager()); VoidAlloc alloc(mngSegment.get_segment_manager());
@@ -250,10 +270,14 @@ struct UnmanagedRegion
EventCounter* eventCounter = mngSegment.find_or_construct<EventCounter>(unique_instance)(0); EventCounter* eventCounter = mngSegment.find_or_construct<EventCounter>(unique_instance)(0);
bool newShmRegionCreated = shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, alloc)).second; auto it = shmRegions->find(cfg.id.value());
if (newShmRegionCreated) { if (it != shmRegions->end()) {
(eventCounter->fCount)++; LOG(error) << "Unmanaged Region with id " << cfg.id.value() << " has already been registered. Only unique IDs per session are allowed.";
throw TransportError(tools::ToString("Unmanaged Region with id ", cfg.id.value(), " has already been registered. Only unique IDs per session are allowed."));
} }
shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, alloc)).second;
(eventCounter->fCount)++;
} }
void SetCallbacks(RegionCallback callback, RegionBulkCallback bulkCallback) void SetCallbacks(RegionCallback callback, RegionBulkCallback bulkCallback)

View File

@@ -40,9 +40,9 @@ class UnmanagedRegionImpl final : public fair::mq::UnmanagedRegion
, fRegion(nullptr) , fRegion(nullptr)
, fRegionId(0) , fRegionId(0)
{ {
auto result = fManager.CreateRegion(size, callback, bulkCallback, std::move(cfg)); auto [regionPtr, regionId] = fManager.CreateRegion(size, callback, bulkCallback, std::move(cfg));
fRegion = result.first; fRegion = regionPtr;
fRegionId = result.second; fRegionId = regionId;
} }
UnmanagedRegionImpl(const UnmanagedRegionImpl&) = delete; UnmanagedRegionImpl(const UnmanagedRegionImpl&) = delete;

View File

@@ -6,6 +6,11 @@
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
#include <fairmq/shmem/Common.h>
#include <fairmq/shmem/Monitor.h>
#include <fairmq/shmem/Segment.h>
#include <fairmq/shmem/UnmanagedRegion.h>
#include <fairmq/TransportFactory.h> #include <fairmq/TransportFactory.h>
#include <fairmq/ProgOptions.h> #include <fairmq/ProgOptions.h>
#include <fairmq/tools/Unique.h> #include <fairmq/tools/Unique.h>
@@ -16,8 +21,12 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <cstdint>
#include <map>
#include <memory> // make_unique #include <memory> // make_unique
#include <string> #include <string>
#include <utility> // pair
#include <vector> // pair
namespace namespace
{ {
@@ -25,6 +34,34 @@ namespace
using namespace std; using namespace std;
using namespace fair::mq; using namespace fair::mq;
struct ShmOwner
{
ShmOwner(const string& sessionId,
const vector<pair<uint16_t, size_t>>& segments,
const vector<pair<uint16_t, size_t>>& regions)
: fShmId(fair::mq::shmem::makeShmIdStr(sessionId))
{
LOG(info) << "ShmOwner: creating segments";
for (auto [id, size] : segments) {
fSegments.emplace(id, fair::mq::shmem::Segment(fShmId, id, size, fair::mq::shmem::rbTreeBestFit));
}
LOG(info) << "ShmOwner: creating regions";
for (auto [id, size] : regions) {
fRegions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(fShmId, id, size));
}
}
~ShmOwner()
{
LOG(info) << "ShmOwner: cleaning up";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{fShmId});
}
string fShmId;
map<uint16_t, fair::mq::shmem::Segment> fSegments;
map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> fRegions;
};
void RegionsSizeMismatch() void RegionsSizeMismatch()
{ {
size_t session = tools::UuidHash(); size_t session = tools::UuidHash();
@@ -108,31 +145,69 @@ void RegionsCache(const string& transport, const string& address)
} }
} }
void RegionEventSubscriptions(const string& transport) void RegionEventSubscriptions(const string& transport, bool external)
{ {
fair::Logger::SetConsoleSeverity(fair::Severity::debug);
unique_ptr<ShmOwner> shmOwner = nullptr;
size_t session{tools::UuidHash()}; size_t session{tools::UuidHash()};
constexpr int sSize = 100000000;
constexpr int r1Size = 1000000;
constexpr int r2Size = 5000000;
constexpr uint16_t sId = 0;
constexpr uint16_t r1id = 100;
constexpr uint16_t r2id = 101;
if (external) {
shmOwner = make_unique<ShmOwner>(
to_string(session),
vector<pair<uint16_t, size_t>>{ { sId, sSize } },
vector<pair<uint16_t, size_t>>{ { r1id, r1Size }, { r2id, r2Size } }
);
}
ProgOptions config; ProgOptions config;
config.SetProperty<string>("session", to_string(session)); config.SetProperty<string>("session", to_string(session));
config.SetProperty<size_t>("shm-segment-size", 100000000); config.SetProperty<size_t>("shm-segment-size", sSize);
if (external) {
config.SetProperty<bool>("shm-no-cleanup", true);
config.SetProperty<bool>("shm-monitor", false);
}
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
constexpr int size1 = 1000000;
constexpr int size2 = 5000000;
constexpr int64_t userFlags = 12345; constexpr int64_t userFlags = 12345;
tools::Semaphore blocker; tools::Semaphore blocker;
{ {
auto region1 = factory->CreateUnmanagedRegion(size1, [](void*, size_t, void*) {}); fair::mq::RegionConfig r1Cfg;
if (external) {
r1Cfg.id = r1id;
r1Cfg.removeOnDestruction = false;
}
auto region1 = factory->CreateUnmanagedRegion(r1Size, [](void*, size_t, void*) {}, r1Cfg);
void* ptr1 = region1->GetData(); void* ptr1 = region1->GetData();
uint64_t id1 = region1->GetId(); uint64_t id1 = region1->GetId();
ASSERT_EQ(region1->GetSize(), size1); if (external) {
ASSERT_EQ(id1, r1id);
}
ASSERT_EQ(region1->GetSize(), r1Size);
auto region2 = factory->CreateUnmanagedRegion(size2, userFlags, [](void*, size_t, void*) {}); fair::mq::RegionConfig r2Cfg;
r2Cfg.userFlags = userFlags;
if (external) {
r2Cfg.id = r2id;
r2Cfg.removeOnDestruction = false;
}
auto region2 = factory->CreateUnmanagedRegion(r2Size, [](void*, size_t, void*) {}, r2Cfg);
void* ptr2 = region2->GetData(); void* ptr2 = region2->GetData();
uint64_t id2 = region2->GetId(); uint64_t id2 = region2->GetId();
ASSERT_EQ(region2->GetSize(), size2); if (external) {
ASSERT_EQ(id2, r2id);
}
ASSERT_EQ(region2->GetSize(), r2Size);
ASSERT_EQ(factory->SubscribedToRegionEvents(), false); ASSERT_EQ(factory->SubscribedToRegionEvents(), false);
factory->SubscribeToRegionEvents([&, id1, id2, ptr1, ptr2](RegionInfo info) { factory->SubscribeToRegionEvents([&, id1, id2, ptr1, ptr2](RegionInfo info) {
@@ -144,13 +219,15 @@ void RegionEventSubscriptions(const string& transport)
<< ", flags: " << info.flags; << ", flags: " << info.flags;
if (info.event == RegionEvent::created) { if (info.event == RegionEvent::created) {
if (info.id == id1) { if (info.id == id1) {
ASSERT_EQ(info.size, size1); ASSERT_EQ(info.size, r1Size);
ASSERT_EQ(info.ptr, ptr1); ASSERT_EQ(info.ptr, ptr1);
blocker.Signal(); blocker.Signal();
} else if (info.id == id2) { } else if (info.id == id2) {
ASSERT_EQ(info.size, size2); ASSERT_EQ(info.size, r2Size);
ASSERT_EQ(info.ptr, ptr2); ASSERT_EQ(info.ptr, ptr2);
ASSERT_EQ(info.flags, userFlags); if (!external) {
ASSERT_EQ(info.flags, userFlags);
}
blocker.Signal(); blocker.Signal();
} }
} else if (info.event == RegionEvent::destroyed) { } else if (info.event == RegionEvent::destroyed) {
@@ -170,10 +247,12 @@ void RegionEventSubscriptions(const string& transport)
LOG(info) << "2 done."; LOG(info) << "2 done.";
} }
blocker.Wait(); if (!external) {
LOG(info) << "3 done."; blocker.Wait();
blocker.Wait(); LOG(info) << "3 done.";
LOG(info) << "4 done."; blocker.Wait();
LOG(info) << "4 done.";
}
LOG(info) << "All done."; LOG(info) << "All done.";
factory->UnsubscribeFromRegionEvents(); factory->UnsubscribeFromRegionEvents();
@@ -185,9 +264,13 @@ void RegionCallbacks(const string& transport, const string& _address)
size_t session(tools::UuidHash()); size_t session(tools::UuidHash());
std::string address(tools::ToString(_address, "_", transport)); std::string address(tools::ToString(_address, "_", transport));
constexpr size_t sSize = 100000000;
constexpr size_t r1Size = 2000000;
constexpr size_t r2Size = 3000000;
ProgOptions config; ProgOptions config;
config.SetProperty<string>("session", to_string(session)); config.SetProperty<string>("session", to_string(session));
config.SetProperty<size_t>("shm-segment-size", 100000000); config.SetProperty<size_t>("shm-segment-size", sSize);
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
@@ -206,7 +289,7 @@ void RegionCallbacks(const string& transport, const string& _address)
void* ptr2 = nullptr; void* ptr2 = nullptr;
size_t size2 = 200; size_t size2 = 200;
auto region1 = factory->CreateUnmanagedRegion(2000000, [&](void* ptr, size_t size, void* hint) { auto region1 = factory->CreateUnmanagedRegion(r1Size, [&](void* ptr, size_t size, void* hint) {
ASSERT_EQ(ptr, ptr1); ASSERT_EQ(ptr, ptr1);
ASSERT_EQ(size, size1); ASSERT_EQ(size, size1);
ASSERT_EQ(hint, intPtr1.get()); ASSERT_EQ(hint, intPtr1.get());
@@ -215,7 +298,7 @@ void RegionCallbacks(const string& transport, const string& _address)
}); });
ptr1 = region1->GetData(); ptr1 = region1->GetData();
auto region2 = factory->CreateUnmanagedRegion(3000000, [&](const std::vector<RegionBlock>& blocks) { auto region2 = factory->CreateUnmanagedRegion(r2Size, [&](const std::vector<RegionBlock>& blocks) {
ASSERT_EQ(blocks.size(), 1); ASSERT_EQ(blocks.size(), 1);
ASSERT_EQ(blocks.at(0).ptr, ptr2); ASSERT_EQ(blocks.at(0).ptr, ptr2);
ASSERT_EQ(blocks.at(0).size, size2); ASSERT_EQ(blocks.at(0).size, size2);
@@ -263,12 +346,12 @@ TEST(Cache, shmem)
TEST(EventSubscriptions, zeromq) TEST(EventSubscriptions, zeromq)
{ {
RegionEventSubscriptions("zeromq"); RegionEventSubscriptions("zeromq", false);
} }
TEST(EventSubscriptions, shmem) TEST(EventSubscriptions, shmem)
{ {
RegionEventSubscriptions("shmem"); RegionEventSubscriptions("shmem", false);
} }
TEST(Callbacks, zeromq) TEST(Callbacks, zeromq)
@@ -281,4 +364,9 @@ TEST(Callbacks, shmem)
RegionCallbacks("shmem", "ipc://test_region_callbacks"); RegionCallbacks("shmem", "ipc://test_region_callbacks");
} }
TEST(EventSubscriptionsExternalRegion, shmem)
{
RegionEventSubscriptions("shmem", true);
}
} // namespace } // namespace