FairMQ  1.4.33
C++ Message Queuing Library and Framework
FairMQMultiplier.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIRMQMULTIPLIER_H_
10 #define FAIRMQMULTIPLIER_H_
11 
12 #include "FairMQDevice.h"
13 
14 #include <string>
15 #include <vector>
16 
18 {
19  public:
21  : fMultipart(true)
22  , fNumOutputs(0)
23  , fInChannelName()
24  , fOutChannelNames()
25  {}
26  ~FairMQMultiplier() {}
27 
28  protected:
29  bool fMultipart;
30  int fNumOutputs;
31  std::string fInChannelName;
32  std::vector<std::string> fOutChannelNames;
33 
34  void InitTask() override
35  {
36  fMultipart = fConfig->GetProperty<bool>("multipart");
37  fInChannelName = fConfig->GetProperty<std::string>("in-channel");
38  fOutChannelNames = fConfig->GetProperty<std::vector<std::string>>("out-channel");
39  fNumOutputs = fChannels.at(fOutChannelNames.at(0)).size();
40 
41  if (fMultipart) {
42  OnData(fInChannelName, &FairMQMultiplier::HandleMultipartData);
43  } else {
44  OnData(fInChannelName, &FairMQMultiplier::HandleSingleData);
45  }
46  }
47 
48 
49  bool HandleSingleData(std::unique_ptr<FairMQMessage>& payload, int)
50  {
51  for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) { // all except last channel
52  for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) { // all subChannels in a channel
53  FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
54  msgCopy->Copy(*payload);
55 
56  Send(msgCopy, fOutChannelNames.at(i), j);
57  }
58  }
59 
60  unsigned int lastChannelSize = fChannels.at(fOutChannelNames.back()).size();
61 
62  for (unsigned int i = 0; i < lastChannelSize - 1; ++i) { // iterate over all except last subChannels of the last channel
63  FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
64  msgCopy->Copy(*payload);
65 
66  Send(msgCopy, fOutChannelNames.back(), i);
67  }
68 
69  Send(payload, fOutChannelNames.back(), lastChannelSize - 1); // send final message to last subChannel of last channel
70 
71  return true;
72  }
73 
74  bool HandleMultipartData(FairMQParts& payload, int)
75  {
76  for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) { // all except last channel
77  for (unsigned int j = 0; j < fChannels.at(fOutChannelNames.at(i)).size(); ++j) { // all subChannels in a channel
78  FairMQParts parts;
79 
80  for (int k = 0; k < payload.Size(); ++k) {
81  FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
82  msgCopy->Copy(payload.AtRef(k));
83  parts.AddPart(std::move(msgCopy));
84  }
85 
86  Send(parts, fOutChannelNames.at(i), j);
87  }
88  }
89 
90  unsigned int lastChannelSize = fChannels.at(fOutChannelNames.back()).size();
91 
92  for (unsigned int i = 0; i < lastChannelSize - 1; ++i) { // iterate over all except last subChannels of the last channel
93  FairMQParts parts;
94 
95  for (int k = 0; k < payload.Size(); ++k) {
96  FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage());
97  msgCopy->Copy(payload.AtRef(k));
98  parts.AddPart(std::move(msgCopy));
99  }
100 
101  Send(parts, fOutChannelNames.back(), i);
102  }
103 
104  Send(payload, fOutChannelNames.back(), lastChannelSize - 1); // send final message to last subChannel of last channel
105 
106  return true;
107  }
108 };
109 
110 #endif /* FAIRMQMULTIPLIER_H_ */
FairMQDevice::fChannels
std::unordered_map< std::string, std::vector< FairMQChannel > > fChannels
Device channels.
Definition: FairMQDevice.h:383
FairMQParts::AddPart
void AddPart(FairMQMessage *msg)
Definition: FairMQParts.h:48
FairMQParts
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage,...
Definition: FairMQParts.h:21
fair::mq::ProgOptions::GetProperty
T GetProperty(const std::string &key) const
Read config property, throw if no property with this key exists.
Definition: ProgOptions.h:69
FairMQParts::Size
int Size() const
Definition: FairMQParts.h:91
FairMQMultiplier
Definition: FairMQMultiplier.h:18
FairMQDevice::Send
int64_t Send(FairMQMessagePtr &msg, const std::string &channel, const int index=0, int sndTimeoutInMs=-1)
Definition: FairMQDevice.h:97
FairMQDevice::fConfig
fair::mq::ProgOptions * fConfig
Pointer to config (internal or external)
Definition: FairMQDevice.h:385
FairMQDevice::fTransportFactory
std::shared_ptr< FairMQTransportFactory > fTransportFactory
Default transport factory.
Definition: FairMQDevice.h:379
FairMQDevice
Definition: FairMQDevice.h:50
FairMQMultiplier::InitTask
void InitTask() override
Task initialization (can be overloaded in child classes)
Definition: FairMQMultiplier.h:40

privacy