Allow disabling transfer rate logging per input/output (optional, default is on).

This commit introduces new property to FairMQDevice to control whether rateLogger thread should print rates for a given input/output.
Per default logging is enabled, so existing code works as before and does not need to be updated.
To turn off logging for an initialized input/output, set the `LogInputRate`/`LogOutputRate` property to `0`.

Example:
``` c++
splitter.ChangeState(FairMQSplitter::INIT);
// turn off rate logging for input 1 and output 4
splitter.SetProperty(FairMQSplitter::LogInputRate, 0, 1);
splitter.SetProperty(FairMQSplitter::LogOutputRate, 0, 4);
```
This commit is contained in:
Alexey Rybalchenko 2014-11-10 17:13:38 +01:00
parent 0cef5692b1
commit e4e88ad1db
2 changed files with 62 additions and 17 deletions

View File

@ -40,6 +40,7 @@ void FairMQDevice::Init()
fInputSocketType.push_back("sub"); // default value, can be overwritten in configuration fInputSocketType.push_back("sub"); // default value, can be overwritten in configuration
fInputSndBufSize.push_back(10000); // default value, can be overwritten in configuration fInputSndBufSize.push_back(10000); // default value, can be overwritten in configuration
fInputRcvBufSize.push_back(10000); // default value, can be overwritten in configuration fInputRcvBufSize.push_back(10000); // default value, can be overwritten in configuration
fLogInputRate.push_back(1); // default value, can be overwritten in configuration
} }
for (int i = 0; i < fNumOutputs; ++i) for (int i = 0; i < fNumOutputs; ++i)
@ -49,6 +50,7 @@ void FairMQDevice::Init()
fOutputSocketType.push_back("pub"); // default value, can be overwritten in configuration fOutputSocketType.push_back("pub"); // default value, can be overwritten in configuration
fOutputSndBufSize.push_back(10000); // default value, can be overwritten in configuration fOutputSndBufSize.push_back(10000); // default value, can be overwritten in configuration
fOutputRcvBufSize.push_back(10000); // default value, can be overwritten in configuration fOutputRcvBufSize.push_back(10000); // default value, can be overwritten in configuration
fLogOutputRate.push_back(1); // default value, can be overwritten in configuration
} }
} }
@ -193,6 +195,14 @@ void FairMQDevice::SetProperty(const int key, const int value, const int slot /*
fOutputRcvBufSize.erase(fOutputRcvBufSize.begin() + slot); fOutputRcvBufSize.erase(fOutputRcvBufSize.begin() + slot);
fOutputRcvBufSize.insert(fOutputRcvBufSize.begin() + slot, value); fOutputRcvBufSize.insert(fOutputRcvBufSize.begin() + slot, value);
break; break;
case LogInputRate:
fLogInputRate.erase(fLogInputRate.begin() + slot);
fLogInputRate.insert(fLogInputRate.begin() + slot, value);
break;
case LogOutputRate:
fLogOutputRate.erase(fLogOutputRate.begin() + slot);
fLogOutputRate.insert(fLogOutputRate.begin() + slot, value);
break;
default: default:
FairMQConfigurable::SetProperty(key, value, slot); FairMQConfigurable::SetProperty(key, value, slot);
break; break;
@ -240,6 +250,10 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/, const i
return fOutputSndBufSize.at(slot); return fOutputSndBufSize.at(slot);
case OutputRcvBufSize: case OutputRcvBufSize:
return fOutputRcvBufSize.at(slot); return fOutputRcvBufSize.at(slot);
case LogInputRate:
return fLogInputRate.at(slot);
case LogOutputRate:
return fLogOutputRate.at(slot);
default: default:
return FairMQConfigurable::GetProperty(key, default_, slot); return FairMQConfigurable::GetProperty(key, default_, slot);
} }
@ -257,23 +271,50 @@ void FairMQDevice::LogSocketRates()
timestamp_t msSinceLastLog; timestamp_t msSinceLastLog;
vector<unsigned long> bytesIn(fNumInputs); int numFilteredInputs = 0;
vector<unsigned long> msgIn(fNumInputs); int numFilteredOutputs = 0;
vector<unsigned long> bytesOut(fNumOutputs); vector<FairMQSocket*> filteredInputs;
vector<unsigned long> msgOut(fNumOutputs); vector<FairMQSocket*> filteredOutputs;
vector<unsigned long> bytesInNew(fNumInputs);
vector<unsigned long> msgInNew(fNumInputs);
vector<unsigned long> bytesOutNew(fNumOutputs);
vector<unsigned long> msgOutNew(fNumOutputs);
vector<double> mbPerSecIn(fNumInputs);
vector<double> msgPerSecIn(fNumInputs);
vector<double> mbPerSecOut(fNumOutputs);
vector<double> msgPerSecOut(fNumOutputs);
int i = 0; int i = 0;
for (vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++) for (vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++)
{
if (fLogInputRate.at(i) > 0)
{
filteredInputs.push_back((*itr));
++numFilteredInputs;
}
++i;
}
i = 0;
for (vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++)
{
if (fLogOutputRate.at(i) > 0)
{
filteredOutputs.push_back((*itr));
++numFilteredOutputs;
}
++i;
}
vector<unsigned long> bytesIn(numFilteredInputs);
vector<unsigned long> msgIn(numFilteredInputs);
vector<unsigned long> bytesOut(numFilteredOutputs);
vector<unsigned long> msgOut(numFilteredOutputs);
vector<unsigned long> bytesInNew(numFilteredInputs);
vector<unsigned long> msgInNew(numFilteredInputs);
vector<unsigned long> bytesOutNew(numFilteredOutputs);
vector<unsigned long> msgOutNew(numFilteredOutputs);
vector<double> mbPerSecIn(numFilteredInputs);
vector<double> msgPerSecIn(numFilteredInputs);
vector<double> mbPerSecOut(numFilteredOutputs);
vector<double> msgPerSecOut(numFilteredOutputs);
i = 0;
for (vector<FairMQSocket*>::iterator itr = filteredInputs.begin(); itr != filteredInputs.end(); itr++)
{ {
bytesIn.at(i) = (*itr)->GetBytesRx(); bytesIn.at(i) = (*itr)->GetBytesRx();
msgIn.at(i) = (*itr)->GetMessagesRx(); msgIn.at(i) = (*itr)->GetMessagesRx();
@ -281,7 +322,7 @@ void FairMQDevice::LogSocketRates()
} }
i = 0; i = 0;
for (vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++) for (vector<FairMQSocket*>::iterator itr = filteredOutputs.begin(); itr != filteredOutputs.end(); itr++)
{ {
bytesOut.at(i) = (*itr)->GetBytesTx(); bytesOut.at(i) = (*itr)->GetBytesTx();
msgOut.at(i) = (*itr)->GetMessagesTx(); msgOut.at(i) = (*itr)->GetMessagesTx();
@ -300,7 +341,7 @@ void FairMQDevice::LogSocketRates()
i = 0; i = 0;
for (vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++) for (vector<FairMQSocket*>::iterator itr = filteredInputs.begin(); itr != filteredInputs.end(); itr++)
{ {
bytesInNew.at(i) = (*itr)->GetBytesRx(); bytesInNew.at(i) = (*itr)->GetBytesRx();
mbPerSecIn.at(i) = ((double)(bytesInNew.at(i) - bytesIn.at(i)) / (1024. * 1024.)) / (double)msSinceLastLog * 1000.; mbPerSecIn.at(i) = ((double)(bytesInNew.at(i) - bytesIn.at(i)) / (1024. * 1024.)) / (double)msSinceLastLog * 1000.;
@ -316,7 +357,7 @@ void FairMQDevice::LogSocketRates()
i = 0; i = 0;
for (vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++) for (vector<FairMQSocket*>::iterator itr = filteredOutputs.begin(); itr != filteredOutputs.end(); itr++)
{ {
bytesOutNew.at(i) = (*itr)->GetBytesTx(); bytesOutNew.at(i) = (*itr)->GetBytesTx();
mbPerSecOut.at(i) = ((double)(bytesOutNew.at(i) - bytesOut.at(i)) / (1024. * 1024.)) / (double)msSinceLastLog * 1000.; mbPerSecOut.at(i) = ((double)(bytesOutNew.at(i) - bytesOut.at(i)) / (1024. * 1024.)) / (double)msSinceLastLog * 1000.;

View File

@ -49,6 +49,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
OutputSndBufSize, OutputSndBufSize,
OutputRcvBufSize, OutputRcvBufSize,
LogIntervalInMs, LogIntervalInMs,
LogInputRate,
LogOutputRate,
Last Last
}; };
@ -78,12 +80,14 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
vector<string> fInputSocketType; vector<string> fInputSocketType;
vector<int> fInputSndBufSize; vector<int> fInputSndBufSize;
vector<int> fInputRcvBufSize; vector<int> fInputRcvBufSize;
vector<int> fLogInputRate;
vector<string> fOutputAddress; vector<string> fOutputAddress;
vector<string> fOutputMethod; vector<string> fOutputMethod;
vector<string> fOutputSocketType; vector<string> fOutputSocketType;
vector<int> fOutputSndBufSize; vector<int> fOutputSndBufSize;
vector<int> fOutputRcvBufSize; vector<int> fOutputRcvBufSize;
vector<int> fLogOutputRate;
vector<FairMQSocket*>* fPayloadInputs; vector<FairMQSocket*>* fPayloadInputs;
vector<FairMQSocket*>* fPayloadOutputs; vector<FairMQSocket*>* fPayloadOutputs;