Implement parallel ofi::Socket::Send

This commit is contained in:
Dennis Klein
2018-11-13 22:26:38 +01:00
committed by Dennis Klein
parent 9ae48c21f5
commit 46e2420547
7 changed files with 229 additions and 348 deletions

View File

@@ -15,8 +15,8 @@
#include <fairmq/ofi/ControlMessages.h>
#include <asiofi/connected_endpoint.hpp>
#include <azmq/socket.hpp>
#include <boost/asio.hpp>
#include <boost/container/pmr/unsynchronized_pool_resource.hpp>
#include <memory> // unique_ptr
#include <netinet/in.h>
class FairMQTransportFactory;
@@ -51,7 +51,7 @@ class Socket final : public fair::mq::Socket
auto Send(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
auto Receive(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
auto GetSocket() const -> void* { return fControlSocket; }
auto GetSocket() const -> void* { return fControlEndpoint.native_handle(); }
void SetLinger(const int value) override;
int GetLinger() const override;
@@ -79,8 +79,7 @@ class Socket final : public fair::mq::Socket
~Socket() override;
private:
void* fControlSocket;
// void* fMonitorSocket;
Context& fContext;
std::unique_ptr<asiofi::passive_endpoint> fPassiveDataEndpoint;
std::unique_ptr<asiofi::connected_endpoint> fDataEndpoint;
std::string fId;
@@ -88,30 +87,28 @@ class Socket final : public fair::mq::Socket
std::atomic<unsigned long> fBytesRx;
std::atomic<unsigned long> fMessagesTx;
std::atomic<unsigned long> fMessagesRx;
Context& fContext;
Context::Address fRemoteDataAddr;
Context::Address fLocalDataAddr;
// bool fWaitingForControlPeer;
boost::asio::io_service::strand fIoStrand;
boost::container::pmr::unsynchronized_pool_resource fCtrlMemPool;
mutable azmq::socket fControlEndpoint;
int fSndTimeout;
int fRcvTimeout;
azmq::pair_socket fQueue1, fQueue2;
auto SendImpl(MessagePtr& msg, const int flags, const int timeout) -> int;
auto SendQueueReader() -> void;
auto OnSend(azmq::message& msg, size_t bytes_transferred) -> void;
auto OnControlMessageSent(size_t bytes_transferred, MessagePtr msg) -> void;
auto ReceiveImpl(MessagePtr& msg, const int flags, const int timeout) -> int;
auto SendImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
auto ReceiveImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
// auto WaitForControlPeer() -> void;
auto AnnounceDataAddress() -> void;
auto SendControlMessage(CtrlMsgPtr<ControlMessage> ctrl) -> void;
auto ReceiveControlMessage() -> CtrlMsgPtr<ControlMessage>;
auto ProcessControlMessage(CtrlMsgPtr<DataAddressAnnouncement> ctrl) -> void;
auto ConnectControlSocket(Context::Address address) -> void;
auto BindControlSocket(Context::Address address) -> void;
auto ConnectControlEndpoint(Context::Address address) -> void;
auto BindControlEndpoint(Context::Address address) -> void;
auto BindDataEndpoint() -> void;
auto ConnectDataEndpoint() -> void;
auto ReceiveDataAddressAnnouncement() -> void;
}; /* class Socket */
struct SilentSocketError : SocketError { using SocketError::SocketError; };