18 #include "../FairMQDevice.h"
19 #include "../FairMQLogger.h"
20 #include <fairmq/tools/Strings.h>
// Upper bound on the number of messages to receive; the run loop only
// enforces it when the value is greater than 0.
44 uint64_t fMaxIterations;
// Message counter: compared against fMaxIterations and reported in the
// "Received ... messages" summary at the end of the run.
45 uint64_t fNumIterations;
// Size limit for the output file in bytes; 0 means no limit (the file keeps
// growing until the sink is stopped).
46 uint64_t fMaxFileSize;
// Running total of bytes handed to the output file; advanced by WriteToFile
// and compared against fMaxFileSize to decide when to stop.
47 uint64_t fBytesWritten;
// Name of the input channel.
// NOTE(review): not referenced in the visible code - presumably used to look
// up the channel messages are received from; confirm against the full file.
48 std::string fInChannelName;
// Path of the output file; when empty, no file is opened and incoming
// messages are not written to disk.
49 std::string fOutFilename;
// Output stream for received payloads; opened on fOutFilename with
// std::ios::out | std::ios::binary when a filename is configured.
50 std::fstream fOutputFile;
68 LOG(info) <<
"Starting sink and expecting to receive " << fMaxIterations <<
" messages.";
69 auto tStart = std::chrono::high_resolution_clock::now();
71 if (!fOutFilename.empty()) {
72 LOG(debug) <<
"Incoming messages will be written to file: " << fOutFilename;
73 if (fMaxFileSize != 0) {
74 LOG(debug) <<
"File output will stop after " << fMaxFileSize <<
" bytes";
76 LOG(debug) <<
"ATTENTION: --max-file-size is 0 - output file will continue to grow until sink is stopped";
79 fOutputFile.open(fOutFilename, std::ios::out | std::ios::binary);
81 LOG(error) <<
"Could not open '" << fOutFilename;
82 throw std::runtime_error(fair::mq::tools::ToString(
"Could not open '", fOutFilename));
89 if (dataInChannel.
Receive(parts) < 0) {
92 if (fOutputFile.is_open()) {
93 for (
const auto& part : parts) {
94 WriteToFile(
static_cast<const char*
>(part->GetData()), part->GetSize());
98 FairMQMessagePtr msg(dataInChannel.NewMessage());
99 if (dataInChannel.
Receive(msg) < 0) {
102 if (fOutputFile.is_open()) {
103 WriteToFile(
static_cast<const char*
>(msg->GetData()), msg->GetSize());
107 if (fMaxFileSize > 0 && fBytesWritten >= fMaxFileSize) {
108 LOG(info) <<
"Written " << fBytesWritten <<
" bytes, stopping...";
111 if (fMaxIterations > 0) {
112 if (fNumIterations >= fMaxIterations) {
113 LOG(info) <<
"Configured maximum number of iterations reached.";
120 if (fOutputFile.is_open()) {
125 auto tEnd = std::chrono::high_resolution_clock::now();
126 auto ms = std::chrono::duration<double, std::milli>(tEnd - tStart).count();
127 LOG(info) <<
"Received " << fNumIterations <<
" messages in " << ms <<
"ms.";
128 if (!fOutFilename.empty()) {
129 auto sec = std::chrono::duration<double>(tEnd - tStart).count();
130 LOG(info) <<
"Closed '" << fOutFilename <<
"' after writing " << fBytesWritten <<
" bytes."
131 <<
"(" << (fBytesWritten / (1000. * 1000.)) / sec <<
" MB/s)";
134 LOG(info) <<
"Leaving RUNNING state.";
137 void WriteToFile(
const char* ptr,
size_t size)
139 fOutputFile.write(ptr, size);
140 if (fOutputFile.bad()) {
141 LOG(error) <<
"failed writing to file";
142 throw std::runtime_error(
"failed writing to file");
144 fBytesWritten += size;