Add example/test for built-in devices

This commit is contained in:
Alexey Rybalchenko
2018-10-18 11:10:06 +02:00
committed by Dennis Klein
parent ffab4ac78c
commit d4a4ea14d2
23 changed files with 371 additions and 49 deletions

View File

@@ -18,7 +18,8 @@
using namespace std;
FairMQBenchmarkSampler::FairMQBenchmarkSampler()
: fSameMessage(true)
: fMultipart(false)
, fNumParts(1)
, fMsgSize(10000)
, fMsgRate(0)
, fNumIterations(0)
@@ -33,8 +34,9 @@ FairMQBenchmarkSampler::~FairMQBenchmarkSampler()
void FairMQBenchmarkSampler::InitTask()
{
fSameMessage = fConfig->GetValue<bool>("same-msg");
fMsgSize = fConfig->GetValue<int>("msg-size");
fMultipart = fConfig->GetValue<bool>("multipart");
fNumParts = fConfig->GetValue<size_t>("num-parts");
fMsgSize = fConfig->GetValue<size_t>("msg-size");
fMsgRate = fConfig->GetValue<float>("msg-rate");
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
fOutChannelName = fConfig->GetValue<string>("out-channel");
@@ -54,12 +56,16 @@ void FairMQBenchmarkSampler::Run()
while (CheckCurrentState(RUNNING))
{
if (fSameMessage)
if (fMultipart)
{
FairMQMessagePtr msg(dataOutChannel.NewMessage());
msg->Copy(*baseMsg);
FairMQParts parts;
if (dataOutChannel.Send(msg) >= 0)
for (size_t i = 0; i < fNumParts; ++i)
{
parts.AddPart(dataOutChannel.NewMessage(fMsgSize));
}
if (dataOutChannel.Send(parts) >= 0)
{
if (fMaxIterations > 0)
{

View File

@@ -26,8 +26,9 @@ class FairMQBenchmarkSampler : public FairMQDevice
virtual ~FairMQBenchmarkSampler();
protected:
bool fSameMessage;
int fMsgSize;
bool fMultipart;
size_t fNumParts;
size_t fMsgSize;
std::atomic<int> fMsgCounter;
float fMsgRate;
uint64_t fNumIterations;

View File

@@ -20,7 +20,7 @@
using namespace std;
FairMQMerger::FairMQMerger()
: fMultipart(1)
: fMultipart(true)
, fInChannelName("data-in")
, fOutChannelName("data-out")
{
@@ -30,6 +30,8 @@ void FairMQMerger::RegisterChannelEndpoints()
{
RegisterChannelEndpoint(fInChannelName, 1, 10000);
RegisterChannelEndpoint(fOutChannelName, 1, 1);
PrintRegisteredChannels();
}
FairMQMerger::~FairMQMerger()
@@ -38,7 +40,7 @@ FairMQMerger::~FairMQMerger()
void FairMQMerger::InitTask()
{
fMultipart = fConfig->GetValue<int>("multipart");
fMultipart = fConfig->GetValue<bool>("multipart");
fInChannelName = fConfig->GetValue<string>("in-channel");
fOutChannelName = fConfig->GetValue<string>("out-channel");
}

View File

@@ -26,7 +26,7 @@ class FairMQMerger : public FairMQDevice
virtual ~FairMQMerger();
protected:
int fMultipart;
bool fMultipart;
std::string fInChannelName;
std::string fOutChannelName;

View File

@@ -14,7 +14,7 @@
using namespace std;
FairMQMultiplier::FairMQMultiplier()
: fMultipart(1)
: fMultipart(true)
, fNumOutputs(0)
, fInChannelName()
, fOutChannelNames()
@@ -27,7 +27,7 @@ FairMQMultiplier::~FairMQMultiplier()
void FairMQMultiplier::InitTask()
{
fMultipart = fConfig->GetValue<int>("multipart");
fMultipart = fConfig->GetValue<bool>("multipart");
fInChannelName = fConfig->GetValue<string>("in-channel");
fOutChannelNames = fConfig->GetValue<vector<string>>("out-channel");
fNumOutputs = fChannels.at(fOutChannelNames.at(0)).size();

View File

@@ -20,7 +20,7 @@ class FairMQMultiplier : public FairMQDevice
virtual ~FairMQMultiplier();
protected:
int fMultipart;
bool fMultipart;
int fNumOutputs;
std::string fInChannelName;
std::vector<std::string> fOutChannelNames;

View File

@@ -20,7 +20,7 @@
using namespace std;
FairMQProxy::FairMQProxy()
: fMultipart(1)
: fMultipart(true)
, fInChannelName()
, fOutChannelName()
{
@@ -32,7 +32,7 @@ FairMQProxy::~FairMQProxy()
void FairMQProxy::InitTask()
{
fMultipart = fConfig->GetValue<int>("multipart");
fMultipart = fConfig->GetValue<bool>("multipart");
fInChannelName = fConfig->GetValue<string>("in-channel");
fOutChannelName = fConfig->GetValue<string>("out-channel");
}

View File

@@ -26,7 +26,7 @@ class FairMQProxy : public FairMQDevice
virtual ~FairMQProxy();
protected:
int fMultipart;
bool fMultipart;
std::string fInChannelName;
std::string fOutChannelName;

View File

@@ -27,7 +27,8 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy
{
public:
FairMQSink()
: fMaxIterations(0)
: fMultipart(false)
, fMaxIterations(0)
, fNumIterations(0)
, fInChannelName()
{}
@@ -36,12 +37,14 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy
{}
protected:
bool fMultipart;
uint64_t fMaxIterations;
uint64_t fNumIterations;
std::string fInChannelName;
virtual void InitTask()
{
fMultipart = fConfig->GetValue<bool>("multipart");
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
fInChannelName = fConfig->GetValue<std::string>("in-channel");
}
@@ -56,18 +59,39 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy
while (CheckCurrentState(RUNNING))
{
FairMQMessagePtr msg(dataInChannel.NewMessage());
if (dataInChannel.Receive(msg) >= 0)
if (fMultipart)
{
if (fMaxIterations > 0)
FairMQParts parts;
if (dataInChannel.Receive(parts) >= 0)
{
if (fNumIterations >= fMaxIterations)
if (fMaxIterations > 0)
{
break;
if (fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached.";
break;
}
}
fNumIterations++;
}
}
else
{
FairMQMessagePtr msg(dataInChannel.NewMessage());
if (dataInChannel.Receive(msg) >= 0)
{
if (fMaxIterations > 0)
{
if (fNumIterations >= fMaxIterations)
{
LOG(info) << "Configured maximum number of iterations reached.";
break;
}
}
fNumIterations++;
}
fNumIterations++;
}
}

View File

@@ -20,7 +20,7 @@
using namespace std;
FairMQSplitter::FairMQSplitter()
: fMultipart(1)
: fMultipart(true)
, fNumOutputs(0)
, fDirection(0)
, fInChannelName()
@@ -34,7 +34,7 @@ FairMQSplitter::~FairMQSplitter()
void FairMQSplitter::InitTask()
{
fMultipart = fConfig->GetValue<int>("multipart");
fMultipart = fConfig->GetValue<bool>("multipart");
fInChannelName = fConfig->GetValue<string>("in-channel");
fOutChannelName = fConfig->GetValue<string>("out-channel");
fNumOutputs = fChannels.at(fOutChannelName).size();

View File

@@ -26,7 +26,7 @@ class FairMQSplitter : public FairMQDevice
virtual ~FairMQSplitter();
protected:
int fMultipart;
bool fMultipart;
int fNumOutputs;
int fDirection;
std::string fInChannelName;