From e4e88ad1db7d03c08d4926c936704e01d3fabbc3 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 10 Nov 2014 17:13:38 +0100 Subject: [PATCH] Allow disabling transfer rate logging per input/output (optional, default is on). This commit introduces new property to FairMQDevice to control whether rateLogger thread should print rates for a given input/output. Per default logging is enabled, so existing code works as before and does not need to be updated. To turn off logging for an initialized input/output, set the `LogInputRate`/`LogOutputRate` property to `0`. Example: ``` c++ splitter.ChangeState(FairMQSplitter::INIT); // turn off rate logging for input 1 and output 4 splitter.SetProperty(FairMQSplitter::LogInputRate, 0, 1); splitter.SetProperty(FairMQSplitter::LogOutputRate, 0, 4); ``` --- fairmq/FairMQDevice.cxx | 75 +++++++++++++++++++++++++++++++---------- fairmq/FairMQDevice.h | 4 +++ 2 files changed, 62 insertions(+), 17 deletions(-) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 7b6beb96..042c236f 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -40,6 +40,7 @@ void FairMQDevice::Init() fInputSocketType.push_back("sub"); // default value, can be overwritten in configuration fInputSndBufSize.push_back(10000); // default value, can be overwritten in configuration fInputRcvBufSize.push_back(10000); // default value, can be overwritten in configuration + fLogInputRate.push_back(1); // default value, can be overwritten in configuration } for (int i = 0; i < fNumOutputs; ++i) @@ -49,6 +50,7 @@ void FairMQDevice::Init() fOutputSocketType.push_back("pub"); // default value, can be overwritten in configuration fOutputSndBufSize.push_back(10000); // default value, can be overwritten in configuration fOutputRcvBufSize.push_back(10000); // default value, can be overwritten in configuration + fLogOutputRate.push_back(1); // default value, can be overwritten in configuration } } @@ -193,6 +195,14 @@ void FairMQDevice::SetProperty(const int key, const int value, const int slot /* fOutputRcvBufSize.erase(fOutputRcvBufSize.begin() + slot); fOutputRcvBufSize.insert(fOutputRcvBufSize.begin() + slot, value); break; + case LogInputRate: + fLogInputRate.erase(fLogInputRate.begin() + slot); + fLogInputRate.insert(fLogInputRate.begin() + slot, value); + break; + case LogOutputRate: + fLogOutputRate.erase(fLogOutputRate.begin() + slot); + fLogOutputRate.insert(fLogOutputRate.begin() + slot, value); + break; default: FairMQConfigurable::SetProperty(key, value, slot); break; @@ -240,6 +250,10 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/, const i return fOutputSndBufSize.at(slot); case OutputRcvBufSize: return fOutputRcvBufSize.at(slot); + case LogInputRate: + return fLogInputRate.at(slot); + case LogOutputRate: + return fLogOutputRate.at(slot); default: return FairMQConfigurable::GetProperty(key, default_, slot); } @@ -257,23 +271,50 @@ void FairMQDevice::LogSocketRates() timestamp_t msSinceLastLog; - vector bytesIn(fNumInputs); - vector msgIn(fNumInputs); - vector bytesOut(fNumOutputs); - vector msgOut(fNumOutputs); - - vector bytesInNew(fNumInputs); - vector msgInNew(fNumInputs); - vector bytesOutNew(fNumOutputs); - vector msgOutNew(fNumOutputs); - - vector mbPerSecIn(fNumInputs); - vector msgPerSecIn(fNumInputs); - vector mbPerSecOut(fNumOutputs); - vector msgPerSecOut(fNumOutputs); + int numFilteredInputs = 0; + int numFilteredOutputs = 0; + vector filteredInputs; + vector filteredOutputs; int i = 0; for (vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++) + { + if (fLogInputRate.at(i) > 0) + { + filteredInputs.push_back((*itr)); + ++numFilteredInputs; + } + ++i; + } + + i = 0; + for (vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++) + { + if (fLogOutputRate.at(i) > 0) + { + filteredOutputs.push_back((*itr)); + ++numFilteredOutputs; + } + ++i; + } + + vector bytesIn(numFilteredInputs); + vector msgIn(numFilteredInputs); + vector bytesOut(numFilteredOutputs); + vector msgOut(numFilteredOutputs); + + vector bytesInNew(numFilteredInputs); + vector msgInNew(numFilteredInputs); + vector bytesOutNew(numFilteredOutputs); + vector msgOutNew(numFilteredOutputs); + + vector mbPerSecIn(numFilteredInputs); + vector msgPerSecIn(numFilteredInputs); + vector mbPerSecOut(numFilteredOutputs); + vector msgPerSecOut(numFilteredOutputs); + + i = 0; + for (vector::iterator itr = filteredInputs.begin(); itr != filteredInputs.end(); itr++) { bytesIn.at(i) = (*itr)->GetBytesRx(); msgIn.at(i) = (*itr)->GetMessagesRx(); @@ -281,7 +322,7 @@ void FairMQDevice::LogSocketRates() } i = 0; - for (vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++) + for (vector::iterator itr = filteredOutputs.begin(); itr != filteredOutputs.end(); itr++) { bytesOut.at(i) = (*itr)->GetBytesTx(); msgOut.at(i) = (*itr)->GetMessagesTx(); @@ -300,7 +341,7 @@ void FairMQDevice::LogSocketRates() i = 0; - for (vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++) + for (vector::iterator itr = filteredInputs.begin(); itr != filteredInputs.end(); itr++) { bytesInNew.at(i) = (*itr)->GetBytesRx(); mbPerSecIn.at(i) = ((double)(bytesInNew.at(i) - bytesIn.at(i)) / (1024. * 1024.)) / (double)msSinceLastLog * 1000.; @@ -316,7 +357,7 @@ void FairMQDevice::LogSocketRates() i = 0; - for (vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++) + for (vector::iterator itr = filteredOutputs.begin(); itr != filteredOutputs.end(); itr++) { bytesOutNew.at(i) = (*itr)->GetBytesTx(); mbPerSecOut.at(i) = ((double)(bytesOutNew.at(i) - bytesOut.at(i)) / (1024. * 1024.)) / (double)msSinceLastLog * 1000.; diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 89a1f103..32086e1a 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -49,6 +49,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable OutputSndBufSize, OutputRcvBufSize, LogIntervalInMs, + LogInputRate, + LogOutputRate, Last }; @@ -78,12 +80,14 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable vector fInputSocketType; vector fInputSndBufSize; vector fInputRcvBufSize; + vector fLogInputRate; vector fOutputAddress; vector fOutputMethod; vector fOutputSocketType; vector fOutputSndBufSize; vector fOutputRcvBufSize; + vector fLogOutputRate; vector* fPayloadInputs; vector* fPayloadOutputs;