mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Refactor some device code for better readability
This commit is contained in:
parent
26fe5e2bd8
commit
8be2fd33f4
|
@ -197,6 +197,7 @@ void FairMQDevice::WaitForEndOfState(Transition transition)
|
|||
|
||||
void FairMQDevice::InitWrapper()
|
||||
{
|
||||
// run initialization once CompleteInit transition is requested
|
||||
fStateMachine.WaitForPendingState();
|
||||
|
||||
fId = fConfig->GetProperty<string>("id", DefaultId);
|
||||
|
@ -228,33 +229,33 @@ void FairMQDevice::InitWrapper()
|
|||
string networkInterface = fConfig->GetProperty<string>("network-interface", DefaultNetworkInterface);
|
||||
|
||||
// Fill the uninitialized channel containers
|
||||
for (auto& mi : fChannels) {
|
||||
for (auto& channel : fChannels) {
|
||||
int subChannelIndex = 0;
|
||||
for (auto& vi : mi.second) {
|
||||
for (auto& subChannel : channel.second) {
|
||||
// set channel transport
|
||||
LOG(debug) << "Initializing transport for channel " << vi.fName << ": " << fair::mq::TransportNames.at(vi.fTransportType);
|
||||
vi.InitTransport(AddTransport(vi.fTransportType));
|
||||
LOG(debug) << "Initializing transport for channel " << subChannel.fName << ": " << fair::mq::TransportNames.at(subChannel.fTransportType);
|
||||
subChannel.InitTransport(AddTransport(subChannel.fTransportType));
|
||||
|
||||
if (vi.fMethod == "bind") {
|
||||
if (subChannel.fMethod == "bind") {
|
||||
// if binding address is not specified, try getting it from the configured network interface
|
||||
if (vi.fAddress == "unspecified" || vi.fAddress == "") {
|
||||
if (subChannel.fAddress == "unspecified" || subChannel.fAddress == "") {
|
||||
// if the configured network interface is default, get its name from the default route
|
||||
if (networkInterface == "default") {
|
||||
networkInterface = tools::getDefaultRouteNetworkInterface();
|
||||
}
|
||||
vi.fAddress = "tcp://" + tools::getInterfaceIP(networkInterface) + ":1";
|
||||
subChannel.fAddress = "tcp://" + tools::getInterfaceIP(networkInterface) + ":1";
|
||||
}
|
||||
// fill the uninitialized list
|
||||
fUninitializedBindingChannels.push_back(&vi);
|
||||
} else if (vi.fMethod == "connect") {
|
||||
fUninitializedBindingChannels.push_back(&subChannel);
|
||||
} else if (subChannel.fMethod == "connect") {
|
||||
// fill the uninitialized list
|
||||
fUninitializedConnectingChannels.push_back(&vi);
|
||||
} else if (vi.fAddress.find_first_of("@+>") != string::npos) {
|
||||
fUninitializedConnectingChannels.push_back(&subChannel);
|
||||
} else if (subChannel.fAddress.find_first_of("@+>") != string::npos) {
|
||||
// fill the uninitialized list
|
||||
fUninitializedConnectingChannels.push_back(&vi);
|
||||
fUninitializedConnectingChannels.push_back(&subChannel);
|
||||
} else {
|
||||
LOG(error) << "Cannot update configuration. Socket method (bind/connect) for channel '" << vi.fName << "' not specified.";
|
||||
throw runtime_error(tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", vi.fName, " not specified."));
|
||||
LOG(error) << "Cannot update configuration. Socket method (bind/connect) for channel '" << subChannel.fName << "' not specified.";
|
||||
throw runtime_error(tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", subChannel.fName, " not specified."));
|
||||
}
|
||||
|
||||
subChannelIndex++;
|
||||
|
@ -724,7 +725,7 @@ void FairMQDevice::SetConfig(ProgOptions& config)
|
|||
|
||||
void FairMQDevice::LogSocketRates()
|
||||
{
|
||||
vector<FairMQSocket*> filteredSockets;
|
||||
vector<FairMQChannel*> filteredChannels;
|
||||
vector<string> filteredChannelNames;
|
||||
vector<int> logIntervals;
|
||||
vector<int> intervalCounters;
|
||||
|
@ -732,40 +733,40 @@ void FairMQDevice::LogSocketRates()
|
|||
size_t chanNameLen = 0;
|
||||
|
||||
// iterate over the channels map
|
||||
for (const auto& mi : fChannels) {
|
||||
for (auto& channel : fChannels) {
|
||||
// iterate over the channels vector
|
||||
for (auto vi = (mi.second).begin(); vi != (mi.second).end(); ++vi) {
|
||||
if (vi->fRateLogging > 0) {
|
||||
filteredSockets.push_back(vi->fSocket.get());
|
||||
logIntervals.push_back(vi->fRateLogging);
|
||||
for (auto& subChannel : channel.second) {
|
||||
if (subChannel.fRateLogging > 0) {
|
||||
filteredChannels.push_back(&subChannel);
|
||||
logIntervals.push_back(subChannel.fRateLogging);
|
||||
intervalCounters.push_back(0);
|
||||
filteredChannelNames.push_back(tools::ToString(mi.first, "[", vi - (mi.second).begin(), "]"));
|
||||
filteredChannelNames.push_back(subChannel.GetName());
|
||||
chanNameLen = max(chanNameLen, filteredChannelNames.back().length());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
vector<unsigned long> bytesIn(filteredSockets.size());
|
||||
vector<unsigned long> msgIn(filteredSockets.size());
|
||||
vector<unsigned long> bytesOut(filteredSockets.size());
|
||||
vector<unsigned long> msgOut(filteredSockets.size());
|
||||
vector<unsigned long> bytesIn(filteredChannels.size());
|
||||
vector<unsigned long> msgIn(filteredChannels.size());
|
||||
vector<unsigned long> bytesOut(filteredChannels.size());
|
||||
vector<unsigned long> msgOut(filteredChannels.size());
|
||||
|
||||
vector<unsigned long> bytesInNew(filteredSockets.size());
|
||||
vector<unsigned long> msgInNew(filteredSockets.size());
|
||||
vector<unsigned long> bytesOutNew(filteredSockets.size());
|
||||
vector<unsigned long> msgOutNew(filteredSockets.size());
|
||||
vector<unsigned long> bytesInNew(filteredChannels.size());
|
||||
vector<unsigned long> msgInNew(filteredChannels.size());
|
||||
vector<unsigned long> bytesOutNew(filteredChannels.size());
|
||||
vector<unsigned long> msgOutNew(filteredChannels.size());
|
||||
|
||||
vector<double> mbPerSecIn(filteredSockets.size());
|
||||
vector<double> msgPerSecIn(filteredSockets.size());
|
||||
vector<double> mbPerSecOut(filteredSockets.size());
|
||||
vector<double> msgPerSecOut(filteredSockets.size());
|
||||
vector<double> mbPerSecIn(filteredChannels.size());
|
||||
vector<double> msgPerSecIn(filteredChannels.size());
|
||||
vector<double> mbPerSecOut(filteredChannels.size());
|
||||
vector<double> msgPerSecOut(filteredChannels.size());
|
||||
|
||||
int i = 0;
|
||||
for (const auto& vi : filteredSockets) {
|
||||
bytesIn.at(i) = vi->GetBytesRx();
|
||||
bytesOut.at(i) = vi->GetBytesTx();
|
||||
msgIn.at(i) = vi->GetMessagesRx();
|
||||
msgOut.at(i) = vi->GetMessagesTx();
|
||||
for (const auto& channel : filteredChannels) {
|
||||
bytesIn.at(i) = channel->GetBytesRx();
|
||||
bytesOut.at(i) = channel->GetBytesTx();
|
||||
msgIn.at(i) = channel->GetMessagesRx();
|
||||
msgOut.at(i) = channel->GetMessagesTx();
|
||||
++i;
|
||||
}
|
||||
|
||||
|
@ -782,17 +783,17 @@ void FairMQDevice::LogSocketRates()
|
|||
|
||||
i = 0;
|
||||
|
||||
for (const auto& vi : filteredSockets) {
|
||||
for (const auto& channel : filteredChannels) {
|
||||
intervalCounters.at(i)++;
|
||||
|
||||
if (intervalCounters.at(i) == logIntervals.at(i)) {
|
||||
intervalCounters.at(i) = 0;
|
||||
|
||||
if (msSinceLastLog > 0) {
|
||||
bytesInNew.at(i) = vi->GetBytesRx();
|
||||
msgInNew.at(i) = vi->GetMessagesRx();
|
||||
bytesOutNew.at(i) = vi->GetBytesTx();
|
||||
msgOutNew.at(i) = vi->GetMessagesTx();
|
||||
bytesInNew.at(i) = channel->GetBytesRx();
|
||||
msgInNew.at(i) = channel->GetMessagesRx();
|
||||
bytesOutNew.at(i) = channel->GetBytesTx();
|
||||
msgOutNew.at(i) = channel->GetMessagesTx();
|
||||
|
||||
mbPerSecIn.at(i) = (static_cast<double>(bytesInNew.at(i) - bytesIn.at(i)) / (1000. * 1000.)) / static_cast<double>(msSinceLastLog) * 1000.;
|
||||
msgPerSecIn.at(i) = static_cast<double>(msgInNew.at(i) - msgIn.at(i)) / static_cast<double>(msSinceLastLog) * 1000.;
|
||||
|
@ -822,8 +823,8 @@ void FairMQDevice::LogSocketRates()
|
|||
|
||||
void FairMQDevice::UnblockTransports()
|
||||
{
|
||||
for (auto& t : fTransports) {
|
||||
t.second->Interrupt();
|
||||
for (auto& transport : fTransports) {
|
||||
transport.second->Interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user