mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Compare commits
10 Commits
v1.4.55
...
v1.4-patch
Author | SHA1 | Date | |
---|---|---|---|
|
16275db125 | ||
|
42ce691f57 | ||
|
58aa2b4f88 | ||
|
c3b273cec0 | ||
|
a982d60ed7 | ||
|
d16e473b91 | ||
|
1881986cca | ||
|
adf91d053d | ||
|
d3be9af9b6 | ||
|
4104636456 |
15
.github/workflows/fair-software.yml
vendored
Normal file
15
.github/workflows/fair-software.yml
vendored
Normal file
@@ -0,0 +1,15 @@
|
||||
name: fair-software
|
||||
|
||||
on: push
|
||||
|
||||
jobs:
|
||||
verify:
|
||||
name: "fair-software"
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: fair-software/howfairis-github-action@0.2.1
|
||||
name: Measure compliance with fair-software.eu recommendations
|
||||
env:
|
||||
PYCHARM_HOSTED: "Trick colorama into displaying colored output"
|
||||
with:
|
||||
MY_REPO_URL: "https://github.com/${{ github.repository }}"
|
@@ -1,5 +1,10 @@
|
||||
<!-- {#mainpage} -->
|
||||
# FairMQ [](COPYRIGHT)
|
||||
# FairMQ
|
||||
|
||||
[](COPYRIGHT)
|
||||
[](https://doi.org/10.5281/zenodo.1689985)
|
||||
[](https://bestpractices.coreinfrastructure.org/projects/6915)
|
||||
[](https://github.com/FairRootGroup/FairMQ/actions/workflows/fair-software.yml)
|
||||
|
||||
C++ Message Queuing Library and Framework
|
||||
|
||||
|
@@ -39,12 +39,9 @@ struct Sampler : fair::mq::Device
|
||||
if (fExternalRegion) {
|
||||
regionCfg.id = 1;
|
||||
regionCfg.removeOnDestruction = false;
|
||||
regionCfg.lock = false; // mlock region after creation
|
||||
regionCfg.lock = false; // mlock region after creation
|
||||
} else {
|
||||
regionCfg.lock = true; // mlock region after creation
|
||||
regionCfg.zero = true; // zero region content after creation
|
||||
}
|
||||
regionCfg.lock = !fExternalRegion; // mlock region after creation
|
||||
regionCfg.zero = !fExternalRegion; // zero region content after creation
|
||||
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor(
|
||||
"data", // region is created using the transport of this channel...
|
||||
0, // ... and this sub-channel
|
||||
|
@@ -207,22 +207,22 @@ class Manager
|
||||
|
||||
fEventCounter = fManagementSegment.find<EventCounter>(unique_instance).first;
|
||||
if (fEventCounter) {
|
||||
LOG(debug) << "event counter found: " << fEventCounter->fCount;
|
||||
LOG(trace) << "event counter found: " << fEventCounter->fCount;
|
||||
} else {
|
||||
LOG(debug) << "no event counter found, creating one and initializing with 0";
|
||||
LOG(trace) << "no event counter found, creating one and initializing with 0";
|
||||
fEventCounter = fManagementSegment.construct<EventCounter>(unique_instance)(0);
|
||||
LOG(debug) << "initialized event counter with: " << fEventCounter->fCount;
|
||||
LOG(trace) << "initialized event counter with: " << fEventCounter->fCount;
|
||||
}
|
||||
|
||||
fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;
|
||||
if (fDeviceCounter) {
|
||||
LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
|
||||
LOG(trace) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing.";
|
||||
(fDeviceCounter->fCount)++;
|
||||
LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount;
|
||||
LOG(trace) << "incremented device counter, now: " << fDeviceCounter->fCount;
|
||||
} else {
|
||||
LOG(debug) << "no device counter found, creating one and initializing with 1";
|
||||
LOG(trace) << "no device counter found, creating one and initializing with 1";
|
||||
fDeviceCounter = fManagementSegment.construct<DeviceCounter>(unique_instance)(1);
|
||||
LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount;
|
||||
LOG(trace) << "initialized device counter with: " << fDeviceCounter->fCount;
|
||||
}
|
||||
|
||||
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||
@@ -265,10 +265,10 @@ class Manager
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG(debug) << "Created/opened shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "'."
|
||||
<< " Size: " << boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)) << " bytes."
|
||||
<< " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes."
|
||||
<< " Allocation algorithm: " << allocationAlgorithm;
|
||||
LOG(debug) << (createdSegment ? "Created" : "Opened") << " managed shared memory segment " << "fmq_" << fShmId << "_m_" << fSegmentId
|
||||
<< ". Size: " << boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)) << " bytes."
|
||||
<< " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes."
|
||||
<< " Allocation algorithm: " << allocationAlgorithm;
|
||||
} catch (interprocess_exception& bie) {
|
||||
LOG(error) << "Failed to create/open shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "': " << bie.what();
|
||||
throw TransportError(tools::ToString("Failed to create/open shared memory segment '", "fmq_", fShmId, "_m_", fSegmentId, "': ", bie.what()));
|
||||
@@ -396,7 +396,23 @@ class Manager
|
||||
const uint16_t id = cfg.id.value();
|
||||
|
||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||
auto& region = fRegions[id] = std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg);
|
||||
|
||||
UnmanagedRegion* region = nullptr;
|
||||
|
||||
auto it = fRegions.find(id);
|
||||
if (it != fRegions.end()) {
|
||||
region = it->second.get();
|
||||
if (region->fControlling) {
|
||||
LOG(error) << "Unmanaged Region with id " << id << " already exists. Only unique IDs per session are allowed.";
|
||||
throw TransportError(tools::ToString("Unmanaged Region with id ", id, " already exists. Only unique IDs per session are allowed."));
|
||||
}
|
||||
|
||||
LOG(debug) << "Unmanaged region (view) already present, promoting to controller";
|
||||
region->BecomeController(cfg);
|
||||
} else {
|
||||
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, true, cfg));
|
||||
region = res.first->second.get();
|
||||
}
|
||||
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||
|
||||
// start ack receiver only if a callback has been provided.
|
||||
@@ -406,7 +422,7 @@ class Manager
|
||||
region->StartAckSender();
|
||||
region->StartAckReceiver();
|
||||
}
|
||||
result.first = region.get();
|
||||
result.first = region;
|
||||
result.second = id;
|
||||
}
|
||||
fRegionsGen += 1; // signal TL cache invalidation
|
||||
@@ -447,7 +463,6 @@ class Manager
|
||||
UnmanagedRegion* GetRegion(uint16_t id)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
|
||||
auto it = fRegions.find(id);
|
||||
if (it != fRegions.end()) {
|
||||
return it->second.get();
|
||||
@@ -464,7 +479,7 @@ class Manager
|
||||
}
|
||||
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||
|
||||
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
|
||||
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, false, std::move(cfg)));
|
||||
r.first->second->InitializeQueues();
|
||||
r.first->second->StartAckSender();
|
||||
return r.first->second.get();
|
||||
@@ -555,7 +570,7 @@ class Manager
|
||||
if (it != fRegions.end()) {
|
||||
region = it->second.get();
|
||||
} else {
|
||||
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, true, cfgIt->second));
|
||||
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, false, cfgIt->second));
|
||||
region = r.first->second.get();
|
||||
region->InitializeQueues();
|
||||
region->StartAckSender();
|
||||
|
@@ -44,19 +44,19 @@ struct UnmanagedRegion
|
||||
friend class Monitor;
|
||||
|
||||
UnmanagedRegion(const std::string& shmId, uint16_t id, uint64_t size)
|
||||
: UnmanagedRegion(shmId, size, false, makeRegionConfig(id))
|
||||
: UnmanagedRegion(shmId, size, true, makeRegionConfig(id))
|
||||
{}
|
||||
|
||||
UnmanagedRegion(const std::string& shmId, uint64_t size, RegionConfig cfg)
|
||||
: UnmanagedRegion(shmId, size, false, std::move(cfg))
|
||||
: UnmanagedRegion(shmId, size, true, std::move(cfg))
|
||||
{}
|
||||
|
||||
UnmanagedRegion(const std::string& shmId, RegionConfig cfg)
|
||||
: UnmanagedRegion(shmId, cfg.size, false, std::move(cfg))
|
||||
: UnmanagedRegion(shmId, cfg.size, true, std::move(cfg))
|
||||
{}
|
||||
|
||||
UnmanagedRegion(const std::string& shmId, uint64_t size, bool remote, RegionConfig cfg)
|
||||
: fRemote(remote)
|
||||
UnmanagedRegion(const std::string& shmId, uint64_t size, bool controlling, RegionConfig cfg)
|
||||
: fControlling(controlling)
|
||||
, fRemoveOnDestruction(cfg.removeOnDestruction)
|
||||
, fLinger(cfg.linger)
|
||||
, fStopAcks(false)
|
||||
@@ -73,13 +73,15 @@ struct UnmanagedRegion
|
||||
|
||||
// TODO: refactor this
|
||||
cfg.size = size;
|
||||
const uint16_t id = cfg.id.value();
|
||||
bool created = false;
|
||||
|
||||
LOG(debug) << "UnmanagedRegion(): " << fName << " | remote: " << remote << ".";
|
||||
LOG(debug) << "UnmanagedRegion(): " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
|
||||
|
||||
if (!cfg.path.empty()) {
|
||||
fName = std::string(cfg.path + fName);
|
||||
|
||||
if (!fRemote) {
|
||||
if (fControlling) {
|
||||
// create a file
|
||||
std::filebuf fbuf;
|
||||
if (fbuf.open(fName, std::ios_base::in | std::ios_base::out | std::ios_base::trunc | std::ios_base::binary)) {
|
||||
@@ -94,23 +96,30 @@ struct UnmanagedRegion
|
||||
if (!fFile) {
|
||||
LOG(error) << "Failed to initialize file: " << fName;
|
||||
LOG(error) << "errno: " << errno << ": " << strerror(errno);
|
||||
throw std::runtime_error(tools::ToString("Failed to initialize file for shared memory region: ", strerror(errno)));
|
||||
throw TransportError(tools::ToString("Failed to initialize file for shared memory region: ", strerror(errno)));
|
||||
}
|
||||
fFileMapping = file_mapping(fName.c_str(), read_write);
|
||||
LOG(debug) << "shmem: initialized file: " << fName;
|
||||
LOG(debug) << "UnmanagedRegion(): initialized file: " << fName;
|
||||
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags);
|
||||
} else {
|
||||
try {
|
||||
// if opening fails, create
|
||||
try {
|
||||
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
|
||||
created = false;
|
||||
} catch (interprocess_exception& e) {
|
||||
LOG(debug) << "Could not open " << (remote ? "remote" : "local") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what() << ", creating...";
|
||||
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
|
||||
fShmemObject.truncate(size);
|
||||
if (fControlling) {
|
||||
LOG(debug) << "Could not open controlling shared_memory_object for region " << id << ": " << e.what() << ", creating...";
|
||||
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
|
||||
fShmemObject.truncate(size);
|
||||
created = true;
|
||||
} else {
|
||||
LOG(error) << "Could not open view for shared_memory_object for region " << id << ": " << e.what();
|
||||
throw TransportError(tools::ToString("Could not open view for shared_memory_object for region ", id, ": ", e.what()));
|
||||
}
|
||||
}
|
||||
} catch (interprocess_exception& e) {
|
||||
LOG(error) << "Failed " << (remote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
|
||||
LOG(error) << "Failed initializing shared_memory_object for region id " << id << ": " << e.what();
|
||||
throw;
|
||||
}
|
||||
|
||||
@@ -121,27 +130,27 @@ struct UnmanagedRegion
|
||||
throw TransportError(tools::ToString("Created/opened region size (", fRegion.get_size(), ") does not match configured size (", size, ")"));
|
||||
}
|
||||
} catch (interprocess_exception& e) {
|
||||
LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
|
||||
LOG(error) << "Failed mapping shared_memory_object for region id " << id << ": " << e.what();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
if (cfg.lock) {
|
||||
LOG(debug) << "Locking region " << cfg.id.value() << "...";
|
||||
LOG(debug) << "Locking region " << id << "...";
|
||||
Lock();
|
||||
LOG(debug) << "Successfully locked region " << cfg.id.value() << ".";
|
||||
LOG(debug) << "Successfully locked region " << id << ".";
|
||||
}
|
||||
if (cfg.zero) {
|
||||
LOG(debug) << "Zeroing free memory of region " << cfg.id.value() << "...";
|
||||
LOG(debug) << "Zeroing free memory of region " << id << "...";
|
||||
Zero();
|
||||
LOG(debug) << "Successfully zeroed free memory of region " << cfg.id.value() << ".";
|
||||
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
|
||||
}
|
||||
|
||||
if (!remote) {
|
||||
if (fControlling && created) {
|
||||
Register(shmId, cfg);
|
||||
}
|
||||
|
||||
LOG(trace) << "shmem: initialized region: " << fName << " (" << (remote ? "remote" : "local") << ")";
|
||||
LOG(debug) << (created ? "Created" : "Opened") << " unmanaged shared memory region: " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
|
||||
}
|
||||
|
||||
UnmanagedRegion() = delete;
|
||||
@@ -151,6 +160,13 @@ struct UnmanagedRegion
|
||||
UnmanagedRegion& operator=(const UnmanagedRegion&) = delete;
|
||||
UnmanagedRegion& operator=(UnmanagedRegion&&) = delete;
|
||||
|
||||
void BecomeController(RegionConfig& cfg)
|
||||
{
|
||||
fControlling = true;
|
||||
fLinger = cfg.linger;
|
||||
fRemoveOnDestruction = cfg.removeOnDestruction;
|
||||
}
|
||||
|
||||
void Zero()
|
||||
{
|
||||
memset(fRegion.get_address(), 0x00, fRegion.get_size());
|
||||
@@ -173,7 +189,7 @@ struct UnmanagedRegion
|
||||
|
||||
~UnmanagedRegion()
|
||||
{
|
||||
LOG(debug) << "~UnmanagedRegion(): " << fName << " | remote: " << fRemote << ".";
|
||||
LOG(debug) << "~UnmanagedRegion(): " << fName << " (" << (fControlling ? "controller" : "viewer") << ")";
|
||||
fStopAcks = true;
|
||||
|
||||
if (fAcksSender.joinable()) {
|
||||
@@ -181,7 +197,7 @@ struct UnmanagedRegion
|
||||
fAcksSender.join();
|
||||
}
|
||||
|
||||
if (!fRemote) {
|
||||
if (fControlling) {
|
||||
if (fAcksReceiver.joinable()) {
|
||||
fAcksReceiver.join();
|
||||
}
|
||||
@@ -207,14 +223,14 @@ struct UnmanagedRegion
|
||||
fclose(fFile);
|
||||
}
|
||||
} else {
|
||||
// LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
|
||||
// LOG(debug) << "Region queue '" << fQueueName << "' is viewer, no cleanup necessary";
|
||||
}
|
||||
|
||||
// LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
|
||||
// LOG(debug) << "Region '" << fName << "' (" << (fControlling ? "controller" : "viewer") << ") destructed.";
|
||||
}
|
||||
|
||||
private:
|
||||
bool fRemote;
|
||||
bool fControlling;
|
||||
bool fRemoveOnDestruction;
|
||||
uint32_t fLinger;
|
||||
std::atomic<bool> fStopAcks;
|
||||
@@ -246,6 +262,7 @@ struct UnmanagedRegion
|
||||
static void Register(const std::string& shmId, const RegionConfig& cfg)
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
LOG(debug) << "Registering unmanaged shared memory region with id " << cfg.id.value();
|
||||
managed_shared_memory mngSegment(open_or_create, std::string("fmq_" + shmId + "_mng").c_str(), kManagementSegmentSize);
|
||||
VoidAlloc alloc(mngSegment.get_segment_manager());
|
||||
|
||||
@@ -253,10 +270,14 @@ struct UnmanagedRegion
|
||||
|
||||
EventCounter* eventCounter = mngSegment.find_or_construct<EventCounter>(unique_instance)(0);
|
||||
|
||||
bool newShmRegionCreated = shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, alloc)).second;
|
||||
if (newShmRegionCreated) {
|
||||
(eventCounter->fCount)++;
|
||||
auto it = shmRegions->find(cfg.id.value());
|
||||
if (it != shmRegions->end()) {
|
||||
LOG(error) << "Unmanaged Region with id " << cfg.id.value() << " has already been registered. Only unique IDs per session are allowed.";
|
||||
throw TransportError(tools::ToString("Unmanaged Region with id ", cfg.id.value(), " has already been registered. Only unique IDs per session are allowed."));
|
||||
}
|
||||
|
||||
shmRegions->emplace(cfg.id.value(), RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, cfg.size, alloc)).second;
|
||||
(eventCounter->fCount)++;
|
||||
}
|
||||
|
||||
void SetCallbacks(RegionCallback callback, RegionBulkCallback bulkCallback)
|
||||
|
@@ -40,9 +40,9 @@ class UnmanagedRegionImpl final : public fair::mq::UnmanagedRegion
|
||||
, fRegion(nullptr)
|
||||
, fRegionId(0)
|
||||
{
|
||||
auto result = fManager.CreateRegion(size, callback, bulkCallback, std::move(cfg));
|
||||
fRegion = result.first;
|
||||
fRegionId = result.second;
|
||||
auto [regionPtr, regionId] = fManager.CreateRegion(size, callback, bulkCallback, std::move(cfg));
|
||||
fRegion = regionPtr;
|
||||
fRegionId = regionId;
|
||||
}
|
||||
|
||||
UnmanagedRegionImpl(const UnmanagedRegionImpl&) = delete;
|
||||
|
@@ -6,6 +6,11 @@
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <fairmq/shmem/Common.h>
|
||||
#include <fairmq/shmem/Monitor.h>
|
||||
#include <fairmq/shmem/Segment.h>
|
||||
#include <fairmq/shmem/UnmanagedRegion.h>
|
||||
|
||||
#include <fairmq/TransportFactory.h>
|
||||
#include <fairmq/ProgOptions.h>
|
||||
#include <fairmq/tools/Unique.h>
|
||||
@@ -16,8 +21,12 @@
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <map>
|
||||
#include <memory> // make_unique
|
||||
#include <string>
|
||||
#include <utility> // pair
|
||||
#include <vector> // pair
|
||||
|
||||
namespace
|
||||
{
|
||||
@@ -25,6 +34,34 @@ namespace
|
||||
using namespace std;
|
||||
using namespace fair::mq;
|
||||
|
||||
struct ShmOwner
|
||||
{
|
||||
ShmOwner(const string& sessionId,
|
||||
const vector<pair<uint16_t, size_t>>& segments,
|
||||
const vector<pair<uint16_t, size_t>>& regions)
|
||||
: fShmId(fair::mq::shmem::makeShmIdStr(sessionId))
|
||||
{
|
||||
LOG(info) << "ShmOwner: creating segments";
|
||||
for (auto [id, size] : segments) {
|
||||
fSegments.emplace(id, fair::mq::shmem::Segment(fShmId, id, size, fair::mq::shmem::rbTreeBestFit));
|
||||
}
|
||||
LOG(info) << "ShmOwner: creating regions";
|
||||
for (auto [id, size] : regions) {
|
||||
fRegions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(fShmId, id, size));
|
||||
}
|
||||
}
|
||||
|
||||
~ShmOwner()
|
||||
{
|
||||
LOG(info) << "ShmOwner: cleaning up";
|
||||
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{fShmId});
|
||||
}
|
||||
|
||||
string fShmId;
|
||||
map<uint16_t, fair::mq::shmem::Segment> fSegments;
|
||||
map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> fRegions;
|
||||
};
|
||||
|
||||
void RegionsSizeMismatch()
|
||||
{
|
||||
size_t session = tools::UuidHash();
|
||||
@@ -108,31 +145,69 @@ void RegionsCache(const string& transport, const string& address)
|
||||
}
|
||||
}
|
||||
|
||||
void RegionEventSubscriptions(const string& transport)
|
||||
void RegionEventSubscriptions(const string& transport, bool external)
|
||||
{
|
||||
fair::Logger::SetConsoleSeverity(fair::Severity::debug);
|
||||
|
||||
unique_ptr<ShmOwner> shmOwner = nullptr;
|
||||
|
||||
size_t session{tools::UuidHash()};
|
||||
|
||||
constexpr int sSize = 100000000;
|
||||
constexpr int r1Size = 1000000;
|
||||
constexpr int r2Size = 5000000;
|
||||
constexpr uint16_t sId = 0;
|
||||
constexpr uint16_t r1id = 100;
|
||||
constexpr uint16_t r2id = 101;
|
||||
|
||||
if (external) {
|
||||
shmOwner = make_unique<ShmOwner>(
|
||||
to_string(session),
|
||||
vector<pair<uint16_t, size_t>>{ { sId, sSize } },
|
||||
vector<pair<uint16_t, size_t>>{ { r1id, r1Size }, { r2id, r2Size } }
|
||||
);
|
||||
}
|
||||
|
||||
ProgOptions config;
|
||||
config.SetProperty<string>("session", to_string(session));
|
||||
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||
config.SetProperty<size_t>("shm-segment-size", sSize);
|
||||
if (external) {
|
||||
config.SetProperty<bool>("shm-no-cleanup", true);
|
||||
config.SetProperty<bool>("shm-monitor", false);
|
||||
}
|
||||
|
||||
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
|
||||
|
||||
constexpr int size1 = 1000000;
|
||||
constexpr int size2 = 5000000;
|
||||
constexpr int64_t userFlags = 12345;
|
||||
tools::Semaphore blocker;
|
||||
|
||||
{
|
||||
auto region1 = factory->CreateUnmanagedRegion(size1, [](void*, size_t, void*) {});
|
||||
fair::mq::RegionConfig r1Cfg;
|
||||
if (external) {
|
||||
r1Cfg.id = r1id;
|
||||
r1Cfg.removeOnDestruction = false;
|
||||
}
|
||||
auto region1 = factory->CreateUnmanagedRegion(r1Size, [](void*, size_t, void*) {}, r1Cfg);
|
||||
void* ptr1 = region1->GetData();
|
||||
uint64_t id1 = region1->GetId();
|
||||
ASSERT_EQ(region1->GetSize(), size1);
|
||||
if (external) {
|
||||
ASSERT_EQ(id1, r1id);
|
||||
}
|
||||
ASSERT_EQ(region1->GetSize(), r1Size);
|
||||
|
||||
auto region2 = factory->CreateUnmanagedRegion(size2, userFlags, [](void*, size_t, void*) {});
|
||||
fair::mq::RegionConfig r2Cfg;
|
||||
r2Cfg.userFlags = userFlags;
|
||||
if (external) {
|
||||
r2Cfg.id = r2id;
|
||||
r2Cfg.removeOnDestruction = false;
|
||||
}
|
||||
auto region2 = factory->CreateUnmanagedRegion(r2Size, [](void*, size_t, void*) {}, r2Cfg);
|
||||
void* ptr2 = region2->GetData();
|
||||
uint64_t id2 = region2->GetId();
|
||||
ASSERT_EQ(region2->GetSize(), size2);
|
||||
if (external) {
|
||||
ASSERT_EQ(id2, r2id);
|
||||
}
|
||||
ASSERT_EQ(region2->GetSize(), r2Size);
|
||||
|
||||
ASSERT_EQ(factory->SubscribedToRegionEvents(), false);
|
||||
factory->SubscribeToRegionEvents([&, id1, id2, ptr1, ptr2](RegionInfo info) {
|
||||
@@ -144,13 +219,15 @@ void RegionEventSubscriptions(const string& transport)
|
||||
<< ", flags: " << info.flags;
|
||||
if (info.event == RegionEvent::created) {
|
||||
if (info.id == id1) {
|
||||
ASSERT_EQ(info.size, size1);
|
||||
ASSERT_EQ(info.size, r1Size);
|
||||
ASSERT_EQ(info.ptr, ptr1);
|
||||
blocker.Signal();
|
||||
} else if (info.id == id2) {
|
||||
ASSERT_EQ(info.size, size2);
|
||||
ASSERT_EQ(info.size, r2Size);
|
||||
ASSERT_EQ(info.ptr, ptr2);
|
||||
ASSERT_EQ(info.flags, userFlags);
|
||||
if (!external) {
|
||||
ASSERT_EQ(info.flags, userFlags);
|
||||
}
|
||||
blocker.Signal();
|
||||
}
|
||||
} else if (info.event == RegionEvent::destroyed) {
|
||||
@@ -170,10 +247,12 @@ void RegionEventSubscriptions(const string& transport)
|
||||
LOG(info) << "2 done.";
|
||||
}
|
||||
|
||||
blocker.Wait();
|
||||
LOG(info) << "3 done.";
|
||||
blocker.Wait();
|
||||
LOG(info) << "4 done.";
|
||||
if (!external) {
|
||||
blocker.Wait();
|
||||
LOG(info) << "3 done.";
|
||||
blocker.Wait();
|
||||
LOG(info) << "4 done.";
|
||||
}
|
||||
LOG(info) << "All done.";
|
||||
|
||||
factory->UnsubscribeFromRegionEvents();
|
||||
@@ -185,9 +264,13 @@ void RegionCallbacks(const string& transport, const string& _address)
|
||||
size_t session(tools::UuidHash());
|
||||
std::string address(tools::ToString(_address, "_", transport));
|
||||
|
||||
constexpr size_t sSize = 100000000;
|
||||
constexpr size_t r1Size = 2000000;
|
||||
constexpr size_t r2Size = 3000000;
|
||||
|
||||
ProgOptions config;
|
||||
config.SetProperty<string>("session", to_string(session));
|
||||
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||
config.SetProperty<size_t>("shm-segment-size", sSize);
|
||||
|
||||
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
|
||||
|
||||
@@ -206,7 +289,7 @@ void RegionCallbacks(const string& transport, const string& _address)
|
||||
void* ptr2 = nullptr;
|
||||
size_t size2 = 200;
|
||||
|
||||
auto region1 = factory->CreateUnmanagedRegion(2000000, [&](void* ptr, size_t size, void* hint) {
|
||||
auto region1 = factory->CreateUnmanagedRegion(r1Size, [&](void* ptr, size_t size, void* hint) {
|
||||
ASSERT_EQ(ptr, ptr1);
|
||||
ASSERT_EQ(size, size1);
|
||||
ASSERT_EQ(hint, intPtr1.get());
|
||||
@@ -215,7 +298,7 @@ void RegionCallbacks(const string& transport, const string& _address)
|
||||
});
|
||||
ptr1 = region1->GetData();
|
||||
|
||||
auto region2 = factory->CreateUnmanagedRegion(3000000, [&](const std::vector<RegionBlock>& blocks) {
|
||||
auto region2 = factory->CreateUnmanagedRegion(r2Size, [&](const std::vector<RegionBlock>& blocks) {
|
||||
ASSERT_EQ(blocks.size(), 1);
|
||||
ASSERT_EQ(blocks.at(0).ptr, ptr2);
|
||||
ASSERT_EQ(blocks.at(0).size, size2);
|
||||
@@ -263,12 +346,12 @@ TEST(Cache, shmem)
|
||||
|
||||
TEST(EventSubscriptions, zeromq)
|
||||
{
|
||||
RegionEventSubscriptions("zeromq");
|
||||
RegionEventSubscriptions("zeromq", false);
|
||||
}
|
||||
|
||||
TEST(EventSubscriptions, shmem)
|
||||
{
|
||||
RegionEventSubscriptions("shmem");
|
||||
RegionEventSubscriptions("shmem", false);
|
||||
}
|
||||
|
||||
TEST(Callbacks, zeromq)
|
||||
@@ -281,4 +364,9 @@ TEST(Callbacks, shmem)
|
||||
RegionCallbacks("shmem", "ipc://test_region_callbacks");
|
||||
}
|
||||
|
||||
TEST(EventSubscriptionsExternalRegion, shmem)
|
||||
{
|
||||
RegionEventSubscriptions("shmem", true);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
Reference in New Issue
Block a user