From 8375faf835514f2f98dceabfb2418743ac2d7cd3 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 21 Feb 2019 13:40:48 +0100 Subject: [PATCH] Add --max-run-time option and fix bug in LogSocketRates --- fairmq/FairMQDevice.cxx | 95 +++++++++++++--------------- fairmq/FairMQDevice.h | 1 + fairmq/options/FairMQProgOptions.cxx | 23 +++---- 3 files changed, 57 insertions(+), 62 deletions(-) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 918ae15a..259161af 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -81,6 +81,7 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver , fMultitransportProceed(false) , fVersion(version) , fRate(0.) + , fMaxRunRuntimeInS(0) , fRawCmdLineArgs() { SubscribeToNewTransition("device", [&](fair::mq::Transition transition) { @@ -175,6 +176,7 @@ void FairMQDevice::InitWrapper() Init(); fRate = fConfig->GetValue("rate"); + fMaxRunRuntimeInS = fConfig->GetValue("max-run-time"); try { fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue("transport")); @@ -701,9 +703,6 @@ void FairMQDevice::SetConfig(FairMQProgOptions& config) void FairMQDevice::LogSocketRates() { - chrono::time_point t0; - chrono::time_point t1; - vector filteredSockets; vector filteredChannelNames; vector logIntervals; @@ -712,13 +711,10 @@ void FairMQDevice::LogSocketRates() size_t chanNameLen = 0; // iterate over the channels map - for (const auto& mi : fChannels) - { + for (const auto& mi : fChannels) { // iterate over the channels vector - for (auto vi = (mi.second).begin(); vi != (mi.second).end(); ++vi) - { - if (vi->fRateLogging > 0) - { + for (auto vi = (mi.second).begin(); vi != (mi.second).end(); ++vi) { + if (vi->fRateLogging > 0) { filteredSockets.push_back(vi->fSocket.get()); logIntervals.push_back(vi->fRateLogging); intervalCounters.push_back(0); @@ -728,55 +724,50 @@ void FairMQDevice::LogSocketRates() } } - unsigned int numFilteredSockets = filteredSockets.size(); + vector bytesIn(filteredSockets.size()); + vector msgIn(filteredSockets.size()); + vector bytesOut(filteredSockets.size()); + vector msgOut(filteredSockets.size()); - if (numFilteredSockets > 0) - { - vector bytesIn(numFilteredSockets); - vector msgIn(numFilteredSockets); - vector bytesOut(numFilteredSockets); - vector msgOut(numFilteredSockets); + vector bytesInNew(filteredSockets.size()); + vector msgInNew(filteredSockets.size()); + vector bytesOutNew(filteredSockets.size()); + vector msgOutNew(filteredSockets.size()); - vector bytesInNew(numFilteredSockets); - vector msgInNew(numFilteredSockets); - vector bytesOutNew(numFilteredSockets); - vector msgOutNew(numFilteredSockets); + vector mbPerSecIn(filteredSockets.size()); + vector msgPerSecIn(filteredSockets.size()); + vector mbPerSecOut(filteredSockets.size()); + vector msgPerSecOut(filteredSockets.size()); - vector mbPerSecIn(numFilteredSockets); - vector msgPerSecIn(numFilteredSockets); - vector mbPerSecOut(numFilteredSockets); - vector msgPerSecOut(numFilteredSockets); + int i = 0; + for (const auto& vi : filteredSockets) { + bytesIn.at(i) = vi->GetBytesRx(); + bytesOut.at(i) = vi->GetBytesTx(); + msgIn.at(i) = vi->GetMessagesRx(); + msgOut.at(i) = vi->GetMessagesTx(); + ++i; + } - int i = 0; - for (const auto& vi : filteredSockets) - { - bytesIn.at(i) = vi->GetBytesRx(); - bytesOut.at(i) = vi->GetBytesTx(); - msgIn.at(i) = vi->GetMessagesRx(); - msgOut.at(i) = vi->GetMessagesTx(); - ++i; - } + chrono::time_point t0(chrono::high_resolution_clock::now()); + chrono::time_point t1; + uint64_t secondsElapsed = 0; - t0 = chrono::high_resolution_clock::now(); + while (!NewStatePending()) { + WaitFor(chrono::seconds(1)); - LOG(debug) << ": in: <#msgs> () out: <#msgs> ()"; + t1 = chrono::high_resolution_clock::now(); - while (!NewStatePending()) - { - t1 = chrono::high_resolution_clock::now(); + uint64_t msSinceLastLog = chrono::duration_cast(t1 - t0).count(); - unsigned long long msSinceLastLog = chrono::duration_cast(t1 - t0).count(); + i = 0; - i = 0; + for (const auto& vi : filteredSockets) { + intervalCounters.at(i)++; - for (const auto& vi : filteredSockets) - { - intervalCounters.at(i)++; - - if (intervalCounters.at(i) == logIntervals.at(i)) - { - intervalCounters.at(i) = 0; + if (intervalCounters.at(i) == logIntervals.at(i)) { + intervalCounters.at(i) = 0; + if (msSinceLastLog > 0) { bytesInNew.at(i) = vi->GetBytesRx(); msgInNew.at(i) = vi->GetMessagesRx(); bytesOutNew.at(i) = vi->GetBytesTx(); @@ -796,12 +787,14 @@ void FairMQDevice::LogSocketRates() << "in: " << msgPerSecIn.at(i) << " (" << mbPerSecIn.at(i) << " MB) " << "out: " << msgPerSecOut.at(i) << " (" << mbPerSecOut.at(i) << " MB)"; } - - ++i; } - t0 = t1; - WaitFor(chrono::milliseconds(1000)); + ++i; + } + + t0 = t1; + if (fMaxRunRuntimeInS > 0 && ++secondsElapsed >= fMaxRunRuntimeInS) { + ChangeState(fair::mq::Transition::Stop); } } } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index b57d8950..677adce4 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -559,6 +559,7 @@ class FairMQDevice const fair::mq::tools::Version fVersion; float fRate; ///< Rate limiting for ConditionalRun + uint64_t fMaxRunRuntimeInS; ///< Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit). std::vector fRawCmdLineArgs; std::queue fStates; diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index bdabf006..ec2e8be8 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -56,17 +56,18 @@ FairMQProgOptions::FairMQProgOptions() ("print-options", po::value()->implicit_value(true), "Print options in machine-readable format (