From f5c46ce018486c5e9eb4ef0b0cef582a4ed9d709 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 9 Sep 2022 13:49:26 +0200 Subject: [PATCH] region example: add options for testing with externally-created regions --- examples/region/fairmq-start-ex-region.sh.in | 7 ++++ examples/region/sampler.cxx | 40 +++++++++++++------- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/examples/region/fairmq-start-ex-region.sh.in b/examples/region/fairmq-start-ex-region.sh.in index d743d837..6f38f653 100755 --- a/examples/region/fairmq-start-ex-region.sh.in +++ b/examples/region/fairmq-start-ex-region.sh.in @@ -19,6 +19,10 @@ SAMPLER+=" --severity debug" SAMPLER+=" --msg-size $msgSize" # SAMPLER+=" --rate 10" SAMPLER+=" --transport $transport" +# SAMPLER+=" --external-region true" +# SAMPLER+=" --shm-no-cleaup true" +# SAMPLER+=" --shm-monitor false" +# SAMPLER+=" --shmid 1" SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992" xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & @@ -26,5 +30,8 @@ SINK="fairmq-ex-region-sink" SINK+=" --id sink1" SINK+=" --severity debug" SINK+=" --transport $transport" +# SINK+=" --shm-no-cleaup true" +# SINK+=" --shm-monitor false" +# SINK+=" --shmid 1" SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992" xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK & diff --git a/examples/region/sampler.cxx b/examples/region/sampler.cxx index c1711df3..12308d34 100644 --- a/examples/region/sampler.cxx +++ b/examples/region/sampler.cxx @@ -19,6 +19,7 @@ struct Sampler : fair::mq::Device { void InitTask() override { + fExternalRegion = fConfig->GetProperty("external-region"); fMsgSize = fConfig->GetProperty("msg-size"); fLinger = fConfig->GetProperty("region-linger"); fMaxIterations = fConfig->GetProperty("max-iterations"); @@ -34,18 +35,29 @@ struct Sampler : fair::mq::Device fair::mq::RegionConfig regionCfg; regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events - regionCfg.lock = true; // mlock region after creation - regionCfg.zero = true; // zero region content after creation - fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel... - 0, // ... and this sub-channel - 10000000, // region size - [this](const std::vector& blocks) { // callback to be called when message buffers no longer needed by transport - std::lock_guard lock(fMtx); - fNumUnackedMsgs -= blocks.size(); - if (fMaxIterations > 0) { - LOG(info) << "Received " << blocks.size() << " acks"; - } - }, regionCfg)); + // options for testing with an externally-created -region + if (fExternalRegion) { + regionCfg.id = 1; + regionCfg.removeOnDestruction = false; + regionCfg.lock = false; // mlock region after creation + regionCfg.lock = false; // mlock region after creation + } else { + regionCfg.lock = true; // mlock region after creation + regionCfg.zero = true; // zero region content after creation + } + fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor( + "data", // region is created using the transport of this channel... + 0, // ... and this sub-channel + 10000000, // region size + [this](const std::vector& blocks) { // callback to be called when message buffers no longer needed by transport + std::lock_guard lock(fMtx); + fNumUnackedMsgs -= blocks.size(); + if (fMaxIterations > 0) { + LOG(info) << "Received " << blocks.size() << " acks"; + } + }, + regionCfg + )); } bool ConditionalRun() override @@ -91,6 +103,7 @@ struct Sampler : fair::mq::Device } private: + int fExternalRegion = false; int fMsgSize = 10000; uint32_t fLinger = 100; uint64_t fMaxIterations = 0; @@ -105,7 +118,8 @@ void addCustomOptions(bpo::options_description& options) options.add_options() ("msg-size", bpo::value()->default_value(1000), "Message size in bytes") ("region-linger", bpo::value()->default_value(100), "Linger period for regions") - ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); + ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)") + ("external-region", bpo::value()->default_value(false), "Use region created by another process"); } std::unique_ptr getDevice(fair::mq::ProgOptions& /*config*/)