FairMQ  1.4.33
C++ Message Queuing Library and Framework
FairMQProxy.h
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
15 #ifndef FAIRMQPROXY_H_
16 #define FAIRMQPROXY_H_
17 
18 #include "FairMQDevice.h"
19 
20 #include <string>
21 
22 class FairMQProxy : public FairMQDevice
23 {
24  public:
25  FairMQProxy()
26  : fMultipart(true)
27  , fInChannelName()
28  , fOutChannelName()
29  {}
30  ~FairMQProxy() {}
31 
32  protected:
33  bool fMultipart;
34  std::string fInChannelName;
35  std::string fOutChannelName;
36 
37  void InitTask() override
38  {
39  fMultipart = fConfig->GetProperty<bool>("multipart");
40  fInChannelName = fConfig->GetProperty<std::string>("in-channel");
41  fOutChannelName = fConfig->GetProperty<std::string>("out-channel");
42  }
43 
44  void Run() override
45  {
46  if (fMultipart) {
47  while (!NewStatePending()) {
48  FairMQParts payload;
49  if (Receive(payload, fInChannelName) >= 0) {
50  if (Send(payload, fOutChannelName) < 0) {
51  LOG(debug) << "Transfer interrupted";
52  break;
53  }
54  } else {
55  LOG(debug) << "Transfer interrupted";
56  break;
57  }
58  }
59  } else {
60  while (!NewStatePending()) {
61  FairMQMessagePtr payload(fTransportFactory->CreateMessage());
62  if (Receive(payload, fInChannelName) >= 0) {
63  if (Send(payload, fOutChannelName) < 0) {
64  LOG(debug) << "Transfer interrupted";
65  break;
66  }
67  } else {
68  LOG(debug) << "Transfer interrupted";
69  break;
70  }
71  }
72  }
73  }
74 };
75 
76 #endif /* FAIRMQPROXY_H_ */
FairMQProxy
Definition: FairMQProxy.h:23
FairMQParts
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage,...
Definition: FairMQParts.h:21
fair::mq::ProgOptions::GetProperty
T GetProperty(const std::string &key) const
Read config property, throw if no property with this key exists.
Definition: ProgOptions.h:69
FairMQProxy::InitTask
void InitTask() override
Task initialization (can be overloaded in child classes)
Definition: FairMQProxy.h:43
FairMQDevice::Receive
int64_t Receive(FairMQMessagePtr &msg, const std::string &channel, const int index=0, int rcvTimeoutInMs=-1)
Definition: FairMQDevice.h:108
FairMQDevice::Send
int64_t Send(FairMQMessagePtr &msg, const std::string &channel, const int index=0, int sndTimeoutInMs=-1)
Definition: FairMQDevice.h:97
FairMQDevice::fConfig
fair::mq::ProgOptions * fConfig
Pointer to config (internal or external)
Definition: FairMQDevice.h:385
FairMQDevice::NewStatePending
bool NewStatePending() const
Returns true if a new state has been requested, signaling the current handler to stop.
Definition: FairMQDevice.h:470
FairMQProxy::Run
void Run() override
Runs the device (to be overloaded in child classes)
Definition: FairMQProxy.h:50
FairMQDevice::fTransportFactory
std::shared_ptr< FairMQTransportFactory > fTransportFactory
Default transport factory.
Definition: FairMQDevice.h:379
FairMQDevice
Definition: FairMQDevice.h:50

privacy