From c5072ea4253deba75a8f1bcc49be2826249ad102 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Thu, 8 Mar 2018 03:07:26 +0100 Subject: [PATCH] FairMQ: Fix missing ofi completion events --- fairmq/ofi/Context.cxx | 31 ++++++++++---------- fairmq/ofi/Context.h | 3 +- fairmq/ofi/Socket.cxx | 31 +++++++++++++------- fairmq/test/helper/devices/TestPairLeft.cxx | 14 +++++---- fairmq/test/helper/devices/TestPairRight.cxx | 7 +++-- 5 files changed, 53 insertions(+), 33 deletions(-) diff --git a/fairmq/ofi/Context.cxx b/fairmq/ofi/Context.cxx index 61d11d0f..e74ee18a 100644 --- a/fairmq/ofi/Context.cxx +++ b/fairmq/ofi/Context.cxx @@ -122,15 +122,12 @@ auto Context::GetBoostVersion() const -> std::string return tools::ToString(BOOST_VERSION / 100000, ".", BOOST_VERSION / 100 % 1000, ".", BOOST_VERSION % 100); } -auto Context::InitOfi(ConnectionType type, std::string addr) -> void +auto Context::InitOfi(ConnectionType type, Address addr) -> void { - auto addr2 = ConvertAddress(addr); - if (addr2.Protocol != "tcp") - throw ContextError{"Wrong protocol: Supplied address must be in format tcp://ip:port"}; - if (!fOfiInfo) { sockaddr_in* sa = static_cast(malloc(sizeof(sockaddr_in))); - auto sa2 = ConvertAddress(addr2); + addr.Port = 0; + auto sa2 = ConvertAddress(addr); memcpy(sa, &sa2, sizeof(sockaddr_in)); // Prepare fi_getinfo query @@ -143,16 +140,15 @@ auto Context::InitOfi(ConnectionType type, std::string addr) -> void ofi_hints->domain_attr->threading = FI_THREAD_SAFE; ofi_hints->domain_attr->control_progress = FI_PROGRESS_AUTO; ofi_hints->domain_attr->data_progress = FI_PROGRESS_AUTO; - // if (type == ConnectionType::Bind) { - // ofi_hints->src_addr = sa; - // ofi_hints->src_addrlen = sizeof(sockaddr_in); - // } else { - // ofi_hints->dest_addr = sa; - // ofi_hints->dest_addrlen = sizeof(sockaddr_in); - // } + ofi_hints->tx_attr->op_flags = FI_COMPLETION; + ofi_hints->rx_attr->op_flags = FI_COMPLETION; + ofi_hints->src_addr = sa; + ofi_hints->src_addrlen = sizeof(sockaddr_in); + ofi_hints->dest_addr = nullptr; + ofi_hints->dest_addrlen = 0; // Query fi_getinfo for fabric to use - auto res = fi_getinfo(FI_VERSION(1, 5), strdup(addr2.Ip.c_str()), 0, 0, ofi_hints.get(), &fOfiInfo); + auto res = fi_getinfo(FI_VERSION(1, 5), nullptr, nullptr, 0, ofi_hints.get(), &fOfiInfo); if (res != 0) throw ContextError{tools::ToString("Failed querying fi_getinfo, reason: ", fi_strerror(res))}; if (!fOfiInfo) throw ContextError{"Could not find any ofi compatible fabric."}; @@ -286,7 +282,12 @@ auto Context::InsertAddressVector(sockaddr_in address) -> fi_addr_t if (ret != 1) throw ContextError{tools::ToString("Failed to insert address into ofi address vector")}; - return ret; + return mappedAddress; +} + +auto Context::AddressVectorLookup(fi_addr_t address) -> sockaddr_in +{ + throw ContextError("Not yet implemented"); } auto Context::ConvertAddress(std::string address) -> Address diff --git a/fairmq/ofi/Context.h b/fairmq/ofi/Context.h index 6e222e94..a744c3c6 100644 --- a/fairmq/ofi/Context.h +++ b/fairmq/ofi/Context.h @@ -41,7 +41,6 @@ class Context Context(int numberIoThreads = 2); ~Context(); - auto InitOfi(ConnectionType type, std::string address) -> void; auto CreateOfiEndpoint() -> fid_ep*; auto CreateOfiCompletionQueue(Direction dir) -> fid_cq*; auto GetZmqVersion() const -> std::string; @@ -51,12 +50,14 @@ class Context auto GetZmqContext() const -> void* { return fZmqContext; } auto GetIoContext() -> boost::asio::io_service& { return fIoContext; } auto InsertAddressVector(sockaddr_in address) -> fi_addr_t; + auto AddressVectorLookup(fi_addr_t address) -> sockaddr_in; struct Address { std::string Protocol; std::string Ip; unsigned int Port; friend auto operator<<(std::ostream& os, const Address& a) -> std::ostream& { return os << a.Protocol << "://" << a.Ip << ":" << a.Port; } }; + auto InitOfi(ConnectionType type, Address address) -> void; static auto ConvertAddress(std::string address) -> Address; static auto ConvertAddress(Address address) -> sockaddr_in; static auto ConvertAddress(sockaddr_in address) -> Address; diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 7e55b88a..3093574f 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -85,7 +85,7 @@ auto Socket::Bind(const string& address) -> bool try { auto addr = Context::VerifyAddress(address); BindControlSocket(addr); - fContext.InitOfi(ConnectionType::Bind, address); + fContext.InitOfi(ConnectionType::Bind, addr); InitDataEndpoint(); fWaitingForControlPeer = true; return true; @@ -106,7 +106,7 @@ auto Socket::Connect(const string& address) -> void { auto addr = Context::VerifyAddress(address); ConnectControlSocket(addr); - fContext.InitOfi(ConnectionType::Connect, address); + fContext.InitOfi(ConnectionType::Connect, addr); InitDataEndpoint(); fWaitingForControlPeer = true; } @@ -207,7 +207,7 @@ try { auto Socket::SendControlMessage(unique_ptr ctrl) -> void { assert(fControlSocket); - LOG(debug) << "About to send control message: " << ctrl->DebugString(); + // LOG(debug) << "About to send control message: " << ctrl->DebugString(); // Serialize string* str = new string(); @@ -217,8 +217,10 @@ auto Socket::SendControlMessage(unique_ptr ctrl) -> void assert(ret == 0); // Send - if (zmq_msg_send(&msg, fControlSocket, 0) == -1) + if (zmq_msg_send(&msg, fControlSocket, 0) == -1) { + zmq_msg_close(&msg); throw SocketError(tools::ToString("Failed to send control message, reason: ", zmq_strerror(errno))); + } } auto Socket::ReceiveControlMessage() -> unique_ptr @@ -229,14 +231,17 @@ auto Socket::ReceiveControlMessage() -> unique_ptr zmq_msg_t msg; auto ret = zmq_msg_init(&msg); assert(ret == 0); - if (zmq_msg_recv(&msg, fControlSocket, 0) == -1) + if (zmq_msg_recv(&msg, fControlSocket, 0) == -1) { + zmq_msg_close(&msg); throw SocketError(tools::ToString("Failed to receive control message, reason: ", zmq_strerror(errno))); + } // Deserialize auto ctrl = tools::make_unique(); ctrl->ParseFromArray(zmq_msg_data(&msg), zmq_msg_size(&msg)); - LOG(debug) << "Received control message: " << ctrl->DebugString(); + zmq_msg_close(&msg); + // LOG(debug) << "Received control message: " << ctrl->DebugString(); return ctrl; } @@ -269,6 +274,9 @@ auto Socket::WaitForControlPeer() -> void string remoteIp(inet_ntoa(remoteAddr.sin_addr)); int remotePort = ntohs(remoteAddr.sin_port); LOG(debug) << "Accepted control peer connection from " << remoteIp << ":" << remotePort; + + // sucks, but the above event does not guarantee the socket is operational ... + std::this_thread::sleep_for(std::chrono::milliseconds(200)); } else if (event == ZMQ_EVENT_CONNECTED) { LOG(debug) << "Connected successfully to control peer"; } else { @@ -318,7 +326,7 @@ try { throw SocketError(tools::ToString("Failed posting ofi send buffer, reason: ", fi_strerror(ret))); fi_cq_err_entry cqEntry; - ret = fi_cq_sread(fDataCompletionQueueTx, &cqEntry, 1, nullptr, 1000); + ret = fi_cq_sread(fDataCompletionQueueTx, &cqEntry, 1, nullptr, -1); if (ret != 1) throw SocketError(tools::ToString("Failed reading ofi tx completion queue event, reason: ", fi_strerror(ret))); } @@ -350,12 +358,13 @@ try { assert(ctrl->has_post_buffer()); auto postBuffer = ctrl->post_buffer(); auto size = postBuffer.size(); - LOG(debug) << "Received post buffer control message with size: " << size; // Receive data if (size) { msg->Rebuild(size); - auto ret = fi_recv(fDataEndpoint, msg->GetData(), msg->GetSize(), nullptr, fRemoteDataAddr, nullptr); + auto buf = msg->GetData(); + auto size2 = msg->GetSize(); + auto ret = fi_recv(fDataEndpoint, buf, size2, nullptr, fRemoteDataAddr, nullptr); if (ret != FI_SUCCESS) throw SocketError(tools::ToString("Failed posting ofi receive buffer, reason: ", fi_strerror(ret))); @@ -368,9 +377,11 @@ try { SendControlMessage(move(ctrl2)); fi_cq_err_entry cqEntry; - ret = fi_cq_sread(fDataCompletionQueueRx, &cqEntry, 1, nullptr, 1000); + ret = fi_cq_sread(fDataCompletionQueueRx, &cqEntry, 1, nullptr, -1); if (ret != 1) throw SocketError(tools::ToString("Failed reading ofi rx completion queue event, reason: ", fi_strerror(ret))); + assert(cqEntry.len == size2); + assert(cqEntry.buf == buf); } return size; diff --git a/fairmq/test/helper/devices/TestPairLeft.cxx b/fairmq/test/helper/devices/TestPairLeft.cxx index 680a164a..3ccd29d6 100644 --- a/fairmq/test/helper/devices/TestPairLeft.cxx +++ b/fairmq/test/helper/devices/TestPairLeft.cxx @@ -45,13 +45,17 @@ class PairLeft : public FairMQDevice // Simple message with short text data auto msg5{NewSimpleMessageFor("data", 0, "testdata1234")}; - LOG(info) << "Will send msg5"; if (Send(msg5, "data") >= 0) counter++; - LOG(info) << "Sent msg5"; - if (counter == 5) LOG(info) << "Simple message with short text data successfull"; + auto msg6{NewMessageFor("data", 0)}; + auto ret = Receive(msg6, "data"); + if (ret > 0) { + auto content = std::string{static_cast(msg6->GetData()), msg6->GetSize()}; + LOG(info) << ret << ", " << msg6->GetSize() << ", '" << content << "'"; + if (msg6->GetSize() == ret && content == "testdata1234") counter++; + } + if (counter == 6) LOG(info) << "Simple message with short text data successfull"; - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - assert(counter == 5); + assert(counter == 6); }; }; diff --git a/fairmq/test/helper/devices/TestPairRight.cxx b/fairmq/test/helper/devices/TestPairRight.cxx index 121d5ef9..161d7166 100644 --- a/fairmq/test/helper/devices/TestPairRight.cxx +++ b/fairmq/test/helper/devices/TestPairRight.cxx @@ -52,9 +52,12 @@ class PairRight : public FairMQDevice LOG(info) << ret << ", " << msg5->GetSize() << ", '" << content << "'"; if (msg5->GetSize() == ret && content == "testdata1234") counter++; } - if (counter == 5) LOG(info) << "Simple message with short text data successfull"; + auto msg6{NewSimpleMessageFor("data", 0, "testdata1234")}; + if (Send(msg6, "data") >= 0) counter++; + if (counter == 6) LOG(info) << "Simple message with short text data successfull"; - if (counter == 5) LOG(info) << "PAIR test successfull."; + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + if (counter == 6) LOG(info) << "PAIR test successfull."; }; };