9 #ifndef FAIRMQCHANNEL_H_ 10 #define FAIRMQCHANNEL_H_ 18 #include <FairMQTransportFactory.h> 19 #include <FairMQSocket.h> 20 #include <fairmq/Transports.h> 21 #include <FairMQLogger.h> 22 #include <FairMQParts.h> 36 FairMQChannel(
const std::string& type,
const std::string& method,
const std::string& address);
42 FairMQChannel(
const std::string& name,
const std::string& type, std::shared_ptr<FairMQTransportFactory> factory);
55 auto Bind(
const std::string& address) ->
bool 59 return fSocket->Bind(address);
62 auto Connect(
const std::string& address) ->
void 66 return fSocket->Connect(address);
168 int Send(std::unique_ptr<FairMQMessage>& msg)
const;
169 int Receive(std::unique_ptr<FairMQMessage>& msg)
const;
179 int Send(std::unique_ptr<FairMQMessage>& msg,
int sndTimeoutInMs)
const;
188 int Receive(std::unique_ptr<FairMQMessage>& msg,
int rcvTimeoutInMs)
const;
198 int SendAsync(std::unique_ptr<FairMQMessage>& msg)
const;
205 int ReceiveAsync(std::unique_ptr<FairMQMessage>& msg)
const;
207 int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec)
const;
208 int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec)
const;
215 int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec,
int sndTimeoutInMs)
const;
222 int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec,
int rcvTimeoutInMs)
const;
231 int64_t
SendAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec)
const;
238 int64_t
ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec)
const;
242 return Send(parts.fParts);
247 return Receive(parts.fParts);
250 int64_t Send(
FairMQParts& parts,
int sndTimeoutInMs)
const 252 return Send(parts.fParts, sndTimeoutInMs);
255 int64_t Receive(
FairMQParts& parts,
int rcvTimeoutInMs)
const 257 return Receive(parts.fParts, rcvTimeoutInMs);
270 unsigned long GetBytesTx()
const;
271 unsigned long GetBytesRx()
const;
272 unsigned long GetMessagesTx()
const;
273 unsigned long GetMessagesRx()
const;
280 template<
typename... Args>
281 FairMQMessagePtr NewMessage(Args&&... args)
const 283 return Transport()->CreateMessage(std::forward<Args>(args)...);
287 FairMQMessagePtr NewSimpleMessage(
const T& data)
const 289 return Transport()->NewSimpleMessage(data);
293 FairMQMessagePtr NewStaticMessage(
const T& data)
const 295 return Transport()->NewStaticMessage(data);
299 std::unique_ptr<FairMQSocket> fSocket;
303 std::string fAddress;
304 std::string fTransport;
312 std::atomic<bool> fIsValid;
314 FairMQ::Transport fTransportType;
317 bool CheckCompatibility(std::unique_ptr<FairMQMessage>& msg)
const;
318 bool CheckCompatibility(std::vector<std::unique_ptr<FairMQMessage>>& msgVec)
const;
320 void InitTransport(std::shared_ptr<FairMQTransportFactory> factory);
326 static std::mutex fChannelMutex;
330 auto SetModified(
const bool modified) -> void;
int GetSndBufSize() const
Definition: FairMQChannel.cxx:211
std::string GetType() const
Definition: FairMQChannel.cxx:155
int ReceiveAsync(std::unique_ptr< FairMQMessage > &msg) const
int SendAsync(std::unique_ptr< FairMQMessage > &msg) const
virtual ~FairMQChannel()
Default destructor.
Definition: FairMQChannel.cxx:743
int GetRateLogging() const
Definition: FairMQChannel.cxx:267
std::string GetAddress() const
Definition: FairMQChannel.cxx:183
int GetRcvKernelSize() const
Definition: FairMQChannel.cxx:253
Definition: FairMQTransportFactory.h:27
auto Transport() const -> const FairMQTransportFactory *
Getter for default transport factory.
Definition: FairMQDevice.h:197
std::string GetChannelPrefix() const
Definition: FairMQChannel.cxx:140
void UpdateRcvBufSize(const int rcvBufSize)
Definition: FairMQChannel.cxx:361
FairMQChannel & operator=(const FairMQChannel &)
Assignment operator.
Definition: FairMQChannel.cxx:109
Definition: FairMQChannel.h:24
int GetSndKernelSize() const
Definition: FairMQChannel.cxx:239
bool IsValid() const
Definition: FairMQChannel.cxx:455
void UpdateRcvKernelSize(const int rcvKernelSize)
Definition: FairMQChannel.cxx:393
void UpdateAddress(const std::string &address)
Definition: FairMQChannel.cxx:313
void UpdateTransport(const std::string &transport)
Definition: FairMQChannel.cxx:329
std::string GetChannelName() const
Definition: FairMQChannel.cxx:135
Definition: FairMQSocket.h:18
void UpdateRateLogging(const int rateLogging)
Definition: FairMQChannel.cxx:409
std::string GetTransport() const
Definition: FairMQChannel.cxx:197
std::string GetChannelIndex() const
Definition: FairMQChannel.cxx:147
void UpdateSndBufSize(const int sndBufSize)
Definition: FairMQChannel.cxx:345
void UpdateMethod(const std::string &method)
Definition: FairMQChannel.cxx:297
void UpdateChannelName(const std::string &name)
Definition: FairMQChannel.cxx:439
std::string GetMethod() const
Definition: FairMQChannel.cxx:169
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage...
Definition: FairMQParts.h:20
std::shared_ptr< FairMQTransportFactory > fTransportFactory
Transport factory.
Definition: FairMQDevice.h:417
int GetRcvBufSize() const
Definition: FairMQChannel.cxx:225
void ResetChannel()
Resets the channel (requires validation to be used again).
Definition: FairMQChannel.cxx:663
bool ValidateChannel()
Definition: FairMQChannel.cxx:469
Definition: FairMQDevice.h:44
void UpdateType(const std::string &type)
Definition: FairMQChannel.cxx:281
void UpdateSndKernelSize(const int sndKernelSize)
Definition: FairMQChannel.cxx:377
FairMQChannel()
Default constructor.
Definition: FairMQChannel.cxx:26