region example: add options for testing with externally-created regions

This commit is contained in:
Alexey Rybalchenko 2022-09-09 13:49:26 +02:00
parent d105960444
commit f5c46ce018
2 changed files with 34 additions and 13 deletions

View File

@ -19,6 +19,10 @@ SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10" # SAMPLER+=" --rate 10"
SAMPLER+=" --transport $transport" SAMPLER+=" --transport $transport"
# SAMPLER+=" --external-region true"
# SAMPLER+=" --shm-no-cleaup true"
# SAMPLER+=" --shm-monitor false"
# SAMPLER+=" --shmid 1"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992" SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
@ -26,5 +30,8 @@ SINK="fairmq-ex-region-sink"
SINK+=" --id sink1" SINK+=" --id sink1"
SINK+=" --severity debug" SINK+=" --severity debug"
SINK+=" --transport $transport" SINK+=" --transport $transport"
# SINK+=" --shm-no-cleaup true"
# SINK+=" --shm-monitor false"
# SINK+=" --shmid 1"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992" SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK & xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@ -19,6 +19,7 @@ struct Sampler : fair::mq::Device
{ {
void InitTask() override void InitTask() override
{ {
fExternalRegion = fConfig->GetProperty<bool>("external-region");
fMsgSize = fConfig->GetProperty<int>("msg-size"); fMsgSize = fConfig->GetProperty<int>("msg-size");
fLinger = fConfig->GetProperty<uint32_t>("region-linger"); fLinger = fConfig->GetProperty<uint32_t>("region-linger");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
@ -34,9 +35,18 @@ struct Sampler : fair::mq::Device
fair::mq::RegionConfig regionCfg; fair::mq::RegionConfig regionCfg;
regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events
// options for testing with an externally-created -region
if (fExternalRegion) {
regionCfg.id = 1;
regionCfg.removeOnDestruction = false;
regionCfg.lock = false; // mlock region after creation
regionCfg.lock = false; // mlock region after creation
} else {
regionCfg.lock = true; // mlock region after creation regionCfg.lock = true; // mlock region after creation
regionCfg.zero = true; // zero region content after creation regionCfg.zero = true; // zero region content after creation
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel... }
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor(
"data", // region is created using the transport of this channel...
0, // ... and this sub-channel 0, // ... and this sub-channel
10000000, // region size 10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport [this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
@ -45,7 +55,9 @@ struct Sampler : fair::mq::Device
if (fMaxIterations > 0) { if (fMaxIterations > 0) {
LOG(info) << "Received " << blocks.size() << " acks"; LOG(info) << "Received " << blocks.size() << " acks";
} }
}, regionCfg)); },
regionCfg
));
} }
bool ConditionalRun() override bool ConditionalRun() override
@ -91,6 +103,7 @@ struct Sampler : fair::mq::Device
} }
private: private:
int fExternalRegion = false;
int fMsgSize = 10000; int fMsgSize = 10000;
uint32_t fLinger = 100; uint32_t fLinger = 100;
uint64_t fMaxIterations = 0; uint64_t fMaxIterations = 0;
@ -105,7 +118,8 @@ void addCustomOptions(bpo::options_description& options)
options.add_options() options.add_options()
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes") ("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions") ("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); ("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)")
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process");
} }
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/) std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)