From 46e2420547d456759187f1c3264ae29e124b6379 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Tue, 13 Nov 2018 22:26:38 +0100 Subject: [PATCH] Implement parallel ofi::Socket::Send --- fairmq/ofi/Context.cxx | 17 +- fairmq/ofi/Context.h | 3 - fairmq/ofi/ControlMessages.h | 86 +++---- fairmq/ofi/Poller.cxx | 12 +- fairmq/ofi/Socket.cxx | 426 +++++++++++++------------------- fairmq/ofi/Socket.h | 25 +- fairmq/ofi/TransportFactory.cxx | 8 +- 7 files changed, 229 insertions(+), 348 deletions(-) diff --git a/fairmq/ofi/Context.cxx b/fairmq/ofi/Context.cxx index 15db7819..e2ff045b 100644 --- a/fairmq/ofi/Context.cxx +++ b/fairmq/ofi/Context.cxx @@ -21,7 +21,6 @@ #include #include #include -#include namespace fair { @@ -33,15 +32,11 @@ namespace ofi using namespace std; Context::Context(int numberIoThreads) - : fZmqContext(zmq_ctx_new()) - , fOfiInfo(nullptr) + : fOfiInfo(nullptr) , fOfiFabric(nullptr) , fOfiDomain(nullptr) , fIoWork(fIoContext) { - if (!fZmqContext) - throw ContextError{tools::ToString("Failed creating zmq context, reason: ", zmq_strerror(errno))}; - InitThreadPool(numberIoThreads); } @@ -63,16 +58,6 @@ Context::~Context() fIoContext.stop(); for (auto& thread : fThreadPool) thread.join(); - - if (zmq_ctx_term(fZmqContext) != 0) - LOG(error) << "Failed closing zmq context, reason: " << zmq_strerror(errno); -} - -auto Context::GetZmqVersion() const -> string -{ - int major, minor, patch; - zmq_version(&major, &minor, &patch); - return tools::ToString(major, ".", minor, ".", patch); } auto Context::GetAsiofiVersion() const -> string diff --git a/fairmq/ofi/Context.h b/fairmq/ofi/Context.h index 4ad042f3..fcb3d7f7 100644 --- a/fairmq/ofi/Context.h +++ b/fairmq/ofi/Context.h @@ -48,9 +48,7 @@ class Context ~Context(); // auto CreateOfiEndpoint() -> fid_ep*; - auto GetZmqVersion() const -> std::string; auto GetAsiofiVersion() const -> std::string; - auto GetZmqContext() const -> void* { return fZmqContext; } auto GetIoContext() -> boost::asio::io_context& { return fIoContext; } struct Address { std::string Protocol; @@ -70,7 +68,6 @@ class Context auto Resume() -> void { LOG(debug) << "OFI transport: Resumed (NOOP - not implemented)."; } private: - void* fZmqContext; std::unique_ptr fOfiInfo; std::unique_ptr fOfiFabric; std::unique_ptr fOfiDomain; diff --git a/fairmq/ofi/ControlMessages.h b/fairmq/ofi/ControlMessages.h index 987c3400..512ee35c 100644 --- a/fairmq/ofi/ControlMessages.h +++ b/fairmq/ofi/ControlMessages.h @@ -9,17 +9,28 @@ #ifndef FAIR_MQ_OFI_CONTROLMESSAGES_H #define FAIR_MQ_OFI_CONTROLMESSAGES_H +#include +#include #include #include #include #include -namespace fair -{ -namespace mq -{ -namespace ofi +namespace boost { +namespace asio { + +template +auto buffer(const PodType& obj) -> boost::asio::const_buffer { + return boost::asio::const_buffer(static_cast(&obj), sizeof(PodType)); +} + +} // namespace asio +} // namespace boost + +namespace fair { +namespace mq { +namespace ofi { enum class ControlMessageType { @@ -28,59 +39,38 @@ enum class ControlMessageType PostBufferAcknowledgement }; -struct ControlMessage { +struct ControlMessage +{ ControlMessageType type; }; -struct DataAddressAnnouncement : ControlMessage { - uint32_t ipv4; // in_addr_t from - uint32_t port; // in_port_t from -}; - -struct PostBuffer : ControlMessage { - uint64_t size; // buffer size (size_t) -}; - -struct PostBufferAcknowledgement { - uint64_t size; // size_t -}; - -template -using CtrlMsgPtr = std::unique_ptr>; - -template -auto MakeControlMessage(A* pmr, Args&& ... args) -> CtrlMsgPtr +struct DataAddressAnnouncement : ControlMessage { - void* raw_mem = pmr->allocate(sizeof(T)); - T* raw_ptr = new (raw_mem) T(std::forward(args)...); + uint32_t ipv4; // in_addr_t from + uint32_t port; // in_port_t from +}; + +struct PostBuffer : ControlMessage +{ + uint64_t size; // buffer size (size_t) +}; + +template +auto MakeControlMessage(Args&&... args) -> T +{ + T ctrl = T(std::forward(args)...); if (std::is_same::value) { - raw_ptr->type = ControlMessageType::DataAddressAnnouncement; + ctrl.type = ControlMessageType::DataAddressAnnouncement; } else if (std::is_same::value) { - raw_ptr->type = ControlMessageType::PostBuffer; + ctrl.type = ControlMessageType::PostBuffer; } - return {raw_ptr, [=](T* p) { pmr->deallocate(p, sizeof(T)); }}; + return ctrl; } -template -auto StaticUniquePtrDowncast(std::unique_ptr&& p) -> std::unique_ptr -{ - auto down = static_cast(p.release()); - return std::unique_ptr(down, std::move(p.get_deleter())); -} - -template -auto StaticUniquePtrUpcast(std::unique_ptr&& p) -> std::unique_ptr> -{ - auto up = static_cast(p.release()); - return {up, [deleter = std::move(p.get_deleter())](Base* ptr) { - deleter(static_cast(ptr)); - }}; -} - -} /* namespace ofi */ -} /* namespace mq */ -} /* namespace fair */ +} // namespace ofi +} // namespace mq +} // namespace fair #endif /* FAIR_MQ_OFI_CONTROLMESSAGES_H */ diff --git a/fairmq/ofi/Poller.cxx b/fairmq/ofi/Poller.cxx index 9f83a4ca..700fc561 100644 --- a/fairmq/ofi/Poller.cxx +++ b/fairmq/ofi/Poller.cxx @@ -28,13 +28,13 @@ Poller::Poller(const vector& channels) fItems = new zmq_pollitem_t[fNumItems]; for (int i = 0; i < fNumItems; ++i) { - fItems[i].socket = static_cast(&(channels.at(i).GetSocket()))->GetSocket(); + fItems[i].socket = static_cast(&(channels.at(i).GetSocket()))->GetSocket(); fItems[i].fd = 0; fItems[i].revents = 0; int type = 0; size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(static_cast(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); SetItemEvents(fItems[i], type); } @@ -46,13 +46,13 @@ Poller::Poller(const vector& channels) fItems = new zmq_pollitem_t[fNumItems]; for (int i = 0; i < fNumItems; ++i) { - fItems[i].socket = static_cast(&(channels.at(i)->GetSocket()))->GetSocket(); + fItems[i].socket = static_cast(&(channels.at(i)->GetSocket()))->GetSocket(); fItems[i].fd = 0; fItems[i].revents = 0; int type = 0; size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(static_cast(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); SetItemEvents(fItems[i], type); } @@ -76,13 +76,13 @@ Poller::Poller(const unordered_map>& channelsMap, for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) { index = fOffsetMap[channel] + i; - fItems[index].socket = static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(); + fItems[index].socket = static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(); fItems[index].fd = 0; fItems[index].revents = 0; int type = 0; size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); SetItemEvents(fItems[index], type); } diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 709dfe80..60e37cc0 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -12,15 +12,17 @@ #include #include -#include #include +#include +#include #include +#include #include -#include +#include +#include #include #include #include -#include #include #include @@ -35,8 +37,7 @@ namespace ofi using namespace std; Socket::Socket(Context& context, const string& type, const string& name, const string& id /*= ""*/) - : fControlSocket(nullptr) - // , fMonitorSocket(nullptr) + : fContext(context) , fPassiveDataEndpoint(nullptr) , fDataEndpoint(nullptr) , fId(id + "." + name + "." + type) @@ -44,46 +45,37 @@ Socket::Socket(Context& context, const string& type, const string& name, const s , fBytesRx(0) , fMessagesTx(0) , fMessagesRx(0) - , fContext(context) , fIoStrand(fContext.GetIoContext()) + , fControlEndpoint(fIoStrand.context(), ZMQ_PAIR) , fSndTimeout(100) , fRcvTimeout(100) + , fQueue1(fIoStrand.context()) + , fQueue2(fIoStrand.context()) { if (type != "pair") { throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")}; } else { - fControlSocket = zmq_socket(fContext.GetZmqContext(), ZMQ_PAIR); - - if (fControlSocket == nullptr) - throw SocketError{tools::ToString("Failed creating zmq meta socket ", fId, ", reason: ", zmq_strerror(errno))}; - - if (zmq_setsockopt(fControlSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) - throw SocketError{tools::ToString("Failed setting ZMQ_IDENTITY socket option, reason: ", zmq_strerror(errno))}; + fControlEndpoint.set_option(azmq::socket::identity(fId)); // Tell socket to try and send/receive outstanding messages for milliseconds before terminating. // Default value for ZeroMQ is -1, which is to wait forever. - int linger = 1000; - if (zmq_setsockopt(fControlSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) - throw SocketError{tools::ToString("Failed setting ZMQ_LINGER socket option, reason: ", zmq_strerror(errno))}; + fControlEndpoint.set_option(azmq::socket::linger(1000)); - // TODO enable again and implement retries - // if (zmq_setsockopt(fControlSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) - // throw SocketError{tools::ToString("Failed setting ZMQ_SNDTIMEO socket option, reason: ", zmq_strerror(errno))}; - // - // if (zmq_setsockopt(fControlSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) - // throw SocketError{tools::ToString("Failed setting ZMQ_RCVTIMEO socket option, reason: ", zmq_strerror(errno))}; - - // fMonitorSocket = zmq_socket(fContext.GetZmqContext(), ZMQ_PAIR); - // - // if (fMonitorSocket == nullptr) - // throw SocketError{tools::ToString("Failed creating zmq monitor socket ", fId, ", reason: ", zmq_strerror(errno))}; - // - // auto mon_addr = tools::ToString("inproc://", fId); - // if (zmq_socket_monitor(fControlSocket, mon_addr.c_str(), ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_CONNECTED) < 0) - // throw SocketError{tools::ToString("Failed setting up monitor on meta socket, reason: ", zmq_strerror(errno))}; - // - // if (zmq_connect(fMonitorSocket, mon_addr.c_str()) != 0) - // throw SocketError{tools::ToString("Failed connecting monitor socket to meta socket, reason: ", zmq_strerror(errno))}; + // Setup internal queue + auto hashed_id = std::hash()(fId); + auto queue_id = tools::ToString("inproc://QUEUE", hashed_id); + LOG(debug) << "OFI transport (" << fId << "): " << "Binding Q1: " << queue_id; + fQueue1.bind(queue_id); + LOG(debug) << "OFI transport (" << fId << "): " << "Connecting Q2: " << queue_id; + fQueue2.connect(queue_id); + azmq::socket::snd_hwm send_max(100); + azmq::socket::rcv_hwm recv_max(100); + fQueue1.set_option(send_max); + fQueue1.set_option(recv_max); + fQueue2.set_option(send_max); + fQueue2.set_option(recv_max); + fControlEndpoint.set_option(send_max); + fControlEndpoint.set_option(recv_max); } } @@ -91,13 +83,15 @@ auto Socket::Bind(const string& address) -> bool try { auto addr = Context::VerifyAddress(address); - BindControlSocket(addr); + BindControlEndpoint(addr); // TODO make data port choice more robust addr.Port += 555; fLocalDataAddr = addr; BindDataEndpoint(); + boost::asio::post(fIoStrand, std::bind(&Socket::SendQueueReader, this)); + return true; } catch (const SilentSocketError& e) @@ -115,23 +109,24 @@ catch (const SocketError& e) auto Socket::Connect(const string& address) -> bool { auto addr = Context::VerifyAddress(address); + fRemoteDataAddr = addr; - ConnectControlSocket(addr); + ConnectControlEndpoint(addr); - ProcessControlMessage( - StaticUniquePtrDowncast(ReceiveControlMessage())); + ReceiveDataAddressAnnouncement(); ConnectDataEndpoint(); } -auto Socket::BindControlSocket(Context::Address address) -> void +auto Socket::BindControlEndpoint(Context::Address address) -> void { auto addr = tools::ToString("tcp://", address.Ip, ":", address.Port); - if (zmq_bind(fControlSocket, addr.c_str()) != 0) { - if (errno == EADDRINUSE) throw SilentSocketError("EADDRINUSE"); - throw SocketError(tools::ToString("Failed binding control socket ", fId, ", reason: ", zmq_strerror(errno))); - } + fControlEndpoint.bind(addr); + // if (zmq_bind(fControlSocket, addr.c_str()) != 0) { + // TODO if (errno == EADDRINUSE) throw SilentSocketError("EADDRINUSE"); + // throw SocketError(tools::ToString("Failed binding control socket ", fId, ", reason: ", zmq_strerror(errno))); + // } LOG(debug) << "OFI transport (" << fId << "): control band bound to " << address; } @@ -170,12 +165,13 @@ auto Socket::BindDataEndpoint() -> void LOG(debug) << "OFI transport (" << fId << "): data band connection accepted."; } -auto Socket::ConnectControlSocket(Context::Address address) -> void +auto Socket::ConnectControlEndpoint(Context::Address address) -> void { auto addr = tools::ToString("tcp://", address.Ip, ":", address.Port); - if (zmq_connect(fControlSocket, addr.c_str()) != 0) - throw SocketError(tools::ToString("Failed connecting control socket ", fId, ", reason: ", zmq_strerror(errno))); + fControlEndpoint.connect(addr); + + LOG(debug) << "OFI transport (" << fId << "): control band connected to " << address; } auto Socket::ConnectDataEndpoint() -> void @@ -191,8 +187,12 @@ auto Socket::ConnectDataEndpoint() -> void }); } -auto Socket::ProcessControlMessage(CtrlMsgPtr daa) -> void +auto Socket::ReceiveDataAddressAnnouncement() -> void { + azmq::message ctrl; + auto recv = fControlEndpoint.receive(ctrl); + assert(recv == sizeof(DataAddressAnnouncement)); (void)recv; + auto daa(static_cast(ctrl.data())); assert(daa->type == ControlMessageType::DataAddressAnnouncement); sockaddr_in remoteAddr; @@ -201,191 +201,130 @@ auto Socket::ProcessControlMessage(CtrlMsgPtr daa) -> v remoteAddr.sin_addr.s_addr = daa->ipv4; auto addr = Context::ConvertAddress(remoteAddr); + addr.Protocol = fRemoteDataAddr.Protocol; LOG(debug) << "OFI transport (" << fId << "): Data address announcement of remote endpoint received: " << addr; fRemoteDataAddr = addr; } auto Socket::AnnounceDataAddress() -> void -try { +{ // fLocalDataAddr = fDataEndpoint->get_local_address(); // LOG(debug) << "Address of local ofi endpoint in socket " << fId << ": " << Context::ConvertAddress(fLocalDataAddr); // Create new data address announcement message - auto daa = MakeControlMessage(&fCtrlMemPool); + auto daa = MakeControlMessage(); auto addr = Context::ConvertAddress(fLocalDataAddr); - daa->ipv4 = addr.sin_addr.s_addr; - daa->port = addr.sin_port; + daa.ipv4 = addr.sin_addr.s_addr; + daa.port = addr.sin_port; - SendControlMessage(StaticUniquePtrUpcast(std::move(daa))); + auto sent = fControlEndpoint.send(boost::asio::buffer(daa)); + assert(sent == sizeof(addr)); (void)sent; - LOG(debug) << "OFI transport (" << fId << "): data address announced."; -} catch (const SocketError& e) { - throw SocketError(tools::ToString("Failed to announce data address, reason: ", e.what())); + LOG(debug) << "OFI transport (" << fId << "): data band address " << fLocalDataAddr << " announced."; } -auto Socket::SendControlMessage(CtrlMsgPtr ctrl) -> void +auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int { - assert(fControlSocket); - // LOG(debug) << "About to send control message: " << ctrl->DebugString(); + LOG(debug) << "OFI transport (" << fId << "): ENTER Send: size=" << msg->GetSize(); - // Serialize - struct ZmqMsg - { - zmq_msg_t msg; - ~ZmqMsg() { zmq_msg_close(&msg); } - operator zmq_msg_t*() { return &msg; } - } msg; + MessagePtr* msgptr(new std::unique_ptr(std::move(msg))); + try { + auto res = fQueue1.send(boost::asio::const_buffer(msgptr, sizeof(MessagePtr)), 0); - switch (ctrl->type) { - case ControlMessageType::DataAddressAnnouncement: - { - auto ret = zmq_msg_init_size(msg, sizeof(DataAddressAnnouncement)); - (void)ret; - assert(ret == 0); - std::memcpy(zmq_msg_data(msg), ctrl.get(), sizeof(DataAddressAnnouncement)); - } - break; - case ControlMessageType::PostBuffer: - { - auto ret = zmq_msg_init_size(msg, sizeof(PostBuffer)); - (void)ret; - assert(ret == 0); - std::memcpy(zmq_msg_data(msg), ctrl.get(), sizeof(PostBuffer)); - } - break; - default: - throw SocketError(tools::ToString("Cannot send control message of unknown type.")); - } - - // Send - if (zmq_msg_send(msg, fControlSocket, 0) == -1) { - throw SocketError( - tools::ToString("Failed to send control message, reason: ", zmq_strerror(errno))); + LOG(debug) << "OFI transport (" << fId << "): LEAVE Send"; + return res; + } catch (const std::exception& e) { + msg = std::move(*msgptr); + LOG(error) << e.what(); + return -1; + } catch (const boost::system::error_code& e) { + msg = std::move(*msgptr); + LOG(error) << e; + return -1; } } -auto Socket::ReceiveControlMessage() -> CtrlMsgPtr -{ - assert(fControlSocket); - - // Receive - struct ZmqMsg - { - zmq_msg_t msg; - ~ZmqMsg() { zmq_msg_close(&msg); } - operator zmq_msg_t*() { return &msg; } - } msg; - auto ret = zmq_msg_init(msg); - (void)ret; - assert(ret == 0); - if (zmq_msg_recv(msg, fControlSocket, 0) == -1) { - throw SocketError( - tools::ToString("Failed to receive control message, reason: ", zmq_strerror(errno))); - } - - // Deserialize and sanity check - const void* msg_data = zmq_msg_data(msg); - const size_t msg_size = zmq_msg_size(msg); - (void)msg_size; - assert(msg_size >= sizeof(ControlMessage)); - - switch (static_cast(msg_data)->type) { - case ControlMessageType::DataAddressAnnouncement: { - assert(msg_size == sizeof(DataAddressAnnouncement)); - auto daa = MakeControlMessage(&fCtrlMemPool); - std::memcpy(daa.get(), msg_data, sizeof(DataAddressAnnouncement)); - // LOG(debug) << "Received control message: " << ctrl->DebugString(); - return StaticUniquePtrUpcast(std::move(daa)); - } - case ControlMessageType::PostBuffer: { - assert(msg_size == sizeof(PostBuffer)); - auto pb = MakeControlMessage(&fCtrlMemPool); - std::memcpy(pb.get(), msg_data, sizeof(PostBuffer)); - // LOG(debug) << "Received control message: " << ctrl->DebugString(); - return StaticUniquePtrUpcast(std::move(pb)); - } - default: - throw SocketError(tools::ToString("Received control message of unknown type.")); - } -} - -auto Socket::Send(MessagePtr& msg, const int timeout) -> int { return SendImpl(msg, 0, timeout); } -auto Socket::Receive(MessagePtr& msg, const int timeout) -> int { return ReceiveImpl(msg, 0, timeout); } +auto Socket::Receive(MessagePtr& msg, const int timeout) -> int { return 0; /*ReceiveImpl(msg, 0, timeout);*/ } auto Socket::Send(std::vector& msgVec, const int timeout) -> int64_t { return SendImpl(msgVec, 0, timeout); } auto Socket::Receive(std::vector& msgVec, const int timeout) -> int64_t { return ReceiveImpl(msgVec, 0, timeout); } -auto Socket::SendImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*timeout*/) -> int -try { +auto Socket::SendQueueReader() -> void +{ + fQueue2.async_receive(boost::asio::bind_executor( + fIoStrand, + [&](const boost::system::error_code& ec, azmq::message& zmsg, size_t bytes_transferred) { + if (!ec) { + OnSend(zmsg, bytes_transferred); + } + })); +} + +auto Socket::OnSend(azmq::message& zmsg, size_t bytes_transferred) -> void +{ + LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend: bytes_transferred=" << bytes_transferred; + + MessagePtr msg(std::move(*(static_cast(zmsg.buffer().data())))); auto size = msg->GetSize(); - // LOG(debug) << "OFI transport (" << fId << "): ENTER SendImpl"; + + LOG(debug) << "OFI transport (" << fId << "): >>>>> OnSend: size=" << size; // Create and send control message - auto pb = MakeControlMessage(&fCtrlMemPool); - pb->size = size; - SendControlMessage(StaticUniquePtrUpcast(std::move(pb))); - // LOG(debug) << "OFI transport (" << fId << "): >>>>> SendImpl: Control message sent, size=" << size; - // LOG(debug) << "OFI transport (" << fId << "): >>>>> SendImpl: msg->GetData()=" << msg->GetData() << ",msg->GetSize()=" << msg->GetSize(); + auto pb = MakeControlMessage(); + pb.size = size; + fControlEndpoint.async_send( + azmq::message(boost::asio::buffer(pb)), + [&, msg2 = std::move(msg)](const boost::system::error_code& ec, size_t bytes_transferred2) mutable { + if (!ec) { + OnControlMessageSent(bytes_transferred2, std::move(msg2)); + } + }); + + LOG(debug) << "OFI transport (" << fId << "): LEAVE OnSend"; +} + +auto Socket::OnControlMessageSent(size_t bytes_transferred, MessagePtr msg) -> void +{ + LOG(debug) << "OFI transport (" << fId << "): ENTER OnControlMessageSent: bytes_transferred=" << bytes_transferred; + assert(bytes_transferred == sizeof(PostBuffer)); + + auto size = msg->GetSize(); if (size) { // Receive ack - auto ack = StaticUniquePtrDowncast(ReceiveControlMessage()); - assert(ack.get()); - auto size_ack = ack->size; - assert(size == size_ack); - // LOG(debug) << "OFI transport (" << fId << "): >>>>> SendImpl: Control ack received, size_ack=" << size_ack; + // azmq::message ctrl; + // auto recv = fControlEndpoint.receive(ctrl); + // assert(recv == sizeof(PostBuffer)); + // (void)recv; + // auto ack(static_cast(ctrl.data())); + // assert(ack->type == ControlMessageType::PostBuffer); + // (void)ack; + // LOG(debug) << "OFI transport (" << fId << "): >>>>> SendImpl: Control ack + // received, size_ack=" << size_ack; boost::asio::mutable_buffer buffer(msg->GetData(), size); asiofi::memory_region mr(fContext.GetDomain(), buffer, asiofi::mr::access::send); - std::mutex m; - std::condition_variable cv; - bool completed(false); - - fDataEndpoint->send( - buffer, - mr.desc(), - [&](boost::asio::mutable_buffer) { - { - std::unique_lock lk(m); - completed = true; - } - cv.notify_one(); - // LOG(debug) << "OFI transport (" << fId << "): > SendImpl: Data buffer sent"; - } - ); - - { - std::unique_lock lk(m); - cv.wait(lk, [&](){ return completed; }); - } - // LOG(debug) << "OFI transport (" << fId << "): >>>>> SendImpl: Data send buffer posted"; + fDataEndpoint->send(buffer, mr.desc(), [&, mr2 = std::move(mr)](boost::asio::mutable_buffer) { + LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent"; + fBytesTx += size; + fMessagesTx++; + }); } - msg.reset(nullptr); - fBytesTx += size; - fMessagesTx++; - - // LOG(debug) << "OFI transport (" << fId << "): LEAVE SendImpl"; - return size; -} -catch (const SilentSocketError& e) -{ - return -2; -} -catch (const std::exception& e) -{ - LOG(error) << e.what(); - return -1; + LOG(debug) << "OFI transport (" << fId << "): LEAVE OnControlMessageSent"; } auto Socket::ReceiveImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*timeout*/) -> int try { - // LOG(debug) << "OFI transport (" << fId << "): ENTER ReceiveImpl"; + LOG(debug) << "OFI transport (" << fId << "): ENTER ReceiveImpl"; // Receive and process control message - auto pb = StaticUniquePtrDowncast(ReceiveControlMessage()); - assert(pb.get()); + azmq::message ctrl; + auto recv = fControlEndpoint.receive(ctrl); + assert(recv == sizeof(PostBuffer)); (void)recv; + auto pb(static_cast(ctrl.data())); + assert(pb->type == ControlMessageType::PostBuffer); auto size = pb->size; - // LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Control message received, size=" << size; + LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Control message received, size=" << size; // Receive data if (size) { @@ -407,9 +346,10 @@ try { ); // LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Data buffer posted"; - auto ack = MakeControlMessage(&fCtrlMemPool); - ack->size = size; - SendControlMessage(StaticUniquePtrUpcast(std::move(ack))); + auto ack = MakeControlMessage(); + ack.size = size; + auto sent = fControlEndpoint.send(boost::asio::buffer(ack)); + assert(sent == sizeof(PostBuffer)); (void)sent; // LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Control Ack sent"; { @@ -606,113 +546,85 @@ auto Socket::ReceiveImpl(vector& /*msgVec*/, const int /*flags // } } -auto Socket::Close() -> void -{ - if (zmq_close(fControlSocket) != 0) - throw SocketError(tools::ToString("Failed closing zmq meta socket, reason: ", zmq_strerror(errno))); - - // if (zmq_close(fMonitorSocket) != 0) - // throw SocketError(tools::ToString("Failed closing zmq monitor socket, reason: ", zmq_strerror(errno))); -} +auto Socket::Close() -> void {} auto Socket::SetOption(const string& option, const void* value, size_t valueSize) -> void { - if (zmq_setsockopt(fControlSocket, GetConstant(option), value, valueSize) < 0) { - throw SocketError{tools::ToString("Failed setting socket option, reason: ", zmq_strerror(errno))}; - } + // if (zmq_setsockopt(fControlSocket, GetConstant(option), value, valueSize) < 0) { + // throw SocketError{tools::ToString("Failed setting socket option, reason: ", zmq_strerror(errno))}; + // } } auto Socket::GetOption(const string& option, void* value, size_t* valueSize) -> void { - if (zmq_getsockopt(fControlSocket, GetConstant(option), value, valueSize) < 0) { - throw SocketError{tools::ToString("Failed getting socket option, reason: ", zmq_strerror(errno))}; - } -} - -int Socket::GetLinger() const -{ - int value = 0; - size_t valueSize; - if (zmq_getsockopt(fControlSocket, ZMQ_LINGER, &value, &valueSize) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno))); - } - return value; + // if (zmq_getsockopt(fControlSocket, GetConstant(option), value, valueSize) < 0) { + // throw SocketError{tools::ToString("Failed getting socket option, reason: ", zmq_strerror(errno))}; + // } } void Socket::SetLinger(const int value) { - if (zmq_setsockopt(fControlSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) { - throw SocketError(tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno))); - } + azmq::socket::linger opt(value); + fControlEndpoint.set_option(opt); } +int Socket::GetLinger() const +{ + azmq::socket::linger opt(0); + fControlEndpoint.get_option(opt); + return opt.value(); +} void Socket::SetSndBufSize(const int value) { - if (zmq_setsockopt(fControlSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) { - throw SocketError(tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); - } + azmq::socket::snd_hwm opt(value); + fControlEndpoint.set_option(opt); } int Socket::GetSndBufSize() const { - int value = 0; - size_t valueSize; - if (zmq_getsockopt(fControlSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); - } - return value; + azmq::socket::snd_hwm opt(0); + fControlEndpoint.get_option(opt); + return opt.value(); } void Socket::SetRcvBufSize(const int value) { - if (zmq_setsockopt(fControlSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) { - throw SocketError(tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); - } + azmq::socket::rcv_hwm opt(value); + fControlEndpoint.set_option(opt); } int Socket::GetRcvBufSize() const { - int value = 0; - size_t valueSize; - if (zmq_getsockopt(fControlSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); - } - return value; + azmq::socket::rcv_hwm opt(0); + fControlEndpoint.get_option(opt); + return opt.value(); } void Socket::SetSndKernelSize(const int value) { - if (zmq_setsockopt(fControlSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); - } + azmq::socket::snd_buf opt(value); + fControlEndpoint.set_option(opt); } int Socket::GetSndKernelSize() const { - int value = 0; - size_t valueSize; - if (zmq_getsockopt(fControlSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); - } - return value; + azmq::socket::snd_buf opt(0); + fControlEndpoint.get_option(opt); + return opt.value(); } void Socket::SetRcvKernelSize(const int value) { - if (zmq_setsockopt(fControlSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); - } + azmq::socket::rcv_buf opt(value); + fControlEndpoint.set_option(opt); } int Socket::GetRcvKernelSize() const { - int value = 0; - size_t valueSize; - if (zmq_getsockopt(fControlSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); - } - return value; + azmq::socket::rcv_buf opt(0); + fControlEndpoint.get_option(opt); + return opt.value(); } auto Socket::GetConstant(const string& constant) -> int diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index 1f1f76d1..3a3c4aeb 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -15,8 +15,8 @@ #include #include +#include #include -#include #include // unique_ptr #include class FairMQTransportFactory; @@ -51,7 +51,7 @@ class Socket final : public fair::mq::Socket auto Send(std::vector& msgVec, int timeout = 0) -> int64_t override; auto Receive(std::vector& msgVec, int timeout = 0) -> int64_t override; - auto GetSocket() const -> void* { return fControlSocket; } + auto GetSocket() const -> void* { return fControlEndpoint.native_handle(); } void SetLinger(const int value) override; int GetLinger() const override; @@ -79,8 +79,7 @@ class Socket final : public fair::mq::Socket ~Socket() override; private: - void* fControlSocket; - // void* fMonitorSocket; + Context& fContext; std::unique_ptr fPassiveDataEndpoint; std::unique_ptr fDataEndpoint; std::string fId; @@ -88,30 +87,28 @@ class Socket final : public fair::mq::Socket std::atomic fBytesRx; std::atomic fMessagesTx; std::atomic fMessagesRx; - Context& fContext; Context::Address fRemoteDataAddr; Context::Address fLocalDataAddr; - // bool fWaitingForControlPeer; boost::asio::io_service::strand fIoStrand; - boost::container::pmr::unsynchronized_pool_resource fCtrlMemPool; - + mutable azmq::socket fControlEndpoint; int fSndTimeout; int fRcvTimeout; + azmq::pair_socket fQueue1, fQueue2; - auto SendImpl(MessagePtr& msg, const int flags, const int timeout) -> int; + auto SendQueueReader() -> void; + auto OnSend(azmq::message& msg, size_t bytes_transferred) -> void; + auto OnControlMessageSent(size_t bytes_transferred, MessagePtr msg) -> void; auto ReceiveImpl(MessagePtr& msg, const int flags, const int timeout) -> int; auto SendImpl(std::vector& msgVec, const int flags, const int timeout) -> int64_t; auto ReceiveImpl(std::vector& msgVec, const int flags, const int timeout) -> int64_t; // auto WaitForControlPeer() -> void; auto AnnounceDataAddress() -> void; - auto SendControlMessage(CtrlMsgPtr ctrl) -> void; - auto ReceiveControlMessage() -> CtrlMsgPtr; - auto ProcessControlMessage(CtrlMsgPtr ctrl) -> void; - auto ConnectControlSocket(Context::Address address) -> void; - auto BindControlSocket(Context::Address address) -> void; + auto ConnectControlEndpoint(Context::Address address) -> void; + auto BindControlEndpoint(Context::Address address) -> void; auto BindDataEndpoint() -> void; auto ConnectDataEndpoint() -> void; + auto ReceiveDataAddressAnnouncement() -> void; }; /* class Socket */ struct SilentSocketError : SocketError { using SocketError::SocketError; }; diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx index 8957f26f..694e706b 100644 --- a/fairmq/ofi/TransportFactory.cxx +++ b/fairmq/ofi/TransportFactory.cxx @@ -26,7 +26,7 @@ using namespace std; TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* /*config*/) try : FairMQTransportFactory{id} { - LOG(debug) << "OFI transport: Using ZeroMQ (" << fContext.GetZmqVersion() << ") & " + LOG(debug) << "OFI transport: Using AZMQ & " << "asiofi (" << fContext.GetAsiofiVersion() << ")"; } catch (ContextError& e) @@ -65,19 +65,19 @@ auto TransportFactory::CreateSocket(const string& type, const string& name) -> S return SocketPtr{new Socket(fContext, type, name, GetId(), this)}; } -auto TransportFactory::CreatePoller(const vector& channels) const -> PollerPtr +auto TransportFactory::CreatePoller(const vector& /*channels*/) const -> PollerPtr { throw runtime_error{"Not yet implemented (Poller)."}; // return PollerPtr{new Poller(channels)}; } -auto TransportFactory::CreatePoller(const vector& channels) const -> PollerPtr +auto TransportFactory::CreatePoller(const vector& /*channels*/) const -> PollerPtr { throw runtime_error{"Not yet implemented (Poller)."}; // return PollerPtr{new Poller(channels)}; } -auto TransportFactory::CreatePoller(const unordered_map>& channelsMap, const vector& channelList) const -> PollerPtr +auto TransportFactory::CreatePoller(const unordered_map>& /*channelsMap*/, const vector& /*channelList*/) const -> PollerPtr { throw runtime_error{"Not yet implemented (Poller)."}; // return PollerPtr{new Poller(channelsMap, channelList)};