From 811fe50a48dfec431e956452f58e67046bbb9d8c Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Thu, 8 Mar 2018 23:21:54 +0100 Subject: [PATCH] FairMQ: Enable ofi/verbs provider --- fairmq/FairMQChannel.cxx | 12 +++++++ fairmq/ofi/Context.cxx | 75 +++++++++++++++++++++++++--------------- fairmq/ofi/Socket.cxx | 6 ++-- 3 files changed, 64 insertions(+), 29 deletions(-) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index c39a91e6..c50332ad 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -563,6 +563,18 @@ bool FairMQChannel::ValidateChannel() return false; } } + else if (address.compare(0, 8, "verbs://") == 0) + { + // check if IPC address is not empty + string addressString = address.substr(9); + if (addressString == "") + { + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(error) << "invalid channel address: \"" << address << "\" (empty verbs address?)"; + return false; + } + } else { // if neither TCP or IPC is specified, return invalid diff --git a/fairmq/ofi/Context.cxx b/fairmq/ofi/Context.cxx index e74ee18a..f7d6b705 100644 --- a/fairmq/ofi/Context.cxx +++ b/fairmq/ofi/Context.cxx @@ -108,8 +108,10 @@ auto Context::GetZmqVersion() const -> string auto Context::GetOfiApiVersion() const -> string { - auto ofi_version{fi_version()}; - return tools::ToString(FI_MAJOR(ofi_version), ".", FI_MINOR(ofi_version)); + // Disable for now, does not compile with gcc 4.9.2 debian jessie + //auto ofi_version{fi_version()}; + //return tools::ToString(FI_MAJOR(ofi_version), ".", FI_MINOR(ofi_version)); + return "unknown"; } auto Context::GetPbVersion() const -> string @@ -133,19 +135,31 @@ auto Context::InitOfi(ConnectionType type, Address addr) -> void // Prepare fi_getinfo query unique_ptr ofi_hints(fi_allocinfo(), fi_freeinfo); ofi_hints->caps = FI_MSG | FI_RMA; - ofi_hints->mode = FI_ASYNC_IOV; + ofi_hints->mode = FI_CONTEXT; ofi_hints->addr_format = FI_SOCKADDR_IN; - ofi_hints->fabric_attr->prov_name = strdup("sockets"); + if (addr.Protocol == "tcp") { + ofi_hints->fabric_attr->prov_name = strdup("sockets"); + } else if (addr.Protocol == "verbs") { + ofi_hints->fabric_attr->prov_name = strdup("verbs"); + } ofi_hints->ep_attr->type = FI_EP_RDM; + ofi_hints->domain_attr->mr_mode = ~0; ofi_hints->domain_attr->threading = FI_THREAD_SAFE; ofi_hints->domain_attr->control_progress = FI_PROGRESS_AUTO; ofi_hints->domain_attr->data_progress = FI_PROGRESS_AUTO; ofi_hints->tx_attr->op_flags = FI_COMPLETION; ofi_hints->rx_attr->op_flags = FI_COMPLETION; - ofi_hints->src_addr = sa; - ofi_hints->src_addrlen = sizeof(sockaddr_in); - ofi_hints->dest_addr = nullptr; - ofi_hints->dest_addrlen = 0; + if (type == ConnectionType::Bind) { + ofi_hints->src_addr = sa; + ofi_hints->src_addrlen = sizeof(sockaddr_in); + ofi_hints->dest_addr = nullptr; + ofi_hints->dest_addrlen = 0; + } else { + ofi_hints->src_addr = nullptr; + ofi_hints->src_addrlen = 0; + ofi_hints->dest_addr = sa; + ofi_hints->dest_addrlen = sizeof(sockaddr_in); + } // Query fi_getinfo for fabric to use auto res = fi_getinfo(FI_VERSION(1, 5), nullptr, nullptr, 0, ofi_hints.get(), &fOfiInfo); @@ -153,7 +167,7 @@ auto Context::InitOfi(ConnectionType type, Address addr) -> void if (!fOfiInfo) throw ContextError{"Could not find any ofi compatible fabric."}; // for(auto cursor{ofi_info}; cursor->next != nullptr; cursor = cursor->next) { - // LOG(debug) << fi_tostr(fOfiInfo, FI_TYPE_INFO); + // LOG(debug) << fi_tostr(fOfiInfo, FI_TYPE_INFO); // } // } else { @@ -161,7 +175,7 @@ auto Context::InitOfi(ConnectionType type, Address addr) -> void } OpenOfiFabric(); - OpenOfiEventQueue(); +// OpenOfiEventQueue(); OpenOfiDomain(); OpenOfiAddressVector(); } @@ -170,7 +184,8 @@ auto Context::OpenOfiFabric() -> void { if (!fOfiFabric) { assert(fOfiInfo); - auto ret = fi_fabric(fOfiInfo->fabric_attr, &fOfiFabric, nullptr); + fi_context ctx; + auto ret = fi_fabric(fOfiInfo->fabric_attr, &fOfiFabric, &ctx); if (ret != FI_SUCCESS) throw ContextError{tools::ToString("Failed opening ofi fabric, reason: ", fi_strerror(ret))}; } else { @@ -185,7 +200,8 @@ auto Context::OpenOfiDomain() -> void if (!fOfiDomain) { assert(fOfiInfo); assert(fOfiFabric); - auto ret = fi_domain(fOfiFabric, fOfiInfo, &fOfiDomain, nullptr); + fi_context ctx; + auto ret = fi_domain(fOfiFabric, fOfiInfo, &fOfiDomain, &ctx); if (ret != FI_SUCCESS) throw ContextError{tools::ToString("Failed opening ofi domain, reason: ", fi_strerror(ret))}; } else { @@ -201,7 +217,8 @@ auto Context::OpenOfiEventQueue() -> void // enum fi_wait_obj wait_obj; [> requested wait object <] // int signaling_vector; [> interrupt affinity <] // struct fid_wait *wait_set; [> optional wait set <] - auto ret = fi_eq_open(fOfiFabric, &eqAttr, &fOfiEventQueue, nullptr); + fi_context ctx; + auto ret = fi_eq_open(fOfiFabric, &eqAttr, &fOfiEventQueue, &ctx); if (ret != FI_SUCCESS) throw ContextError{tools::ToString("Failed opening ofi event queue, reason: ", fi_strerror(ret))}; } @@ -218,14 +235,15 @@ auto Context::OpenOfiAddressVector() -> void // const char *name; [> system name of AV <] // void *map_addr; [> base mmap address <] // uint64_t flags; [> operation flags <] - auto ret = fi_av_open(fOfiDomain, &attr, &fOfiAddressVector, nullptr); + fi_context ctx; + auto ret = fi_av_open(fOfiDomain, &attr, &fOfiAddressVector, &ctx); if (ret != FI_SUCCESS) throw ContextError{tools::ToString("Failed opening ofi address vector, reason: ", fi_strerror(ret))}; - assert(fOfiEventQueue); - ret = fi_av_bind(fOfiAddressVector, &fOfiEventQueue->fid, 0); - if (ret != FI_SUCCESS) - throw ContextError{tools::ToString("Failed binding ofi event queue to address vector, reason: ", fi_strerror(ret))}; + //assert(fOfiEventQueue); + //ret = fi_av_bind(fOfiAddressVector, &fOfiEventQueue->fid, 0); + //if (ret != FI_SUCCESS) + // throw ContextError{tools::ToString("Failed binding ofi event queue to address vector, reason: ", fi_strerror(ret))}; } else { LOG(debug) << "Ofi address vector already opened. Skipping."; } @@ -236,14 +254,15 @@ auto Context::CreateOfiEndpoint() -> fid_ep* assert(fOfiDomain); assert(fOfiInfo); fid_ep* ep = nullptr; - auto ret = fi_endpoint(fOfiDomain, fOfiInfo, &ep, nullptr); + fi_context ctx; + auto ret = fi_endpoint(fOfiDomain, fOfiInfo, &ep, &ctx); if (ret != FI_SUCCESS) throw ContextError{tools::ToString("Failed creating ofi endpoint, reason: ", fi_strerror(ret))}; - assert(fOfiEventQueue); - ret = fi_ep_bind(ep, &fOfiEventQueue->fid, 0); - if (ret != FI_SUCCESS) - throw ContextError{tools::ToString("Failed binding ofi address vector to ofi endpoint, reason: ", fi_strerror(ret))}; + //assert(fOfiEventQueue); + //ret = fi_ep_bind(ep, &fOfiEventQueue->fid, 0); + //if (ret != FI_SUCCESS) + // throw ContextError{tools::ToString("Failed binding ofi event queue to ofi endpoint, reason: ", fi_strerror(ret))}; assert(fOfiAddressVector); ret = fi_ep_bind(ep, &fOfiAddressVector->fid, 0); @@ -269,7 +288,8 @@ auto Context::CreateOfiCompletionQueue(Direction dir) -> fid_cq* // int signaling_vector; [> interrupt affinity <] // enum fi_cq_wait_cond wait_cond; [> wait condition format <] // struct fid_wait *wait_set; [> optional wait set <] - auto ret = fi_cq_open(fOfiDomain, &attr, &cq, nullptr); + fi_context ctx; + auto ret = fi_cq_open(fOfiDomain, &attr, &cq, &ctx); if (ret != FI_SUCCESS) throw ContextError{tools::ToString("Failed creating ofi completion queue, reason: ", fi_strerror(ret))}; return cq; @@ -278,7 +298,8 @@ auto Context::CreateOfiCompletionQueue(Direction dir) -> fid_cq* auto Context::InsertAddressVector(sockaddr_in address) -> fi_addr_t { fi_addr_t mappedAddress; - auto ret = fi_av_insert(fOfiAddressVector, &address, 1, &mappedAddress, 0, nullptr); + fi_context ctx; + auto ret = fi_av_insert(fOfiAddressVector, &address, 1, &mappedAddress, 0, &ctx); if (ret != 1) throw ContextError{tools::ToString("Failed to insert address into ofi address vector")}; @@ -328,8 +349,8 @@ auto Context::VerifyAddress(const std::string& address) -> Address { auto addr = ConvertAddress(address); - if (addr.Protocol != "tcp") - throw ContextError("Wrong protocol: Supported protocols are: tcp"); + if (!(addr.Protocol == "tcp" || addr.Protocol == "verbs")) + throw ContextError("Wrong protocol: Supported protocols are: tcp:// and verbs://"); return addr; } diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 1701a204..086f24b6 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -323,7 +323,8 @@ try { // assert(ctrl2->post_buffer_acknowledgement().size() == size); // Send data - auto ret = fi_send(fDataEndpoint, msg->GetData(), size, nullptr, fRemoteDataAddr, nullptr); + fi_context ctx; + auto ret = fi_send(fDataEndpoint, msg->GetData(), size, nullptr, fRemoteDataAddr, &ctx); if (ret != FI_SUCCESS) throw SocketError(tools::ToString("Failed posting ofi send buffer, reason: ", fi_strerror(ret))); @@ -365,10 +366,11 @@ try { // Receive data if (size) { + fi_context ctx; msg->Rebuild(size); auto buf = msg->GetData(); auto size2 = msg->GetSize(); - auto ret = fi_recv(fDataEndpoint, buf, size2, nullptr, fRemoteDataAddr, nullptr); + auto ret = fi_recv(fDataEndpoint, buf, size2, nullptr, fRemoteDataAddr, &ctx); if (ret != FI_SUCCESS) throw SocketError(tools::ToString("Failed posting ofi receive buffer, reason: ", fi_strerror(ret)));