Add --max-run-time option and fix bug in LogSocketRates

This commit is contained in:
Alexey Rybalchenko 2019-02-21 13:40:48 +01:00 committed by Dennis Klein
parent b7125b746e
commit 8375faf835
3 changed files with 57 additions and 62 deletions

View File

@ -81,6 +81,7 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver
, fMultitransportProceed(false) , fMultitransportProceed(false)
, fVersion(version) , fVersion(version)
, fRate(0.) , fRate(0.)
, fMaxRunRuntimeInS(0)
, fRawCmdLineArgs() , fRawCmdLineArgs()
{ {
SubscribeToNewTransition("device", [&](fair::mq::Transition transition) { SubscribeToNewTransition("device", [&](fair::mq::Transition transition) {
@ -175,6 +176,7 @@ void FairMQDevice::InitWrapper()
Init(); Init();
fRate = fConfig->GetValue<float>("rate"); fRate = fConfig->GetValue<float>("rate");
fMaxRunRuntimeInS = fConfig->GetValue<uint64_t>("max-run-time");
try { try {
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport")); fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
@ -701,9 +703,6 @@ void FairMQDevice::SetConfig(FairMQProgOptions& config)
void FairMQDevice::LogSocketRates() void FairMQDevice::LogSocketRates()
{ {
chrono::time_point<chrono::high_resolution_clock> t0;
chrono::time_point<chrono::high_resolution_clock> t1;
vector<FairMQSocket*> filteredSockets; vector<FairMQSocket*> filteredSockets;
vector<string> filteredChannelNames; vector<string> filteredChannelNames;
vector<int> logIntervals; vector<int> logIntervals;
@ -712,13 +711,10 @@ void FairMQDevice::LogSocketRates()
size_t chanNameLen = 0; size_t chanNameLen = 0;
// iterate over the channels map // iterate over the channels map
for (const auto& mi : fChannels) for (const auto& mi : fChannels) {
{
// iterate over the channels vector // iterate over the channels vector
for (auto vi = (mi.second).begin(); vi != (mi.second).end(); ++vi) for (auto vi = (mi.second).begin(); vi != (mi.second).end(); ++vi) {
{ if (vi->fRateLogging > 0) {
if (vi->fRateLogging > 0)
{
filteredSockets.push_back(vi->fSocket.get()); filteredSockets.push_back(vi->fSocket.get());
logIntervals.push_back(vi->fRateLogging); logIntervals.push_back(vi->fRateLogging);
intervalCounters.push_back(0); intervalCounters.push_back(0);
@ -728,28 +724,23 @@ void FairMQDevice::LogSocketRates()
} }
} }
unsigned int numFilteredSockets = filteredSockets.size(); vector<unsigned long> bytesIn(filteredSockets.size());
vector<unsigned long> msgIn(filteredSockets.size());
vector<unsigned long> bytesOut(filteredSockets.size());
vector<unsigned long> msgOut(filteredSockets.size());
if (numFilteredSockets > 0) vector<unsigned long> bytesInNew(filteredSockets.size());
{ vector<unsigned long> msgInNew(filteredSockets.size());
vector<unsigned long> bytesIn(numFilteredSockets); vector<unsigned long> bytesOutNew(filteredSockets.size());
vector<unsigned long> msgIn(numFilteredSockets); vector<unsigned long> msgOutNew(filteredSockets.size());
vector<unsigned long> bytesOut(numFilteredSockets);
vector<unsigned long> msgOut(numFilteredSockets);
vector<unsigned long> bytesInNew(numFilteredSockets); vector<double> mbPerSecIn(filteredSockets.size());
vector<unsigned long> msgInNew(numFilteredSockets); vector<double> msgPerSecIn(filteredSockets.size());
vector<unsigned long> bytesOutNew(numFilteredSockets); vector<double> mbPerSecOut(filteredSockets.size());
vector<unsigned long> msgOutNew(numFilteredSockets); vector<double> msgPerSecOut(filteredSockets.size());
vector<double> mbPerSecIn(numFilteredSockets);
vector<double> msgPerSecIn(numFilteredSockets);
vector<double> mbPerSecOut(numFilteredSockets);
vector<double> msgPerSecOut(numFilteredSockets);
int i = 0; int i = 0;
for (const auto& vi : filteredSockets) for (const auto& vi : filteredSockets) {
{
bytesIn.at(i) = vi->GetBytesRx(); bytesIn.at(i) = vi->GetBytesRx();
bytesOut.at(i) = vi->GetBytesTx(); bytesOut.at(i) = vi->GetBytesTx();
msgIn.at(i) = vi->GetMessagesRx(); msgIn.at(i) = vi->GetMessagesRx();
@ -757,26 +748,26 @@ void FairMQDevice::LogSocketRates()
++i; ++i;
} }
t0 = chrono::high_resolution_clock::now(); chrono::time_point<chrono::high_resolution_clock> t0(chrono::high_resolution_clock::now());
chrono::time_point<chrono::high_resolution_clock> t1;
uint64_t secondsElapsed = 0;
LOG(debug) << "<channel>: in: <#msgs> (<MB>) out: <#msgs> (<MB>)"; while (!NewStatePending()) {
WaitFor(chrono::seconds(1));
while (!NewStatePending())
{
t1 = chrono::high_resolution_clock::now(); t1 = chrono::high_resolution_clock::now();
unsigned long long msSinceLastLog = chrono::duration_cast<chrono::milliseconds>(t1 - t0).count(); uint64_t msSinceLastLog = chrono::duration_cast<chrono::milliseconds>(t1 - t0).count();
i = 0; i = 0;
for (const auto& vi : filteredSockets) for (const auto& vi : filteredSockets) {
{
intervalCounters.at(i)++; intervalCounters.at(i)++;
if (intervalCounters.at(i) == logIntervals.at(i)) if (intervalCounters.at(i) == logIntervals.at(i)) {
{
intervalCounters.at(i) = 0; intervalCounters.at(i) = 0;
if (msSinceLastLog > 0) {
bytesInNew.at(i) = vi->GetBytesRx(); bytesInNew.at(i) = vi->GetBytesRx();
msgInNew.at(i) = vi->GetMessagesRx(); msgInNew.at(i) = vi->GetMessagesRx();
bytesOutNew.at(i) = vi->GetBytesTx(); bytesOutNew.at(i) = vi->GetBytesTx();
@ -796,12 +787,14 @@ void FairMQDevice::LogSocketRates()
<< "in: " << msgPerSecIn.at(i) << " (" << mbPerSecIn.at(i) << " MB) " << "in: " << msgPerSecIn.at(i) << " (" << mbPerSecIn.at(i) << " MB) "
<< "out: " << msgPerSecOut.at(i) << " (" << mbPerSecOut.at(i) << " MB)"; << "out: " << msgPerSecOut.at(i) << " (" << mbPerSecOut.at(i) << " MB)";
} }
}
++i; ++i;
} }
t0 = t1; t0 = t1;
WaitFor(chrono::milliseconds(1000)); if (fMaxRunRuntimeInS > 0 && ++secondsElapsed >= fMaxRunRuntimeInS) {
ChangeState(fair::mq::Transition::Stop);
} }
} }
} }

View File

@ -559,6 +559,7 @@ class FairMQDevice
const fair::mq::tools::Version fVersion; const fair::mq::tools::Version fVersion;
float fRate; ///< Rate limiting for ConditionalRun float fRate; ///< Rate limiting for ConditionalRun
uint64_t fMaxRunRuntimeInS; ///< Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).
std::vector<std::string> fRawCmdLineArgs; std::vector<std::string> fRawCmdLineArgs;
std::queue<fair::mq::State> fStates; std::queue<fair::mq::State> fStates;

View File

@ -56,17 +56,18 @@ FairMQProgOptions::FairMQProgOptions()
("print-options", po::value<bool >()->implicit_value(true), "Print options in machine-readable format (<option>:<computed-value>:<type>:<description>)"); ("print-options", po::value<bool >()->implicit_value(true), "Print options in machine-readable format (<option>:<computed-value>:<type>:<description>)");
fMQOptions.add_options() fMQOptions.add_options()
("id", po::value<string>(), "Device ID (required argument).") ("id", po::value<string >(), "Device ID (required argument).")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.") ("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg'/'shmem').") ("transport", po::value<string >()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg'/'shmem').")
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") ("network-interface", po::value<string >()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.") ("config-key", po::value<string >(), "Use provided value instead of device id for fetching the configuration from the config file.")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") ("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("max-run-time", po::value<uint64_t>()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)") ("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).") ("shm-segment-size", po::value<size_t >()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.") ("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).") ("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string>()->default_value("default"), "Session name."); ("session", po::value<string >()->default_value("default"), "Session name.");
fParserOptions.add_options() fParserOptions.add_options()
("mq-config", po::value<string>(), "JSON input as file.") ("mq-config", po::value<string>(), "JSON input as file.")