mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
FairMQ: Implement ofi address exchange
Control messages are encoded with protobuf.
This commit is contained in:
committed by
Mohammad Al-Turany
parent
df5d5d4086
commit
5b3a5b9709
@@ -12,6 +12,7 @@
|
||||
|
||||
#include <arpa/inet.h>
|
||||
#include <cstring>
|
||||
#include <google/protobuf/stubs/common.h>
|
||||
#include <memory>
|
||||
#include <netinet/in.h>
|
||||
#include <rdma/fabric.h>
|
||||
@@ -36,10 +37,15 @@ using namespace std;
|
||||
Context::Context(int numberIoThreads)
|
||||
: fOfiDomain(nullptr)
|
||||
, fOfiFabric(nullptr)
|
||||
, fOfiInfo(nullptr)
|
||||
, fOfiAddressVector(nullptr)
|
||||
, fOfiEventQueue(nullptr)
|
||||
, fZmqContext(zmq_ctx_new())
|
||||
{
|
||||
if (!fZmqContext)
|
||||
throw ContextError{tools::ToString("Failed creating zmq context, reason: ", zmq_strerror(errno))};
|
||||
|
||||
GOOGLE_PROTOBUF_VERIFY_VERSION;
|
||||
}
|
||||
|
||||
Context::~Context()
|
||||
@@ -48,6 +54,12 @@ Context::~Context()
|
||||
LOG(error) << "Failed closing zmq context, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
||||
if (fOfiEventQueue) {
|
||||
auto ret = fi_close(&fOfiEventQueue->fid);
|
||||
if (ret != FI_SUCCESS)
|
||||
LOG(error) << "Failed closing ofi event queue, reason: " << fi_strerror(ret);
|
||||
}
|
||||
|
||||
if (fOfiAddressVector) {
|
||||
auto ret = fi_close(&fOfiAddressVector->fid);
|
||||
if (ret != FI_SUCCESS)
|
||||
@@ -67,19 +79,24 @@ Context::~Context()
|
||||
}
|
||||
}
|
||||
|
||||
auto Context::GetZmqVersion() const -> std::string
|
||||
auto Context::GetZmqVersion() const -> string
|
||||
{
|
||||
int major, minor, patch;
|
||||
zmq_version(&major, &minor, &patch);
|
||||
return tools::ToString(major, ".", minor, ".", patch);
|
||||
}
|
||||
|
||||
auto Context::GetOfiApiVersion() const -> std::string
|
||||
auto Context::GetOfiApiVersion() const -> string
|
||||
{
|
||||
auto ofi_version{fi_version()};
|
||||
return tools::ToString(FI_MAJOR(ofi_version), ".", FI_MINOR(ofi_version));
|
||||
}
|
||||
|
||||
auto Context::GetPbVersion() const -> string
|
||||
{
|
||||
return google::protobuf::internal::VersionString(GOOGLE_PROTOBUF_VERSION);
|
||||
}
|
||||
|
||||
auto Context::InitOfi(ConnectionType type, std::string addr) -> void
|
||||
{
|
||||
auto addr2 = ConvertAddress(addr);
|
||||
@@ -93,7 +110,7 @@ auto Context::InitOfi(ConnectionType type, std::string addr) -> void
|
||||
|
||||
// Prepare fi_getinfo query
|
||||
unique_ptr<fi_info, void(*)(fi_info*)> ofi_hints(fi_allocinfo(), fi_freeinfo);
|
||||
ofi_hints->caps = FI_MSG | FI_SOURCE;
|
||||
ofi_hints->caps = FI_MSG | FI_RMA;
|
||||
ofi_hints->mode = FI_ASYNC_IOV;
|
||||
ofi_hints->addr_format = FI_SOCKADDR_IN;
|
||||
ofi_hints->fabric_attr->prov_name = strdup("sockets");
|
||||
@@ -105,17 +122,17 @@ auto Context::InitOfi(ConnectionType type, std::string addr) -> void
|
||||
// ofi_hints->src_addr = sa;
|
||||
// ofi_hints->src_addrlen = sizeof(sockaddr_in);
|
||||
// } else {
|
||||
ofi_hints->dest_addr = sa;
|
||||
ofi_hints->dest_addrlen = sizeof(sockaddr_in);
|
||||
// ofi_hints->dest_addr = sa;
|
||||
// ofi_hints->dest_addrlen = sizeof(sockaddr_in);
|
||||
// }
|
||||
|
||||
// Query fi_getinfo for fabric to use
|
||||
auto res = fi_getinfo(FI_VERSION(1, 5), nullptr, nullptr, 0, ofi_hints.get(), &fOfiInfo);
|
||||
auto res = fi_getinfo(FI_VERSION(1, 5), strdup(addr2.Ip.c_str()), 0, 0, ofi_hints.get(), &fOfiInfo);
|
||||
if (res != 0) throw ContextError{tools::ToString("Failed querying fi_getinfo, reason: ", fi_strerror(res))};
|
||||
if (!fOfiInfo) throw ContextError{"Could not find any ofi compatible fabric."};
|
||||
|
||||
// for(auto cursor{ofi_info}; cursor->next != nullptr; cursor = cursor->next) {
|
||||
LOG(debug) << fi_tostr(fOfiInfo, FI_TYPE_INFO);
|
||||
// LOG(debug) << fi_tostr(fOfiInfo, FI_TYPE_INFO);
|
||||
// }
|
||||
//
|
||||
} else {
|
||||
@@ -123,6 +140,7 @@ auto Context::InitOfi(ConnectionType type, std::string addr) -> void
|
||||
}
|
||||
|
||||
OpenOfiFabric();
|
||||
OpenOfiEventQueue();
|
||||
OpenOfiDomain();
|
||||
OpenOfiAddressVector();
|
||||
}
|
||||
@@ -154,23 +172,39 @@ auto Context::OpenOfiDomain() -> void
|
||||
}
|
||||
}
|
||||
|
||||
auto Context::OpenOfiEventQueue() -> void
|
||||
{
|
||||
fi_eq_attr eqAttr = {100, 0, FI_WAIT_UNSPEC, 0, nullptr};
|
||||
// size_t size; [> # entries for EQ <]
|
||||
// uint64_t flags; [> operation flags <]
|
||||
// enum fi_wait_obj wait_obj; [> requested wait object <]
|
||||
// int signaling_vector; [> interrupt affinity <]
|
||||
// struct fid_wait *wait_set; [> optional wait set <]
|
||||
auto ret = fi_eq_open(fOfiFabric, &eqAttr, &fOfiEventQueue, nullptr);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw ContextError{tools::ToString("Failed opening ofi event queue, reason: ", fi_strerror(ret))};
|
||||
}
|
||||
|
||||
auto Context::OpenOfiAddressVector() -> void
|
||||
{
|
||||
if (!fOfiAddressVector) {
|
||||
assert(fOfiDomain);
|
||||
fi_av_attr attr = {fOfiInfo->domain_attr->av_type, 0, 1000, 0, nullptr, nullptr, 0};
|
||||
// struct fi_av_attr {
|
||||
// enum fi_av_type type; [> type of AV <]
|
||||
// int rx_ctx_bits; [> address bits to identify rx ctx <]
|
||||
// size_t count; [> # entries for AV <]
|
||||
// size_t ep_per_node; [> # endpoints per fabric address <]
|
||||
// const char *name; [> system name of AV <]
|
||||
// void *map_addr; [> base mmap address <]
|
||||
// uint64_t flags; [> operation flags <]
|
||||
// };
|
||||
// enum fi_av_type type; [> type of AV <]
|
||||
// int rx_ctx_bits; [> address bits to identify rx ctx <]
|
||||
// size_t count; [> # entries for AV <]
|
||||
// size_t ep_per_node; [> # endpoints per fabric address <]
|
||||
// const char *name; [> system name of AV <]
|
||||
// void *map_addr; [> base mmap address <]
|
||||
// uint64_t flags; [> operation flags <]
|
||||
auto ret = fi_av_open(fOfiDomain, &attr, &fOfiAddressVector, nullptr);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw ContextError{tools::ToString("Failed opening ofi address vector, reason: ", fi_strerror(ret))};
|
||||
|
||||
assert(fOfiEventQueue);
|
||||
ret = fi_av_bind(fOfiAddressVector, &fOfiEventQueue->fid, 0);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw ContextError{tools::ToString("Failed binding ofi event queue to address vector, reason: ", fi_strerror(ret))};
|
||||
} else {
|
||||
LOG(debug) << "Ofi address vector already opened. Skipping.";
|
||||
}
|
||||
@@ -185,6 +219,11 @@ auto Context::CreateOfiEndpoint() -> fid_ep*
|
||||
if (ret != FI_SUCCESS)
|
||||
throw ContextError{tools::ToString("Failed creating ofi endpoint, reason: ", fi_strerror(ret))};
|
||||
|
||||
assert(fOfiEventQueue);
|
||||
ret = fi_ep_bind(ep, &fOfiEventQueue->fid, 0);
|
||||
if (ret != FI_SUCCESS)
|
||||
throw ContextError{tools::ToString("Failed binding ofi address vector to ofi endpoint, reason: ", fi_strerror(ret))};
|
||||
|
||||
assert(fOfiAddressVector);
|
||||
ret = fi_ep_bind(ep, &fOfiAddressVector->fid, 0);
|
||||
if (ret != FI_SUCCESS)
|
||||
@@ -254,6 +293,21 @@ auto Context::ConvertAddress(Address address) -> sockaddr_in
|
||||
return sa;
|
||||
}
|
||||
|
||||
auto Context::ConvertAddress(sockaddr_in address) -> Address
|
||||
{
|
||||
return {"tcp", inet_ntoa(address.sin_addr), ntohs(address.sin_port)};
|
||||
}
|
||||
|
||||
auto Context::VerifyAddress(const std::string& address) -> Address
|
||||
{
|
||||
auto addr = ConvertAddress(address);
|
||||
|
||||
if (addr.Protocol != "tcp")
|
||||
throw ContextError("Wrong protocol: Supported protocols are: tcp");
|
||||
|
||||
return addr;
|
||||
}
|
||||
|
||||
} /* namespace ofi */
|
||||
} /* namespace mq */
|
||||
} /* namespace fair */
|
||||
|
Reference in New Issue
Block a user