diff --git a/examples/region/Sampler.cxx b/examples/region/Sampler.cxx index 416e2b55..aaa7534a 100644 --- a/examples/region/Sampler.cxx +++ b/examples/region/Sampler.cxx @@ -27,8 +27,7 @@ Sampler::Sampler() , fNumIterations(0) , fRegion(nullptr) , fNumUnackedMsgs(0) -{ -} +{} void Sampler::InitTask() { @@ -36,20 +35,22 @@ void Sampler::InitTask() fMaxIterations = fConfig->GetProperty("max-iterations"); fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { - LOG(warn) << ">>>" << info.event; - LOG(warn) << "id: " << info.id; - LOG(warn) << "ptr: " << info.ptr; - LOG(warn) << "size: " << info.size; - LOG(warn) << "flags: " << info.flags; + LOG(info) << "Region event: " << info.event + << ", id: " << info.id + << ", ptr: " << info.ptr + << ", size: " << info.size + << ", flags: " << info.flags; }); fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", 0, 10000000, [this](const std::vector& blocks) { // callback to be called when message buffers no longer needed by transport + lock_guard lock(fMtx); fNumUnackedMsgs -= blocks.size(); + if (fMaxIterations > 0) { - LOG(debug) << "Received " << blocks.size() << " acks"; + LOG(info) << "Received " << blocks.size() << " acks"; } } )); @@ -69,14 +70,14 @@ bool Sampler::ConditionalRun() // LOG(info) << "check: " << static_cast(fRegion->GetData())[3]; // std::this_thread::sleep_for(std::chrono::seconds(1)); + lock_guard lock(fMtx); if (Send(msg, "data", 0) > 0) { - ++fNumUnackedMsgs; - if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; return false; } } + ++fNumUnackedMsgs; return true; } @@ -84,10 +85,17 @@ bool Sampler::ConditionalRun() void Sampler::ResetTask() { // if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead. - if (fNumUnackedMsgs != 0) { - LOG(debug) << "waiting for all acknowledgements... (" << fNumUnackedMsgs << ")"; - this_thread::sleep_for(chrono::milliseconds(500)); - LOG(debug) << "done, still unacked: " << fNumUnackedMsgs; + { + unique_lock lock(fMtx); + if (fNumUnackedMsgs != 0) { + LOG(info) << "Waiting for all acknowledgements... (" << fNumUnackedMsgs << ")"; + lock.unlock(); + this_thread::sleep_for(chrono::milliseconds(500)); + lock.lock(); + LOG(info) << "Done, still not acknowledged: " << fNumUnackedMsgs; + } else { + LOG(info) << "All acknowledgements received"; + } } fRegion.reset(); fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents(); diff --git a/examples/region/Sampler.h b/examples/region/Sampler.h index 4056fb3d..505713ac 100644 --- a/examples/region/Sampler.h +++ b/examples/region/Sampler.h @@ -15,7 +15,8 @@ #ifndef FAIRMQEXAMPLEREGIONSAMPLER_H #define FAIRMQEXAMPLEREGIONSAMPLER_H -#include +#include +#include #include "FairMQDevice.h" @@ -38,7 +39,8 @@ class Sampler : public FairMQDevice uint64_t fMaxIterations; uint64_t fNumIterations; FairMQUnmanagedRegionPtr fRegion; - std::atomic fNumUnackedMsgs; + std::mutex fMtx; + uint64_t fNumUnackedMsgs; }; } // namespace example_region diff --git a/examples/region/Sink.cxx b/examples/region/Sink.cxx index fd06b639..902c390c 100644 --- a/examples/region/Sink.cxx +++ b/examples/region/Sink.cxx @@ -30,11 +30,11 @@ void Sink::InitTask() // Get the fMaxIterations value from the command line options (via fConfig) fMaxIterations = fConfig->GetProperty("max-iterations"); fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { - LOG(warn) << ">>>" << info.event; - LOG(warn) << "id: " << info.id; - LOG(warn) << "ptr: " << info.ptr; - LOG(warn) << "size: " << info.size; - LOG(warn) << "flags: " << info.flags; + LOG(info) << "Region event: " << info.event + << ", id: " << info.id + << ", ptr: " << info.ptr + << ", size: " << info.size + << ", flags: " << info.flags; }); }