Compare commits

...

4 Commits

Author SHA1 Message Date
Alexey Rybalchenko
bbc1dd4600 Add optional file output to FairMQSink 2021-03-01 15:33:45 +01:00
Dennis Klein
8327810942 Warn on unknown --channel-config args 2021-03-01 08:37:57 +01:00
Dennis Klein
c37742e3b4 Update Copyright string 2021-03-01 08:37:57 +01:00
Alexey Rybalchenko
93dff3c5a7 Fix regression in shmmonitor 2021-02-19 09:54:29 +01:00
5 changed files with 91 additions and 26 deletions

View File

@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public License (LGPL) version 3, *
@@ -84,6 +84,7 @@ Properties SuboptParser(const vector<string>& channelConfig, const string& devic
char* subopts = &argString[0];
char* value = nullptr;
while (subopts && *subopts != 0 && *subopts != ' ') {
char* cur = subopts;
int subopt = getsubopt(&subopts, (char**)channelOptionKeys, &value);
if (subopt == NAME) {
channelName = value;
@@ -94,6 +95,8 @@ Properties SuboptParser(const vector<string>& channelConfig, const string& devic
socketsArray.push_back(make_pair("", socketProperties));
} else if (subopt >= 0 && value != nullptr) {
channelProperties.put(channelOptionKeys[subopt], value);
} else if (subopt == -1) {
LOG(warn) << "Ignoring unknown argument in --channel-config: " << cur;
}
}

View File

@@ -21,7 +21,7 @@
#define FAIRMQ_GIT_DATE "@PROJECT_GIT_DATE@"
#define FAIRMQ_REPO_URL "https://github.com/FairRootGroup/FairMQ"
#define FAIRMQ_LICENSE "LGPL-3.0"
#define FAIRMQ_COPYRIGHT "2012-2020 GSI"
#define FAIRMQ_COPYRIGHT "2012-2021 GSI"
#define FAIRMQ_BUILD_TYPE "@CMAKE_BUILD_TYPE@"
#endif // FAIR_MQ_VERSION_H

View File

@@ -17,19 +17,24 @@
#include "../FairMQDevice.h"
#include "../FairMQLogger.h"
#include <fairmq/tools/Strings.h>
#include <chrono>
#include <string>
#include <fstream>
#include <stdexcept>
// template<typename OutputPolicy>
class FairMQSink : public FairMQDevice //, public OutputPolicy
class FairMQSink : public FairMQDevice
{
public:
FairMQSink()
: fMultipart(false)
, fMaxIterations(0)
, fNumIterations(0)
, fMaxFileSize(0)
, fBytesWritten(0)
, fInChannelName()
, fOutFilename()
{}
~FairMQSink() {}
@@ -38,13 +43,21 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy
bool fMultipart;
uint64_t fMaxIterations;
uint64_t fNumIterations;
uint64_t fMaxFileSize;
uint64_t fBytesWritten;
std::string fInChannelName;
std::string fOutFilename;
std::fstream fOutputFile;
void InitTask() override
{
fMultipart = fConfig->GetProperty<bool>("multipart");
fMultipart = fConfig->GetProperty<bool>("multipart");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fMaxFileSize = fConfig->GetProperty<uint64_t>("max-file-size");
fInChannelName = fConfig->GetProperty<std::string>("in-channel");
fOutFilename = fConfig->GetProperty<std::string>("out-filename");
fBytesWritten = 0;
}
void Run() override
@@ -52,41 +65,83 @@ class FairMQSink : public FairMQDevice //, public OutputPolicy
// store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
LOG(info) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages.";
LOG(info) << "Starting sink and expecting to receive " << fMaxIterations << " messages.";
auto tStart = std::chrono::high_resolution_clock::now();
if (!fOutFilename.empty()) {
LOG(debug) << "Incoming messages will be written to file: " << fOutFilename;
if (fMaxFileSize != 0) {
LOG(debug) << "File output will stop after " << fMaxFileSize << " bytes";
} else {
LOG(debug) << "ATTENTION: --max-file-size is 0 - output file will continue to grow until sink is stopped";
}
fOutputFile.open(fOutFilename, std::ios::out | std::ios::binary);
if (!fOutputFile) {
LOG(error) << "Could not open '" << fOutFilename;
throw std::runtime_error(fair::mq::tools::ToString("Could not open '", fOutFilename));
}
}
while (!NewStatePending()) {
if (fMultipart) {
FairMQParts parts;
if (dataInChannel.Receive(parts) >= 0) {
if (fMaxIterations > 0) {
if (fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached.";
break;
}
if (dataInChannel.Receive(parts) < 0) {
continue;
}
if (fOutputFile.is_open()) {
for (const auto& part : parts) {
WriteToFile(static_cast<const char*>(part->GetData()), part->GetSize());
}
fNumIterations++;
}
} else {
FairMQMessagePtr msg(dataInChannel.NewMessage());
if (dataInChannel.Receive(msg) >= 0) {
if (fMaxIterations > 0) {
if (fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached.";
break;
}
}
fNumIterations++;
if (dataInChannel.Receive(msg) < 0) {
continue;
}
if (fOutputFile.is_open()) {
WriteToFile(static_cast<const char*>(msg->GetData()), msg->GetSize());
}
}
if (fMaxFileSize > 0 && fBytesWritten >= fMaxFileSize) {
LOG(info) << "Written " << fBytesWritten << " bytes, stopping...";
break;
}
if (fMaxIterations > 0) {
if (fNumIterations >= fMaxIterations) {
LOG(info) << "Configured maximum number of iterations reached.";
break;
}
}
fNumIterations++;
}
if (fOutputFile.is_open()) {
fOutputFile.flush();
fOutputFile.close();
}
auto tEnd = std::chrono::high_resolution_clock::now();
auto ms = std::chrono::duration<double, std::milli>(tEnd - tStart).count();
LOG(info) << "Received " << fNumIterations << " messages in " << ms << "ms.";
if (!fOutFilename.empty()) {
auto sec = std::chrono::duration<double>(tEnd - tStart).count();
LOG(info) << "Closed '" << fOutFilename << "' after writing " << fBytesWritten << " bytes."
<< "(" << (fBytesWritten / (1000. * 1000.)) / sec << " MB/s)";
}
LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in "
<< std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms.";
LOG(info) << "Leaving RUNNING state.";
}
void WriteToFile(const char* ptr, size_t size)
{
fOutputFile.write(ptr, size);
if (fOutputFile.bad()) {
LOG(error) << "failed writing to file";
throw std::runtime_error("failed writing to file");
}
fBytesWritten += size;
}
};

View File

@@ -15,6 +15,8 @@ void addCustomOptions(bpo::options_description& options)
{
options.add_options()
("in-channel", bpo::value<std::string>()->default_value("data"), "Name of the input channel")
("out-filename", bpo::value<std::string>()->default_value(""), "Write incoming message buffers to the specified file")
("max-file-size", bpo::value<uint64_t>()->default_value(2000000000), "Maximum file size for the file output (0 - unlimited)")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads");
}

View File

@@ -136,8 +136,13 @@ void Monitor::Run()
if (fInteractive) {
Interactive();
} else {
} else if (fViewOnly) {
CheckSegment();
} else {
while (!fTerminating) {
this_thread::sleep_for(chrono::milliseconds(fIntervalInMS));
CheckSegment();
}
}
if (!fViewOnly) {