From f5c46ce018486c5e9eb4ef0b0cef582a4ed9d709 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 9 Sep 2022 13:49:26 +0200 Subject: [PATCH 1/3] region example: add options for testing with externally-created regions --- examples/region/fairmq-start-ex-region.sh.in | 7 ++++ examples/region/sampler.cxx | 40 +++++++++++++------- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/examples/region/fairmq-start-ex-region.sh.in b/examples/region/fairmq-start-ex-region.sh.in index d743d837..6f38f653 100755 --- a/examples/region/fairmq-start-ex-region.sh.in +++ b/examples/region/fairmq-start-ex-region.sh.in @@ -19,6 +19,10 @@ SAMPLER+=" --severity debug" SAMPLER+=" --msg-size $msgSize" # SAMPLER+=" --rate 10" SAMPLER+=" --transport $transport" +# SAMPLER+=" --external-region true" +# SAMPLER+=" --shm-no-cleaup true" +# SAMPLER+=" --shm-monitor false" +# SAMPLER+=" --shmid 1" SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992" xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & @@ -26,5 +30,8 @@ SINK="fairmq-ex-region-sink" SINK+=" --id sink1" SINK+=" --severity debug" SINK+=" --transport $transport" +# SINK+=" --shm-no-cleaup true" +# SINK+=" --shm-monitor false" +# SINK+=" --shmid 1" SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992" xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK & diff --git a/examples/region/sampler.cxx b/examples/region/sampler.cxx index c1711df3..12308d34 100644 --- a/examples/region/sampler.cxx +++ b/examples/region/sampler.cxx @@ -19,6 +19,7 @@ struct Sampler : fair::mq::Device { void InitTask() override { + fExternalRegion = fConfig->GetProperty("external-region"); fMsgSize = fConfig->GetProperty("msg-size"); fLinger = fConfig->GetProperty("region-linger"); fMaxIterations = fConfig->GetProperty("max-iterations"); @@ -34,18 +35,29 @@ struct Sampler : fair::mq::Device fair::mq::RegionConfig regionCfg; regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events - regionCfg.lock = true; // mlock region after creation - regionCfg.zero = true; // zero region content after creation - fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel... - 0, // ... and this sub-channel - 10000000, // region size - [this](const std::vector& blocks) { // callback to be called when message buffers no longer needed by transport - std::lock_guard lock(fMtx); - fNumUnackedMsgs -= blocks.size(); - if (fMaxIterations > 0) { - LOG(info) << "Received " << blocks.size() << " acks"; - } - }, regionCfg)); + // options for testing with an externally-created -region + if (fExternalRegion) { + regionCfg.id = 1; + regionCfg.removeOnDestruction = false; + regionCfg.lock = false; // mlock region after creation + regionCfg.lock = false; // mlock region after creation + } else { + regionCfg.lock = true; // mlock region after creation + regionCfg.zero = true; // zero region content after creation + } + fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor( + "data", // region is created using the transport of this channel... + 0, // ... and this sub-channel + 10000000, // region size + [this](const std::vector& blocks) { // callback to be called when message buffers no longer needed by transport + std::lock_guard lock(fMtx); + fNumUnackedMsgs -= blocks.size(); + if (fMaxIterations > 0) { + LOG(info) << "Received " << blocks.size() << " acks"; + } + }, + regionCfg + )); } bool ConditionalRun() override @@ -91,6 +103,7 @@ struct Sampler : fair::mq::Device } private: + int fExternalRegion = false; int fMsgSize = 10000; uint32_t fLinger = 100; uint64_t fMaxIterations = 0; @@ -105,7 +118,8 @@ void addCustomOptions(bpo::options_description& options) options.add_options() ("msg-size", bpo::value()->default_value(1000), "Message size in bytes") ("region-linger", bpo::value()->default_value(100), "Linger period for regions") - ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)") + ("external-region", bpo::value()->default_value(false), "Use region created by another process"); } std::unique_ptr getDevice(fair::mq::ProgOptions& /*config*/) From 072d7cb7441d783497eded39ba7b09093e6f6a41 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 9 Sep 2022 13:50:56 +0200 Subject: [PATCH 2/3] shm: add some debug output --- fairmq/shmem/Monitor.cxx | 2 +- fairmq/shmem/UnmanagedRegion.h | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 1dffd977..35b23c59 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -670,7 +670,7 @@ std::vector> Monitor::Cleanup(const ShmId& shmIdT, string path = info.fPath.c_str(); int flags = info.fCreationFlags; if (verbose) { - LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << info.fDestroyed << "."; + LOG(info) << "Found UnmanagedRegion with id: " << id << ", path: '" << path << "', flags: " << flags << ", fDestroyed: " << info.fDestroyed << "."; } if (!path.empty()) { result.emplace_back(Remove(path + "fmq_" + shmId + "_rg_" + to_string(id), verbose)); diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index 9b81dba5..21ac1225 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -74,6 +74,8 @@ struct UnmanagedRegion // TODO: refactor this cfg.size = size; + LOG(debug) << "UnmanagedRegion(): " << fName << " | remote: " << remote << "."; + if (!cfg.path.empty()) { fName = std::string(cfg.path + fName); @@ -171,6 +173,7 @@ struct UnmanagedRegion ~UnmanagedRegion() { + LOG(debug) << "~UnmanagedRegion(): " << fName << " | remote: " << fRemote << "."; fStopAcks = true; if (fAcksSender.joinable()) { From af0d668951f6f9dfbcc164e3a09ace1fe1a9f84e Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 9 Sep 2022 13:58:25 +0200 Subject: [PATCH 3/3] Shm: fix region init with external regions --- fairmq/shmem/Manager.h | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 279188de..aab01943 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -395,18 +395,10 @@ class Manager const uint16_t id = cfg.id.value(); - UnmanagedRegion* region = nullptr; - bool newRegionCreated = false; std::lock_guard lock(fLocalRegionsMtx); - auto res = fRegions.emplace(id, std::make_unique(fShmId, size, false, cfg)); - newRegionCreated = res.second; - region = res.first->second.get(); + auto& region = fRegions[id] = std::make_unique(fShmId, size, false, cfg); // LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; - if (!newRegionCreated) { - region->fRemote = false; // TODO: this should be more clear, refactor it. - } - // start ack receiver only if a callback has been provided. if (callback || bulkCallback) { region->SetCallbacks(callback, bulkCallback); @@ -414,7 +406,7 @@ class Manager region->StartAckSender(); region->StartAckReceiver(); } - result.first = region; + result.first = region.get(); result.second = id; } fRegionsGen += 1; // signal TL cache invalidation