Examples: use multipart in the region example

This commit is contained in:
Alexey Rybalchenko 2023-11-15 10:11:55 +01:00
parent 2a91b38ef7
commit 40d2523a2d
4 changed files with 95 additions and 50 deletions

View File

@ -22,32 +22,66 @@ SAMPLER+=" --transport $transport"
SAMPLER+=" --shm-monitor true" SAMPLER+=" --shm-monitor true"
SAMPLER+=" --chan-name data1" SAMPLER+=" --chan-name data1"
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777" SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & xterm -geometry 90x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
PROCESSOR="fairmq-ex-region-processor" PROCESSOR1="fairmq-ex-region-processor"
PROCESSOR+=" --id processor1" PROCESSOR1+=" --id processor1"
PROCESSOR+=" --severity debug" PROCESSOR1+=" --severity debug"
PROCESSOR+=" --transport $transport" PROCESSOR1+=" --transport $transport"
PROCESSOR+=" --shm-monitor true" PROCESSOR1+=" --shm-segment-id 1"
PROCESSOR+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777" PROCESSOR1+=" --shm-monitor true"
PROCESSOR+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778" PROCESSOR1+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779" PROCESSOR1+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$PROCESSOR & PROCESSOR1+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
xterm -geometry 90x40+550+40 -hold -e @EX_BIN_DIR@/$PROCESSOR1 &
SINK1="fairmq-ex-region-sink" PROCESSOR2="fairmq-ex-region-processor"
SINK1+=" --id sink1" PROCESSOR2+=" --id processor2"
SINK1+=" --severity debug" PROCESSOR2+=" --severity debug"
SINK1+=" --chan-name data2" PROCESSOR2+=" --transport $transport"
SINK1+=" --transport $transport" PROCESSOR2+=" --shm-segment-id 2"
SINK1+=" --shm-monitor true" PROCESSOR2+=" --shm-monitor true"
SINK1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778" PROCESSOR2+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
xterm -geometry 120x32+1500+0 -hold -e @EX_BIN_DIR@/$SINK1 & PROCESSOR2+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7788"
PROCESSOR2+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7789"
xterm -geometry 90x40+550+600 -hold -e @EX_BIN_DIR@/$PROCESSOR2 &
SINK2="fairmq-ex-region-sink" SINK1_1="fairmq-ex-region-sink"
SINK2+=" --id sink2" SINK1_1+=" --id sink1_1"
SINK2+=" --severity debug" SINK1_1+=" --severity debug"
SINK2+=" --chan-name data3" SINK1_1+=" --chan-name data2"
SINK2+=" --transport $transport" SINK1_1+=" --transport $transport"
SINK2+=" --shm-monitor true" SINK1_1+=" --shm-segment-id 1"
SINK2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779" SINK1_1+=" --shm-monitor true"
xterm -geometry 120x32+1500+500 -hold -e @EX_BIN_DIR@/$SINK2 & SINK1_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
xterm -geometry 90x20+1100+0 -hold -e @EX_BIN_DIR@/$SINK1_1 &
SINK1_2="fairmq-ex-region-sink"
SINK1_2+=" --id sink1_2"
SINK1_2+=" --severity debug"
SINK1_2+=" --chan-name data3"
SINK1_2+=" --transport $transport"
SINK1_2+=" --shm-segment-id 1"
SINK1_2+=" --shm-monitor true"
SINK1_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
xterm -geometry 90x20+1100+300 -hold -e @EX_BIN_DIR@/$SINK1_2 &
SINK2_1="fairmq-ex-region-sink"
SINK2_1+=" --id sink2_1"
SINK2_1+=" --severity debug"
SINK2_1+=" --chan-name data2"
SINK2_1+=" --transport $transport"
SINK2_1+=" --shm-segment-id 2"
SINK2_1+=" --shm-monitor true"
SINK2_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7788"
xterm -geometry 90x20+1100+600 -hold -e @EX_BIN_DIR@/$SINK2_1 &
SINK2_2="fairmq-ex-region-sink"
SINK2_2+=" --id sink2_2"
SINK2_2+=" --severity debug"
SINK2_2+=" --chan-name data3"
SINK2_2+=" --transport $transport"
SINK2_2+=" --shm-segment-id 2"
SINK2_2+=" --shm-monitor true"
SINK2_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7789"
xterm -geometry 90x20+1100+900 -hold -e @EX_BIN_DIR@/$SINK2_2 &

View File

@ -36,16 +36,22 @@ struct Processor : Device
Channel& dataOut2 = GetChannel("data3", 0); Channel& dataOut2 = GetChannel("data3", 0);
while (!NewStatePending()) { while (!NewStatePending()) {
auto msg(dataIn.Transport()->CreateMessage()); fair::mq::Parts inParts;
dataIn.Receive(msg); dataIn.Receive(inParts);
fair::mq::MessagePtr msgCopy1(NewMessage()); fair::mq::Parts outParts1;
msgCopy1->Copy(*msg); fair::mq::Parts outParts2;
fair::mq::MessagePtr msgCopy2(NewMessage());
msgCopy2->Copy(*msg);
dataOut1.Send(msgCopy1); for (const auto& inPart : inParts) {
dataOut2.Send(msgCopy2); outParts1.AddPart(NewMessage());
outParts1.fParts.back()->Copy(*inPart);
outParts2.AddPart(NewMessage());
outParts2.fParts.back()->Copy(*inPart);
}
dataOut1.Send(outParts1);
dataOut2.Send(outParts2);
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state."; LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";

View File

@ -26,6 +26,7 @@ struct Sampler : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChanName = fConfig->GetProperty<std::string>("chan-name"); fChanName = fConfig->GetProperty<std::string>("chan-name");
fSamplingRate = fConfig->GetProperty<float>("sampling-rate"); fSamplingRate = fConfig->GetProperty<float>("sampling-rate");
fRCSegmentSize = fConfig->GetProperty<uint64_t>("rc-segment-size");
GetChannel(fChanName, 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) { GetChannel(fChanName, 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) {
LOG(info) << "Region event: " << info.event << ": " LOG(info) << "Region event: " << info.event << ": "
@ -45,6 +46,7 @@ struct Sampler : fair::mq::Device
} }
regionCfg.lock = !fExternalRegion; // mlock region after creation regionCfg.lock = !fExternalRegion; // mlock region after creation
regionCfg.zero = !fExternalRegion; // zero region content after creation regionCfg.zero = !fExternalRegion; // zero region content after creation
regionCfg.rcSegmentSize = fRCSegmentSize; // size of the corresponding reference count segment
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor( fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor(
fChanName, // region is created using the transport of this channel... fChanName, // region is created using the transport of this channel...
0, // ... and this sub-channel 0, // ... and this sub-channel
@ -66,17 +68,22 @@ struct Sampler : fair::mq::Device
fair::mq::tools::RateLimiter rateLimiter(fSamplingRate); fair::mq::tools::RateLimiter rateLimiter(fSamplingRate);
while (!NewStatePending()) { while (!NewStatePending()) {
fair::mq::MessagePtr msg(NewMessageFor(fChanName, // channel fair::mq::Parts parts;
0, // sub-channel // make 64 parts
fRegion, // region for (int i = 0; i < 64; ++i) {
fRegion->GetData(), // ptr within region parts.AddPart(NewMessageFor(
fMsgSize, // offset from ptr fChanName, // channel
nullptr // hint 0, // sub-channel
)); fRegion, // region
fRegion->GetData(), // ptr within region
fMsgSize, // offset from ptr
nullptr // hint
));
}
std::lock_guard<std::mutex> lock(fMtx); std::lock_guard<std::mutex> lock(fMtx);
++fNumUnackedMsgs; fNumUnackedMsgs += parts.Size();
if (Send(msg, fChanName, 0) > 0) { if (Send(parts, fChanName, 0) > 0) {
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Stopping sending."; LOG(info) << "Configured maximum number of iterations reached. Stopping sending.";
break; break;
@ -117,6 +124,7 @@ struct Sampler : fair::mq::Device
uint32_t fLinger = 100; uint32_t fLinger = 100;
uint64_t fMaxIterations = 0; uint64_t fMaxIterations = 0;
uint64_t fNumIterations = 0; uint64_t fNumIterations = 0;
uint64_t fRCSegmentSize = 10000000;
fair::mq::UnmanagedRegionPtr fRegion = nullptr; fair::mq::UnmanagedRegionPtr fRegion = nullptr;
std::mutex fMtx; std::mutex fMtx;
uint64_t fNumUnackedMsgs = 0; uint64_t fNumUnackedMsgs = 0;
@ -132,7 +140,8 @@ void addCustomOptions(bpo::options_description& options)
("sampling-rate", bpo::value<float>()->default_value(0.), "Sampling rate (Hz).") ("sampling-rate", bpo::value<float>()->default_value(0.), "Sampling rate (Hz).")
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions") ("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)") ("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)")
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process"); ("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process")
("rc-segment-size", bpo::value<uint64_t>()->default_value(10000000), "Size of the reference count segment for Unamanged Region");
} }
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/) std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)

View File

@ -36,12 +36,8 @@ struct Sink : Device
Channel& dataIn = GetChannel(fChanName, 0); Channel& dataIn = GetChannel(fChanName, 0);
while (!NewStatePending()) { while (!NewStatePending()) {
auto msg(dataIn.Transport()->CreateMessage()); fair::mq::Parts parts;
dataIn.Receive(msg); dataIn.Receive(parts);
// void* ptr = msg->GetData();
// char* cptr = static_cast<char*>(ptr);
// LOG(info) << "check: " << cptr[3];
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state."; LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";