FairMQ  1.2.0
C++ Message Passing Framework
Socket.h
1 /********************************************************************************
2  * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_OFI_SOCKET_H
10 #define FAIR_MQ_OFI_SOCKET_H
11 
12 #include <FairMQSocket.h>
13 #include <FairMQMessage.h>
14 #include <fairmq/ofi/Context.h>
15 #include <fairmq/ofi/Control.pb.h>
16 
17 #include <boost/asio.hpp>
18 #include <memory> // unique_ptr
19 #include <netinet/in.h>
20 #include <rdma/fabric.h>
21 
22 namespace fair
23 {
24 namespace mq
25 {
26 namespace ofi
27 {
28 
35 class Socket : public fair::mq::Socket
36 {
37  public:
38  Socket(Context& factory, const std::string& type, const std::string& name, const std::string& id = "");
39  Socket(const Socket&) = delete;
40  Socket operator=(const Socket&) = delete;
41 
42  auto GetId() -> std::string { return fId; }
43 
44  auto Bind(const std::string& address) -> bool override;
45  auto Connect(const std::string& address) -> void override;
46 
47  auto Send(MessagePtr& msg, int timeout = 0) -> int override;
48  auto Receive(MessagePtr& msg, int timeout = 0) -> int override;
49  auto Send(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
50  auto Receive(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
51 
52  auto TrySend(MessagePtr& msg) -> int override;
53  auto TryReceive(MessagePtr& msg) -> int override;
54  auto TrySend(std::vector<MessagePtr>& msgVec) -> int64_t override;
55  auto TryReceive(std::vector<MessagePtr>& msgVec) -> int64_t override;
56 
57  auto GetSocket() const -> void* override { return fControlSocket; }
58  auto GetSocket(int nothing) const -> int override { return -1; }
59 
60  auto Close() -> void override;
61 
62  auto SetOption(const std::string& option, const void* value, size_t valueSize) -> void override;
63  auto GetOption(const std::string& option, void* value, size_t* valueSize) -> void override;
64 
65  auto GetBytesTx() const -> unsigned long override { return fBytesTx; }
66  auto GetBytesRx() const -> unsigned long override { return fBytesRx; }
67  auto GetMessagesTx() const -> unsigned long override { return fMessagesTx; }
68  auto GetMessagesRx() const -> unsigned long override { return fMessagesRx; }
69 
70  auto SetSendTimeout(const int timeout, const std::string& address, const std::string& method) -> bool override;
71  auto GetSendTimeout() const -> int override;
72  auto SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method) -> bool override;
73  auto GetReceiveTimeout() const -> int override;
74 
75  static auto GetConstant(const std::string& constant) -> int;
76 
77  ~Socket() override;
78 
79  private:
80  void* fControlSocket;
81  void* fMonitorSocket;
82  fid_ep* fDataEndpoint;
83  fid_cq* fDataCompletionQueueTx;
84  fid_cq* fDataCompletionQueueRx;
85  std::string fId;
86  std::atomic<unsigned long> fBytesTx;
87  std::atomic<unsigned long> fBytesRx;
88  std::atomic<unsigned long> fMessagesTx;
89  std::atomic<unsigned long> fMessagesRx;
90  Context& fContext;
91  fi_addr_t fRemoteDataAddr;
92  sockaddr_in fLocalDataAddr;
93  bool fWaitingForControlPeer;
94  boost::asio::io_service::strand fIoStrand;
95 
96  int fSndTimeout;
97  int fRcvTimeout;
98 
99  auto SendImpl(MessagePtr& msg, const int flags, const int timeout) -> int;
100  auto ReceiveImpl(MessagePtr& msg, const int flags, const int timeout) -> int;
101  auto SendImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
102  auto ReceiveImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
103 
104  auto InitDataEndpoint() -> void;
105  auto WaitForControlPeer() -> void;
106  auto AnnounceDataAddress() -> void;
107  auto SendControlMessage(std::unique_ptr<ControlMessage> ctrl) -> void;
108  auto ReceiveControlMessage() -> std::unique_ptr<ControlMessage>;
109  auto ProcessDataAddressAnnouncement(std::unique_ptr<ControlMessage> ctrl) -> void;
110  auto ConnectControlSocket(Context::Address address) -> void;
111  auto BindControlSocket(Context::Address address) -> void;
112 }; /* class Socket */
113 
// Cleanup callback: releases the object that held the payload once the
// transport is done with it. The first argument (the data pointer) is unused.
// NOTE(review): `hint` presumably points at the owning object to destroy —
// confirm against the definition.
auto free_string(void* /*data*/, void* hint) -> void;
116 
117 struct SilentSocketError : SocketError { using SocketError::SocketError; };
118 
119 } /* namespace ofi */
120 } /* namespace mq */
121 } /* namespace fair */
122 
123 #endif /* FAIR_MQ_OFI_SOCKET_H */
Transport-wide context.
Definition: Context.h:38
Definition: FairMQSocket.h:68
Definition: Socket.h:35
Definition: FairMQSocket.h:18
Definition: Context.h:54
Definition: DeviceRunner.h:23
Definition: Socket.h:117