Fix message counter in region example

This commit is contained in:
Alexey Rybalchenko
2020-06-15 14:04:42 +02:00
parent 79489bb501
commit 52c6264faf
3 changed files with 31 additions and 21 deletions

View File

@@ -27,8 +27,7 @@ Sampler::Sampler()
, fNumIterations(0) , fNumIterations(0)
, fRegion(nullptr) , fRegion(nullptr)
, fNumUnackedMsgs(0) , fNumUnackedMsgs(0)
{ {}
}
void Sampler::InitTask() void Sampler::InitTask()
{ {
@@ -36,20 +35,22 @@ void Sampler::InitTask()
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(warn) << ">>>" << info.event; LOG(info) << "Region event: " << info.event
LOG(warn) << "id: " << info.id; << ", id: " << info.id
LOG(warn) << "ptr: " << info.ptr; << ", ptr: " << info.ptr
LOG(warn) << "size: " << info.size; << ", size: " << info.size
LOG(warn) << "flags: " << info.flags; << ", flags: " << info.flags;
}); });
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
0, 0,
10000000, 10000000,
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport [this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
lock_guard<mutex> lock(fMtx);
fNumUnackedMsgs -= blocks.size(); fNumUnackedMsgs -= blocks.size();
if (fMaxIterations > 0) { if (fMaxIterations > 0) {
LOG(debug) << "Received " << blocks.size() << " acks"; LOG(info) << "Received " << blocks.size() << " acks";
} }
} }
)); ));
@@ -69,14 +70,14 @@ bool Sampler::ConditionalRun()
// LOG(info) << "check: " << static_cast<char*>(fRegion->GetData())[3]; // LOG(info) << "check: " << static_cast<char*>(fRegion->GetData())[3];
// std::this_thread::sleep_for(std::chrono::seconds(1)); // std::this_thread::sleep_for(std::chrono::seconds(1));
lock_guard<mutex> lock(fMtx);
if (Send(msg, "data", 0) > 0) { if (Send(msg, "data", 0) > 0) {
++fNumUnackedMsgs;
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false; return false;
} }
} }
++fNumUnackedMsgs;
return true; return true;
} }
@@ -84,10 +85,17 @@ bool Sampler::ConditionalRun()
void Sampler::ResetTask() void Sampler::ResetTask()
{ {
// if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead. // if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead.
if (fNumUnackedMsgs != 0) { {
LOG(debug) << "waiting for all acknowledgements... (" << fNumUnackedMsgs << ")"; unique_lock<mutex> lock(fMtx);
this_thread::sleep_for(chrono::milliseconds(500)); if (fNumUnackedMsgs != 0) {
LOG(debug) << "done, still unacked: " << fNumUnackedMsgs; LOG(info) << "Waiting for all acknowledgements... (" << fNumUnackedMsgs << ")";
lock.unlock();
this_thread::sleep_for(chrono::milliseconds(500));
lock.lock();
LOG(info) << "Done, still not acknowledged: " << fNumUnackedMsgs;
} else {
LOG(info) << "All acknowledgements received";
}
} }
fRegion.reset(); fRegion.reset();
fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents(); fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents();

View File

@@ -15,7 +15,8 @@
#ifndef FAIRMQEXAMPLEREGIONSAMPLER_H #ifndef FAIRMQEXAMPLEREGIONSAMPLER_H
#define FAIRMQEXAMPLEREGIONSAMPLER_H #define FAIRMQEXAMPLEREGIONSAMPLER_H
#include <atomic> #include <mutex>
#include <cstdint>
#include "FairMQDevice.h" #include "FairMQDevice.h"
@@ -38,7 +39,8 @@ class Sampler : public FairMQDevice
uint64_t fMaxIterations; uint64_t fMaxIterations;
uint64_t fNumIterations; uint64_t fNumIterations;
FairMQUnmanagedRegionPtr fRegion; FairMQUnmanagedRegionPtr fRegion;
std::atomic<uint64_t> fNumUnackedMsgs; std::mutex fMtx;
uint64_t fNumUnackedMsgs;
}; };
} // namespace example_region } // namespace example_region

View File

@@ -30,11 +30,11 @@ void Sink::InitTask()
// Get the fMaxIterations value from the command line options (via fConfig) // Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(warn) << ">>>" << info.event; LOG(info) << "Region event: " << info.event
LOG(warn) << "id: " << info.id; << ", id: " << info.id
LOG(warn) << "ptr: " << info.ptr; << ", ptr: " << info.ptr
LOG(warn) << "size: " << info.size; << ", size: " << info.size
LOG(warn) << "flags: " << info.flags; << ", flags: " << info.flags;
}); });
} }