mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-16 01:51:45 +00:00
region example: add options for testing with externally-created regions
This commit is contained in:
@@ -19,6 +19,7 @@ struct Sampler : fair::mq::Device
|
||||
{
|
||||
void InitTask() override
|
||||
{
|
||||
fExternalRegion = fConfig->GetProperty<bool>("external-region");
|
||||
fMsgSize = fConfig->GetProperty<int>("msg-size");
|
||||
fLinger = fConfig->GetProperty<uint32_t>("region-linger");
|
||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||
@@ -34,18 +35,29 @@ struct Sampler : fair::mq::Device
|
||||
|
||||
fair::mq::RegionConfig regionCfg;
|
||||
regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events
|
||||
regionCfg.lock = true; // mlock region after creation
|
||||
regionCfg.zero = true; // zero region content after creation
|
||||
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
|
||||
0, // ... and this sub-channel
|
||||
10000000, // region size
|
||||
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
|
||||
std::lock_guard<std::mutex> lock(fMtx);
|
||||
fNumUnackedMsgs -= blocks.size();
|
||||
if (fMaxIterations > 0) {
|
||||
LOG(info) << "Received " << blocks.size() << " acks";
|
||||
}
|
||||
}, regionCfg));
|
||||
// options for testing with an externally-created -region
|
||||
if (fExternalRegion) {
|
||||
regionCfg.id = 1;
|
||||
regionCfg.removeOnDestruction = false;
|
||||
regionCfg.lock = false; // mlock region after creation
|
||||
regionCfg.lock = false; // mlock region after creation
|
||||
} else {
|
||||
regionCfg.lock = true; // mlock region after creation
|
||||
regionCfg.zero = true; // zero region content after creation
|
||||
}
|
||||
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor(
|
||||
"data", // region is created using the transport of this channel...
|
||||
0, // ... and this sub-channel
|
||||
10000000, // region size
|
||||
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
|
||||
std::lock_guard<std::mutex> lock(fMtx);
|
||||
fNumUnackedMsgs -= blocks.size();
|
||||
if (fMaxIterations > 0) {
|
||||
LOG(info) << "Received " << blocks.size() << " acks";
|
||||
}
|
||||
},
|
||||
regionCfg
|
||||
));
|
||||
}
|
||||
|
||||
bool ConditionalRun() override
|
||||
@@ -91,6 +103,7 @@ struct Sampler : fair::mq::Device
|
||||
}
|
||||
|
||||
private:
|
||||
int fExternalRegion = false;
|
||||
int fMsgSize = 10000;
|
||||
uint32_t fLinger = 100;
|
||||
uint64_t fMaxIterations = 0;
|
||||
@@ -105,7 +118,8 @@ void addCustomOptions(bpo::options_description& options)
|
||||
options.add_options()
|
||||
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
|
||||
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
|
||||
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
|
||||
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)")
|
||||
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process");
|
||||
}
|
||||
|
||||
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)
|
||||
|
Reference in New Issue
Block a user