FairMQ  1.4.33
C++ Message Queuing Library and Framework
FairMQSink.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #ifndef FAIRMQSINK_H_
16 #define FAIRMQSINK_H_
17 
18 #include "../FairMQDevice.h"
19 #include "../FairMQLogger.h"
20 #include <fairmq/tools/Strings.h>
21 
22 #include <chrono>
23 #include <string>
24 #include <fstream>
25 #include <stdexcept>
26 
27 class FairMQSink : public FairMQDevice
28 {
29  public:
30  FairMQSink()
31  : fMultipart(false)
32  , fMaxIterations(0)
33  , fNumIterations(0)
34  , fMaxFileSize(0)
35  , fBytesWritten(0)
36  , fInChannelName()
37  , fOutFilename()
38  {}
39 
40  ~FairMQSink() {}
41 
42  protected:
43  bool fMultipart;
44  uint64_t fMaxIterations;
45  uint64_t fNumIterations;
46  uint64_t fMaxFileSize;
47  uint64_t fBytesWritten;
48  std::string fInChannelName;
49  std::string fOutFilename;
50  std::fstream fOutputFile;
51 
52  void InitTask() override
53  {
54  fMultipart = fConfig->GetProperty<bool>("multipart");
55  fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
56  fMaxFileSize = fConfig->GetProperty<uint64_t>("max-file-size");
57  fInChannelName = fConfig->GetProperty<std::string>("in-channel");
58  fOutFilename = fConfig->GetProperty<std::string>("out-filename");
59 
60  fBytesWritten = 0;
61  }
62 
63  void Run() override
64  {
65  // store the channel reference to avoid traversing the map on every loop iteration
66  FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0);
67 
68  LOG(info) << "Starting sink and expecting to receive " << fMaxIterations << " messages.";
69  auto tStart = std::chrono::high_resolution_clock::now();
70 
71  if (!fOutFilename.empty()) {
72  LOG(debug) << "Incoming messages will be written to file: " << fOutFilename;
73  if (fMaxFileSize != 0) {
74  LOG(debug) << "File output will stop after " << fMaxFileSize << " bytes";
75  } else {
76  LOG(debug) << "ATTENTION: --max-file-size is 0 - output file will continue to grow until sink is stopped";
77  }
78 
79  fOutputFile.open(fOutFilename, std::ios::out | std::ios::binary);
80  if (!fOutputFile) {
81  LOG(error) << "Could not open '" << fOutFilename;
82  throw std::runtime_error(fair::mq::tools::ToString("Could not open '", fOutFilename));
83  }
84  }
85 
86  while (!NewStatePending()) {
87  if (fMultipart) {
88  FairMQParts parts;
89  if (dataInChannel.Receive(parts) < 0) {
90  continue;
91  }
92  if (fOutputFile.is_open()) {
93  for (const auto& part : parts) {
94  WriteToFile(static_cast<const char*>(part->GetData()), part->GetSize());
95  }
96  }
97  } else {
98  FairMQMessagePtr msg(dataInChannel.NewMessage());
99  if (dataInChannel.Receive(msg) < 0) {
100  continue;
101  }
102  if (fOutputFile.is_open()) {
103  WriteToFile(static_cast<const char*>(msg->GetData()), msg->GetSize());
104  }
105  }
106 
107  if (fMaxFileSize > 0 && fBytesWritten >= fMaxFileSize) {
108  LOG(info) << "Written " << fBytesWritten << " bytes, stopping...";
109  break;
110  }
111  if (fMaxIterations > 0) {
112  if (fNumIterations >= fMaxIterations) {
113  LOG(info) << "Configured maximum number of iterations reached.";
114  break;
115  }
116  }
117  fNumIterations++;
118  }
119 
120  if (fOutputFile.is_open()) {
121  fOutputFile.flush();
122  fOutputFile.close();
123  }
124 
125  auto tEnd = std::chrono::high_resolution_clock::now();
126  auto ms = std::chrono::duration<double, std::milli>(tEnd - tStart).count();
127  LOG(info) << "Received " << fNumIterations << " messages in " << ms << "ms.";
128  if (!fOutFilename.empty()) {
129  auto sec = std::chrono::duration<double>(tEnd - tStart).count();
130  LOG(info) << "Closed '" << fOutFilename << "' after writing " << fBytesWritten << " bytes."
131  << "(" << (fBytesWritten / (1000. * 1000.)) / sec << " MB/s)";
132  }
133 
134  LOG(info) << "Leaving RUNNING state.";
135  }
136 
137  void WriteToFile(const char* ptr, size_t size)
138  {
139  fOutputFile.write(ptr, size);
140  if (fOutputFile.bad()) {
141  LOG(error) << "failed writing to file";
142  throw std::runtime_error("failed writing to file");
143  }
144  fBytesWritten += size;
145  }
146 };
147 
148 #endif /* FAIRMQSINK_H_ */
FairMQSink
Definition: FairMQSink.h:28
FairMQDevice::fChannels
std::unordered_map< std::string, std::vector< FairMQChannel > > fChannels
Device channels.
Definition: FairMQDevice.h:383
FairMQSink::InitTask
void InitTask() override
Task initialization (can be overloaded in child classes)
Definition: FairMQSink.h:58
FairMQParts
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage,...
Definition: FairMQParts.h:21
fair::mq::ProgOptions::GetProperty
T GetProperty(const std::string &key) const
Read config property, throw if no property with this key exists.
Definition: ProgOptions.h:69
FairMQChannel::Receive
int64_t Receive(FairMQMessagePtr &msg, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:270
FairMQDevice::fConfig
fair::mq::ProgOptions * fConfig
Pointer to config (internal or external)
Definition: FairMQDevice.h:385
FairMQSink::Run
void Run() override
Runs the device (to be overloaded in child classes)
Definition: FairMQSink.h:69
FairMQDevice::NewStatePending
bool NewStatePending() const
Returns true if a new state has been requested, signaling the current handler to stop.
Definition: FairMQDevice.h:470
FairMQChannel
Wrapper class for FairMQSocket and related methods.
Definition: FairMQChannel.h:35
FairMQDevice
Definition: FairMQDevice.h:50

privacy