mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
bbc1dd4600 | ||
|
8327810942 | ||
|
c37742e3b4 | ||
|
93dff3c5a7 |
@@ -1,5 +1,5 @@
|
|||||||
/********************************************************************************
|
/********************************************************************************
|
||||||
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
* *
|
* *
|
||||||
* This software is distributed under the terms of the *
|
* This software is distributed under the terms of the *
|
||||||
* GNU Lesser General Public License (LGPL) version 3, *
|
* GNU Lesser General Public License (LGPL) version 3, *
|
||||||
@@ -84,6 +84,7 @@ Properties SuboptParser(const vector<string>& channelConfig, const string& devic
|
|||||||
char* subopts = &argString[0];
|
char* subopts = &argString[0];
|
||||||
char* value = nullptr;
|
char* value = nullptr;
|
||||||
while (subopts && *subopts != 0 && *subopts != ' ') {
|
while (subopts && *subopts != 0 && *subopts != ' ') {
|
||||||
|
char* cur = subopts;
|
||||||
int subopt = getsubopt(&subopts, (char**)channelOptionKeys, &value);
|
int subopt = getsubopt(&subopts, (char**)channelOptionKeys, &value);
|
||||||
if (subopt == NAME) {
|
if (subopt == NAME) {
|
||||||
channelName = value;
|
channelName = value;
|
||||||
@@ -94,6 +95,8 @@ Properties SuboptParser(const vector<string>& channelConfig, const string& devic
|
|||||||
socketsArray.push_back(make_pair("", socketProperties));
|
socketsArray.push_back(make_pair("", socketProperties));
|
||||||
} else if (subopt >= 0 && value != nullptr) {
|
} else if (subopt >= 0 && value != nullptr) {
|
||||||
channelProperties.put(channelOptionKeys[subopt], value);
|
channelProperties.put(channelOptionKeys[subopt], value);
|
||||||
|
} else if (subopt == -1) {
|
||||||
|
LOG(warn) << "Ignoring unknown argument in --channel-config: " << cur;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -21,7 +21,7 @@
|
|||||||
#define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@"
|
#define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@"
|
||||||
#define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ"
|
#define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ"
|
||||||
#define FAIRMQ_LICENSE "LGPL-3.0"
|
#define FAIRMQ_LICENSE "LGPL-3.0"
|
||||||
#define FAIRMQ_COPYRIGHT "2012-2020 GSI"
|
#define FAIRMQ_COPYRIGHT "2012-2021 GSI"
|
||||||
#define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@"
|
#define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@"
|
||||||
|
|
||||||
#endif // FAIR_MQ_VERSION_H
|
#endif // FAIR_MQ_VERSION_H
|
||||||
|
@@ -17,19 +17,24 @@
|
|||||||
|
|
||||||
#include "../FairMQDevice.h"
|
#include "../FairMQDevice.h"
|
||||||
#include "../FairMQLogger.h"
|
#include "../FairMQLogger.h"
|
||||||
|
#include <fairmq/tools/Strings.h>
|
||||||
|
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <fstream>
|
||||||
|
#include <stdexcept>
|
||||||
|
|
||||||
// template<typename OutputPolicy>
|
class FairMQSink : public FairMQDevice
|
||||||
class FairMQSink : public FairMQDevice //, public OutputPolicy
|
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
FairMQSink()
|
FairMQSink()
|
||||||
: fMultipart(false)
|
: fMultipart(false)
|
||||||
, fMaxIterations(0)
|
, fMaxIterations(0)
|
||||||
, fNumIterations(0)
|
, fNumIterations(0)
|
||||||
|
, fMaxFileSize(0)
|
||||||
|
, fBytesWritten(0)
|
||||||
, fInChannelName()
|
, fInChannelName()
|
||||||
|
, fOutFilename()
|
||||||
{}
|
{}
|
||||||
|
|
||||||
~FairMQSink() {}
|
~FairMQSink() {}
|
||||||
@@ -38,13 +43,21 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy
|
|||||||
bool fMultipart;
|
bool fMultipart;
|
||||||
uint64_t fMaxIterations;
|
uint64_t fMaxIterations;
|
||||||
uint64_t fNumIterations;
|
uint64_t fNumIterations;
|
||||||
|
uint64_t fMaxFileSize;
|
||||||
|
uint64_t fBytesWritten;
|
||||||
std::string fInChannelName;
|
std::string fInChannelName;
|
||||||
|
std::string fOutFilename;
|
||||||
|
std::fstream fOutputFile;
|
||||||
|
|
||||||
void InitTask() override
|
void InitTask() override
|
||||||
{
|
{
|
||||||
fMultipart = fConfig->GetProperty<bool>("multipart");
|
fMultipart = fConfig->GetProperty<bool>("multipart");
|
||||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||||
|
fMaxFileSize = fConfig->GetProperty<uint64_t>("max-file-size");
|
||||||
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
|
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
|
||||||
|
fOutFilename = fConfig->GetProperty<std::string>("out-filename");
|
||||||
|
|
||||||
|
fBytesWritten = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Run() override
|
void Run() override
|
||||||
@@ -52,41 +65,83 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy
|
|||||||
// store the channel reference to avoid traversing the map on every loop iteration
|
// store the channel reference to avoid traversing the map on every loop iteration
|
||||||
FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
|
FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
|
||||||
|
|
||||||
LOG(info) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages.";
|
LOG(info) << "Starting sink and expecting to receive " << fMaxIterations << " messages.";
|
||||||
auto tStart = std::chrono::high_resolution_clock::now();
|
auto tStart = std::chrono::high_resolution_clock::now();
|
||||||
|
|
||||||
|
if (!fOutFilename.empty()) {
|
||||||
|
LOG(debug) << "Incoming messages will be written to file: " << fOutFilename;
|
||||||
|
if (fMaxFileSize != 0) {
|
||||||
|
LOG(debug) << "File output will stop after " << fMaxFileSize << " bytes";
|
||||||
|
} else {
|
||||||
|
LOG(debug) << "ATTENTION: --max-file-size is 0 - output file will continue to grow until sink is stopped";
|
||||||
|
}
|
||||||
|
|
||||||
|
fOutputFile.open(fOutFilename, std::ios::out | std::ios::binary);
|
||||||
|
if (!fOutputFile) {
|
||||||
|
LOG(error) << "Could not open '" << fOutFilename;
|
||||||
|
throw std::runtime_error(fair::mq::tools::ToString("Could not open '", fOutFilename));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
while (!NewStatePending()) {
|
while (!NewStatePending()) {
|
||||||
if (fMultipart) {
|
if (fMultipart) {
|
||||||
FairMQParts parts;
|
FairMQParts parts;
|
||||||
|
if (dataInChannel.Receive(parts) < 0) {
|
||||||
if (dataInChannel.Receive(parts) >= 0) {
|
continue;
|
||||||
if (fMaxIterations > 0) {
|
}
|
||||||
if (fNumIterations >= fMaxIterations) {
|
if (fOutputFile.is_open()) {
|
||||||
LOG(info) << "Configured maximum number of iterations reached.";
|
for (const auto& part : parts) {
|
||||||
break;
|
WriteToFile(static_cast<const char*>(part->GetData()), part->GetSize());
|
||||||
}
|
|
||||||
}
|
}
|
||||||
fNumIterations++;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
FairMQMessagePtr msg(dataInChannel.NewMessage());
|
FairMQMessagePtr msg(dataInChannel.NewMessage());
|
||||||
|
if (dataInChannel.Receive(msg) < 0) {
|
||||||
if (dataInChannel.Receive(msg) >= 0) {
|
continue;
|
||||||
if (fMaxIterations > 0) {
|
}
|
||||||
if (fNumIterations >= fMaxIterations) {
|
if (fOutputFile.is_open()) {
|
||||||
LOG(info) << "Configured maximum number of iterations reached.";
|
WriteToFile(static_cast<const char*>(msg->GetData()), msg->GetSize());
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fNumIterations++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (fMaxFileSize > 0 && fBytesWritten >= fMaxFileSize) {
|
||||||
|
LOG(info) << "Written " << fBytesWritten << " bytes, stopping...";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (fMaxIterations > 0) {
|
||||||
|
if (fNumIterations >= fMaxIterations) {
|
||||||
|
LOG(info) << "Configured maximum number of iterations reached.";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fNumIterations++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fOutputFile.is_open()) {
|
||||||
|
fOutputFile.flush();
|
||||||
|
fOutputFile.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto tEnd = std::chrono::high_resolution_clock::now();
|
auto tEnd = std::chrono::high_resolution_clock::now();
|
||||||
|
auto ms = std::chrono::duration<double, std::milli>(tEnd - tStart).count();
|
||||||
|
LOG(info) << "Received " << fNumIterations << " messages in " << ms << "ms.";
|
||||||
|
if (!fOutFilename.empty()) {
|
||||||
|
auto sec = std::chrono::duration<double>(tEnd - tStart).count();
|
||||||
|
LOG(info) << "Closed '" << fOutFilename << "' after writing " << fBytesWritten << " bytes."
|
||||||
|
<< "(" << (fBytesWritten / (1000. * 1000.)) / sec << " MB/s)";
|
||||||
|
}
|
||||||
|
|
||||||
LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in "
|
LOG(info) << "Leaving RUNNING state.";
|
||||||
<< std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms.";
|
}
|
||||||
|
|
||||||
|
void WriteToFile(const char* ptr, size_t size)
|
||||||
|
{
|
||||||
|
fOutputFile.write(ptr, size);
|
||||||
|
if (fOutputFile.bad()) {
|
||||||
|
LOG(error) << "failed writing to file";
|
||||||
|
throw std::runtime_error("failed writing to file");
|
||||||
|
}
|
||||||
|
fBytesWritten += size;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@@ -15,6 +15,8 @@ void addCustomOptions(bpo::options_description& options)
|
|||||||
{
|
{
|
||||||
options.add_options()
|
options.add_options()
|
||||||
("in-channel", bpo::value<std::string>()->default_value("data"), "Name of the input channel")
|
("in-channel", bpo::value<std::string>()->default_value("data"), "Name of the input channel")
|
||||||
|
("out-filename", bpo::value<std::string>()->default_value(""), "Write incoming message buffers to the specified file")
|
||||||
|
("max-file-size", bpo::value<uint64_t>()->default_value(2000000000), "Maximum file size for the file output (0 - unlimited)")
|
||||||
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
|
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
|
||||||
("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads");
|
("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads");
|
||||||
}
|
}
|
||||||
|
@@ -136,8 +136,13 @@ void Monitor::Run()
|
|||||||
|
|
||||||
if (fInteractive) {
|
if (fInteractive) {
|
||||||
Interactive();
|
Interactive();
|
||||||
} else {
|
} else if (fViewOnly) {
|
||||||
CheckSegment();
|
CheckSegment();
|
||||||
|
} else {
|
||||||
|
while (!fTerminating) {
|
||||||
|
this_thread::sleep_for(chrono::milliseconds(fIntervalInMS));
|
||||||
|
CheckSegment();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!fViewOnly) {
|
if (!fViewOnly) {
|
||||||
|
Reference in New Issue
Block a user