Minor refactoring

This commit is contained in:
Alexey Rybalchenko 2020-05-15 21:37:59 +02:00
parent d22023bcb5
commit f4a54ff550
4 changed files with 95 additions and 137 deletions

View File

@ -219,7 +219,6 @@ if(BUILD_FAIRMQ)
FairMQPoller.cxx FairMQPoller.cxx
FairMQSocket.cxx FairMQSocket.cxx
FairMQTransportFactory.cxx FairMQTransportFactory.cxx
devices/FairMQBenchmarkSampler.cxx
devices/FairMQMerger.cxx devices/FairMQMerger.cxx
devices/FairMQMultiplier.cxx devices/FairMQMultiplier.cxx
devices/FairMQProxy.cxx devices/FairMQProxy.cxx

View File

@ -1,100 +0,0 @@
/********************************************************************************
* Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "FairMQBenchmarkSampler.h"
#include "tools/RateLimit.h"
#include "../FairMQLogger.h"
#include <chrono>
using namespace std;
FairMQBenchmarkSampler::FairMQBenchmarkSampler()
: fMultipart(false)
, fNumParts(1)
, fMsgSize(10000)
, fMsgRate(0)
, fNumIterations(0)
, fMaxIterations(0)
, fOutChannelName()
{
}
void FairMQBenchmarkSampler::InitTask()
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fNumParts = fConfig->GetProperty<size_t>("num-parts");
fMsgSize = fConfig->GetProperty<size_t>("msg-size");
fMsgRate = fConfig->GetProperty<float>("msg-rate");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fOutChannelName = fConfig->GetProperty<string>("out-channel");
}
void FairMQBenchmarkSampler::Run()
{
// store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0);
FairMQMessagePtr baseMsg(dataOutChannel.NewMessage(fMsgSize));
LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations.";
auto tStart = chrono::high_resolution_clock::now();
fair::mq::tools::RateLimiter rateLimiter(fMsgRate);
while (!NewStatePending())
{
if (fMultipart)
{
FairMQParts parts;
for (size_t i = 0; i < fNumParts; ++i)
{
parts.AddPart(dataOutChannel.NewMessage(fMsgSize));
}
if (dataOutChannel.Send(parts) >= 0)
{
if (fMaxIterations > 0)
{
if (fNumIterations >= fMaxIterations)
{
break;
}
}
++fNumIterations;
}
}
else
{
FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize));
if (dataOutChannel.Send(msg) >= 0)
{
if (fMaxIterations > 0)
{
if (fNumIterations >= fMaxIterations)
{
break;
}
}
++fNumIterations;
}
}
if (fMsgRate > 0)
{
rateLimiter.maybe_sleep();
}
}
auto tEnd = chrono::high_resolution_clock::now();
LOG(info) << "Done " << fNumIterations << " iterations in " << chrono::duration<double, milli>(tEnd - tStart).count() << "ms.";
}

View File

@ -9,13 +9,15 @@
#ifndef FAIRMQBENCHMARKSAMPLER_H_ #ifndef FAIRMQBENCHMARKSAMPLER_H_
#define FAIRMQBENCHMARKSAMPLER_H_ #define FAIRMQBENCHMARKSAMPLER_H_
#include <string> #include "../FairMQLogger.h"
#include "FairMQDevice.h"
#include "tools/RateLimit.h"
#include <atomic> #include <atomic>
#include <chrono>
#include <cstddef> // size_t #include <cstddef> // size_t
#include <cstdint> // uint64_t #include <cstdint> // uint64_t
#include <string>
#include "FairMQDevice.h"
/** /**
* Sampler to generate traffic for benchmarking. * Sampler to generate traffic for benchmarking.
@ -24,7 +26,77 @@
class FairMQBenchmarkSampler : public FairMQDevice class FairMQBenchmarkSampler : public FairMQDevice
{ {
public: public:
FairMQBenchmarkSampler(); FairMQBenchmarkSampler()
: fMultipart(false)
, fNumParts(1)
, fMsgSize(10000)
, fMsgRate(0)
, fNumIterations(0)
, fMaxIterations(0)
, fOutChannelName()
{}
void InitTask() override
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fNumParts = fConfig->GetProperty<size_t>("num-parts");
fMsgSize = fConfig->GetProperty<size_t>("msg-size");
fMsgRate = fConfig->GetProperty<float>("msg-rate");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fOutChannelName = fConfig->GetProperty<std::string>("out-channel");
}
void Run() override
{
// store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0);
FairMQMessagePtr baseMsg(dataOutChannel.NewMessage(fMsgSize));
LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations.";
auto tStart = std::chrono::high_resolution_clock::now();
fair::mq::tools::RateLimiter rateLimiter(fMsgRate);
while (!NewStatePending()) {
if (fMultipart) {
FairMQParts parts;
for (size_t i = 0; i < fNumParts; ++i) {
parts.AddPart(dataOutChannel.NewMessage(fMsgSize));
}
if (dataOutChannel.Send(parts) >= 0) {
if (fMaxIterations > 0) {
if (fNumIterations >= fMaxIterations) {
break;
}
}
++fNumIterations;
}
} else {
FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize));
if (dataOutChannel.Send(msg) >= 0) {
if (fMaxIterations > 0) {
if (fNumIterations >= fMaxIterations) {
break;
}
}
++fNumIterations;
}
}
if (fMsgRate > 0) {
rateLimiter.maybe_sleep();
}
}
auto tEnd = std::chrono::high_resolution_clock::now();
LOG(info) << "Done " << fNumIterations << " iterations in " << std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms.";
}
virtual ~FairMQBenchmarkSampler() {} virtual ~FairMQBenchmarkSampler() {}
protected: protected:
@ -36,9 +108,6 @@ class FairMQBenchmarkSampler : public FairMQDevice
uint64_t fNumIterations; uint64_t fNumIterations;
uint64_t fMaxIterations; uint64_t fMaxIterations;
std::string fOutChannelName; std::string fOutChannelName;
virtual void InitTask() override;
virtual void Run() override;
}; };
#endif /* FAIRMQBENCHMARKSAMPLER_H_ */ #endif /* FAIRMQBENCHMARKSAMPLER_H_ */

View File

@ -15,14 +15,14 @@
#ifndef FAIRMQSINK_H_ #ifndef FAIRMQSINK_H_
#define FAIRMQSINK_H_ #define FAIRMQSINK_H_
#include <string>
#include <chrono>
#include "../FairMQDevice.h" #include "../FairMQDevice.h"
#include "../FairMQLogger.h" #include "../FairMQLogger.h"
#include <chrono>
#include <string>
// template<typename OutputPolicy> // template<typename OutputPolicy>
class FairMQSink : public FairMQDevice//, public OutputPolicy class FairMQSink : public FairMQDevice //, public OutputPolicy
{ {
public: public:
FairMQSink() FairMQSink()
@ -32,8 +32,7 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy
, fInChannelName() , fInChannelName()
{} {}
virtual ~FairMQSink() virtual ~FairMQSink() {}
{}
protected: protected:
bool fMultipart; bool fMultipart;
@ -56,35 +55,25 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy
LOG(info) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages."; LOG(info) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages.";
auto tStart = std::chrono::high_resolution_clock::now(); auto tStart = std::chrono::high_resolution_clock::now();
while (!NewStatePending()) while (!NewStatePending()) {
{ if (fMultipart) {
if (fMultipart)
{
FairMQParts parts; FairMQParts parts;
if (dataInChannel.Receive(parts) >= 0) if (dataInChannel.Receive(parts) >= 0) {
{ if (fMaxIterations > 0) {
if (fMaxIterations > 0) if (fNumIterations >= fMaxIterations) {
{
if (fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached."; LOG(info) << "Configured maximum number of iterations reached.";
break; break;
} }
} }
fNumIterations++; fNumIterations++;
} }
} } else {
else
{
FairMQMessagePtr msg(dataInChannel.NewMessage()); FairMQMessagePtr msg(dataInChannel.NewMessage());
if (dataInChannel.Receive(msg) >= 0) if (dataInChannel.Receive(msg) >= 0) {
{ if (fMaxIterations > 0) {
if (fMaxIterations > 0) if (fNumIterations >= fMaxIterations) {
{
if (fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached."; LOG(info) << "Configured maximum number of iterations reached.";
break; break;
} }
@ -96,7 +85,8 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy
auto tEnd = std::chrono::high_resolution_clock::now(); auto tEnd = std::chrono::high_resolution_clock::now();
LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in " << std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms."; LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in "
<< std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms.";
} }
}; };