diff --git a/CMakeLists.txt b/CMakeLists.txt index 0561a92f..80d046cb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -197,11 +197,6 @@ if(BUILD_DDS_PLUGIN) DESTINATION ${PROJECT_INSTALL_CMAKEMODDIR} ) endif() -if(BUILD_OFI_TRANSPORT OR BUILD_NANOMSG_TRANSPORT) - install(FILES cmake/Findmsgpack.cmake - DESTINATION ${PROJECT_INSTALL_CMAKEMODDIR} - ) -endif() if(BUILD_DOCS) install(DIRECTORY ${CMAKE_BINARY_DIR}/doxygen/html DESTINATION ${PROJECT_INSTALL_DATADIR}/docs diff --git a/fairmq/ofi/Context.cxx b/fairmq/ofi/Context.cxx index 6cbb3385..edc045de 100644 --- a/fairmq/ofi/Context.cxx +++ b/fairmq/ofi/Context.cxx @@ -10,15 +10,13 @@ #include #include +#include #include #include +#include #include #include #include -#include -#include -#include -#include #include #include #include @@ -35,12 +33,10 @@ namespace ofi using namespace std; Context::Context(int numberIoThreads) - : fOfiDomain(nullptr) - , fOfiFabric(nullptr) + : fZmqContext(zmq_ctx_new()) , fOfiInfo(nullptr) - , fOfiAddressVector(nullptr) - , fOfiEventQueue(nullptr) - , fZmqContext(zmq_ctx_new()) + , fOfiFabric(nullptr) + , fOfiDomain(nullptr) , fIoWork(fIoContext) { if (!fZmqContext) @@ -55,9 +51,9 @@ auto Context::InitThreadPool(int numberIoThreads) -> void for (int i = 1; i <= numberIoThreads; ++i) { fThreadPool.emplace_back([&, i, numberIoThreads]{ - LOG(debug) << "I/O thread #" << i << "/" << numberIoThreads << " started"; + LOG(debug) << "OFI transport: I/O thread #" << i << "/" << numberIoThreads << " started"; fIoContext.run(); - LOG(debug) << "I/O thread #" << i << "/" << numberIoThreads << " stopped"; + LOG(debug) << "OFI transport: I/O thread #" << i << "/" << numberIoThreads << " stopped"; }); } } @@ -70,30 +66,6 @@ Context::~Context() if (zmq_ctx_term(fZmqContext) != 0) LOG(error) << "Failed closing zmq context, reason: " << zmq_strerror(errno); - - if (fOfiEventQueue) { - auto ret = fi_close(&fOfiEventQueue->fid); - if (ret != FI_SUCCESS) - LOG(error) << "Failed closing ofi event queue, reason: " << fi_strerror(ret); - } - - if (fOfiAddressVector) { - auto ret = fi_close(&fOfiAddressVector->fid); - if (ret != FI_SUCCESS) - LOG(error) << "Failed closing ofi address vector, reason: " << fi_strerror(ret); - } - - if (fOfiDomain) { - auto ret = fi_close(&fOfiDomain->fid); - if (ret != FI_SUCCESS) - LOG(error) << "Failed closing ofi domain, reason: " << fi_strerror(ret); - } - - if (fOfiFabric) { - auto ret = fi_close(&fOfiFabric->fid); - if (ret != FI_SUCCESS) - LOG(error) << "Failed closing ofi fabric, reason: " << fi_strerror(ret); - } } auto Context::GetZmqVersion() const -> string @@ -103,205 +75,56 @@ auto Context::GetZmqVersion() const -> string return tools::ToString(major, ".", minor, ".", patch); } -auto Context::GetOfiApiVersion() const -> string +auto Context::GetAsiofiVersion() const -> string { - // Disable for now, does not compile with gcc 4.9.2 debian jessie - //auto ofi_version{fi_version()}; - //return tools::ToString(FI_MAJOR(ofi_version), ".", FI_MINOR(ofi_version)); - return "unknown"; -} - -auto Context::GetBoostVersion() const -> std::string -{ - return tools::ToString(BOOST_VERSION / 100000, ".", BOOST_VERSION / 100 % 1000, ".", BOOST_VERSION % 100); + return ASIOFI_VERSION; } auto Context::InitOfi(ConnectionType type, Address addr) -> void { - if (!fOfiInfo) { - sockaddr_in* sa = static_cast(malloc(sizeof(sockaddr_in))); - addr.Port = 0; - auto sa2 = ConvertAddress(addr); - memcpy(sa, &sa2, sizeof(sockaddr_in)); + assert(!fOfiInfo); + assert(!fOfiFabric); + assert(!fOfiDomain); - // Prepare fi_getinfo query - unique_ptr ofi_hints(fi_allocinfo(), fi_freeinfo); - ofi_hints->caps = FI_MSG; - //ofi_hints->mode = FI_CONTEXT; - ofi_hints->addr_format = FI_SOCKADDR_IN; - if (addr.Protocol == "tcp") { - ofi_hints->fabric_attr->prov_name = strdup("sockets"); - } else if (addr.Protocol == "verbs") { - ofi_hints->fabric_attr->prov_name = strdup("verbs;ofi_rxm"); - } - ofi_hints->ep_attr->type = FI_EP_RDM; - //ofi_hints->domain_attr->mr_mode = FI_MR_BASIC | FI_MR_SCALABLE; - ofi_hints->domain_attr->threading = FI_THREAD_SAFE; - ofi_hints->domain_attr->control_progress = FI_PROGRESS_AUTO; - ofi_hints->domain_attr->data_progress = FI_PROGRESS_AUTO; - ofi_hints->tx_attr->op_flags = FI_COMPLETION; - ofi_hints->rx_attr->op_flags = FI_COMPLETION; - if (type == ConnectionType::Bind) { - ofi_hints->src_addr = sa; - ofi_hints->src_addrlen = sizeof(sockaddr_in); - ofi_hints->dest_addr = nullptr; - ofi_hints->dest_addrlen = 0; - } else { - ofi_hints->src_addr = nullptr; - ofi_hints->src_addrlen = 0; - ofi_hints->dest_addr = sa; - ofi_hints->dest_addrlen = sizeof(sockaddr_in); - } - - // Query fi_getinfo for fabric to use - auto res = fi_getinfo(FI_VERSION(1, 5), nullptr, nullptr, 0, ofi_hints.get(), &fOfiInfo); - if (res != 0) throw ContextError{tools::ToString("Failed querying fi_getinfo, reason: ", fi_strerror(res))}; - if (!fOfiInfo) throw ContextError{"Could not find any ofi compatible fabric."}; - - // for(auto cursor{ofi_info}; cursor->next != nullptr; cursor = cursor->next) { - // LOG(debug) << fi_tostr(fOfiInfo, FI_TYPE_INFO); - // } - // + asiofi::hints hints; + if (addr.Protocol == "tcp") { + hints.set_provider("sockets"); + } else if (addr.Protocol == "verbs") { + hints.set_provider("verbs"); + } + if (type == ConnectionType::Bind) { + fOfiInfo = tools::make_unique(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints); } else { - LOG(debug) << "Ofi info already queried. Skipping."; + fOfiInfo = tools::make_unique(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), 0, hints); } - OpenOfiFabric(); -// OpenOfiEventQueue(); - OpenOfiDomain(); - OpenOfiAddressVector(); + fOfiFabric = tools::make_unique(*fOfiInfo); + + fOfiDomain = tools::make_unique(*fOfiFabric); } -auto Context::OpenOfiFabric() -> void -{ - if (!fOfiFabric) { - assert(fOfiInfo); - fi_context ctx; - auto ret = fi_fabric(fOfiInfo->fabric_attr, &fOfiFabric, &ctx); - if (ret != FI_SUCCESS) - throw ContextError{tools::ToString("Failed opening ofi fabric, reason: ", fi_strerror(ret))}; - } else { - // TODO Check, if requested fabric matches existing one. - // TODO Decide, if we want to support more than one fabric simultaneously. - LOG(debug) << "Ofi fabric already opened. Skipping."; - } -} - -auto Context::OpenOfiDomain() -> void -{ - if (!fOfiDomain) { - assert(fOfiInfo); - assert(fOfiFabric); - fi_context ctx; - auto ret = fi_domain(fOfiFabric, fOfiInfo, &fOfiDomain, &ctx); - if (ret != FI_SUCCESS) - throw ContextError{tools::ToString("Failed opening ofi domain, reason: ", fi_strerror(ret))}; - } else { - LOG(debug) << "Ofi domain already opened. Skipping."; - } -} - -auto Context::OpenOfiEventQueue() -> void -{ - fi_eq_attr eqAttr = {100, 0, FI_WAIT_UNSPEC, 0, nullptr}; - // size_t size; [> # entries for EQ <] - // uint64_t flags; [> operation flags <] - // enum fi_wait_obj wait_obj; [> requested wait object <] - // int signaling_vector; [> interrupt affinity <] - // struct fid_wait *wait_set; [> optional wait set <] - fi_context ctx; - auto ret = fi_eq_open(fOfiFabric, &eqAttr, &fOfiEventQueue, &ctx); - if (ret != FI_SUCCESS) - throw ContextError{tools::ToString("Failed opening ofi event queue, reason: ", fi_strerror(ret))}; -} - -auto Context::OpenOfiAddressVector() -> void -{ - if (!fOfiAddressVector) { - assert(fOfiDomain); - fi_av_attr attr = {fOfiInfo->domain_attr->av_type, 0, 1000, 0, nullptr, nullptr, 0}; - // enum fi_av_type type; [> type of AV <] - // int rx_ctx_bits; [> address bits to identify rx ctx <] - // size_t count; [> # entries for AV <] - // size_t ep_per_node; [> # endpoints per fabric address <] - // const char *name; [> system name of AV <] - // void *map_addr; [> base mmap address <] - // uint64_t flags; [> operation flags <] - fi_context ctx; - auto ret = fi_av_open(fOfiDomain, &attr, &fOfiAddressVector, &ctx); - if (ret != FI_SUCCESS) - throw ContextError{tools::ToString("Failed opening ofi address vector, reason: ", fi_strerror(ret))}; - - //assert(fOfiEventQueue); - //ret = fi_av_bind(fOfiAddressVector, &fOfiEventQueue->fid, 0); - //if (ret != FI_SUCCESS) - // throw ContextError{tools::ToString("Failed binding ofi event queue to address vector, reason: ", fi_strerror(ret))}; - } else { - LOG(debug) << "Ofi address vector already opened. Skipping."; - } -} - -auto Context::CreateOfiEndpoint() -> fid_ep* -{ - assert(fOfiDomain); - assert(fOfiInfo); - fid_ep* ep = nullptr; - fi_context ctx; - auto ret = fi_endpoint(fOfiDomain, fOfiInfo, &ep, &ctx); - if (ret != FI_SUCCESS) - throw ContextError{tools::ToString("Failed creating ofi endpoint, reason: ", fi_strerror(ret))}; +// auto Context::CreateOfiEndpoint() -> fid_ep* +// { + // assert(fOfiDomain); + // assert(fOfiInfo); + // fid_ep* ep = nullptr; + // fi_context ctx; + // auto ret = fi_endpoint(fOfiDomain, fOfiInfo, &ep, &ctx); + // if (ret != FI_SUCCESS) + // throw ContextError{tools::ToString("Failed creating ofi endpoint, reason: ", fi_strerror(ret))}; //assert(fOfiEventQueue); //ret = fi_ep_bind(ep, &fOfiEventQueue->fid, 0); //if (ret != FI_SUCCESS) // throw ContextError{tools::ToString("Failed binding ofi event queue to ofi endpoint, reason: ", fi_strerror(ret))}; - assert(fOfiAddressVector); - ret = fi_ep_bind(ep, &fOfiAddressVector->fid, 0); - if (ret != FI_SUCCESS) - throw ContextError{tools::ToString("Failed binding ofi address vector to ofi endpoint, reason: ", fi_strerror(ret))}; - - return ep; -} - -auto Context::CreateOfiCompletionQueue(Direction dir) -> fid_cq* -{ - fid_cq* cq = nullptr; - fi_cq_attr attr = {0, 0, FI_CQ_FORMAT_DATA, FI_WAIT_UNSPEC, 0, FI_CQ_COND_NONE, nullptr}; - if (dir == Direction::Receive) { - attr.size = fOfiInfo->rx_attr->size; - } else { - attr.size = fOfiInfo->tx_attr->size; - } - // size_t size; [> # entries for CQ <] - // uint64_t flags; [> operation flags <] - // enum fi_cq_format format; [> completion format <] - // enum fi_wait_obj wait_obj; [> requested wait object <] - // int signaling_vector; [> interrupt affinity <] - // enum fi_cq_wait_cond wait_cond; [> wait condition format <] - // struct fid_wait *wait_set; [> optional wait set <] - fi_context ctx; - auto ret = fi_cq_open(fOfiDomain, &attr, &cq, &ctx); - if (ret != FI_SUCCESS) - throw ContextError{tools::ToString("Failed creating ofi completion queue, reason: ", fi_strerror(ret))}; - return cq; -} - -auto Context::InsertAddressVector(sockaddr_in address) -> fi_addr_t -{ - fi_addr_t mappedAddress; - fi_context ctx; - auto ret = fi_av_insert(fOfiAddressVector, &address, 1, &mappedAddress, 0, &ctx); - if (ret != 1) - throw ContextError{tools::ToString("Failed to insert address into ofi address vector")}; - - return mappedAddress; -} - -auto Context::AddressVectorLookup(fi_addr_t address) -> sockaddr_in -{ - throw ContextError("Not yet implemented"); -} + // assert(fOfiAddressVector); + // ret = fi_ep_bind(ep, &fOfiAddressVector->fid, 0); + // if (ret != FI_SUCCESS) + // throw ContextError{tools::ToString("Failed binding ofi address vector to ofi endpoint, reason: ", fi_strerror(ret))}; +// + // return ep; +// } auto Context::ConvertAddress(std::string address) -> Address { diff --git a/fairmq/ofi/Context.h b/fairmq/ofi/Context.h index bc5db721..e12d1bf4 100644 --- a/fairmq/ofi/Context.h +++ b/fairmq/ofi/Context.h @@ -9,11 +9,13 @@ #ifndef FAIR_MQ_OFI_CONTEXT_H #define FAIR_MQ_OFI_CONTEXT_H +#include +#include +#include #include #include #include #include -#include #include #include #include @@ -38,18 +40,14 @@ enum class Direction : bool { Receive, Transmit }; class Context { public: - Context(int numberIoThreads = 2); + Context(int numberIoThreads = 1); ~Context(); - auto CreateOfiEndpoint() -> fid_ep*; - auto CreateOfiCompletionQueue(Direction dir) -> fid_cq*; + // auto CreateOfiEndpoint() -> fid_ep*; auto GetZmqVersion() const -> std::string; - auto GetOfiApiVersion() const -> std::string; - auto GetBoostVersion() const -> std::string; + auto GetAsiofiVersion() const -> std::string; auto GetZmqContext() const -> void* { return fZmqContext; } auto GetIoContext() -> boost::asio::io_service& { return fIoContext; } - auto InsertAddressVector(sockaddr_in address) -> fi_addr_t; - auto AddressVectorLookup(fi_addr_t address) -> sockaddr_in; struct Address { std::string Protocol; std::string Ip; @@ -64,19 +62,13 @@ class Context private: void* fZmqContext; - fi_info* fOfiInfo; - fid_fabric* fOfiFabric; - fid_domain* fOfiDomain; - fid_av* fOfiAddressVector; - fid_eq* fOfiEventQueue; + std::unique_ptr fOfiInfo; + std::unique_ptr fOfiFabric; + std::unique_ptr fOfiDomain; boost::asio::io_service fIoContext; boost::asio::io_service::work fIoWork; std::vector fThreadPool; - auto OpenOfiFabric() -> void; - auto OpenOfiEventQueue() -> void; - auto OpenOfiDomain() -> void; - auto OpenOfiAddressVector() -> void; auto InitThreadPool(int numberIoThreads) -> void; }; /* class Context */ diff --git a/fairmq/ofi/Poller.cxx b/fairmq/ofi/Poller.cxx index 2155e308..9f83a4ca 100644 --- a/fairmq/ofi/Poller.cxx +++ b/fairmq/ofi/Poller.cxx @@ -124,7 +124,7 @@ auto Poller::CheckOutput(const int index) -> bool return fItems[index].revents & ZMQ_POLLOUT; } -auto Poller::CheckInput(const string channelKey, const int index) -> bool +auto Poller::CheckInput(const string& channelKey, const int index) -> bool { try { return fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN; @@ -136,7 +136,7 @@ auto Poller::CheckInput(const string channelKey, const int index) -> bool } } -auto Poller::CheckOutput(const string channelKey, const int index) -> bool +auto Poller::CheckOutput(const string& channelKey, const int index) -> bool { try { return fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT; diff --git a/fairmq/ofi/Poller.h b/fairmq/ofi/Poller.h index f59d4385..a6d08a2d 100644 --- a/fairmq/ofi/Poller.h +++ b/fairmq/ofi/Poller.h @@ -51,8 +51,8 @@ class Poller final : public FairMQPoller auto Poll(const int timeout) -> void override; auto CheckInput(const int index) -> bool override; auto CheckOutput(const int index) -> bool override; - auto CheckInput(const std::string channelKey, const int index) -> bool override; - auto CheckOutput(const std::string channelKey, const int index) -> bool override; + auto CheckInput(const std::string& channelKey, const int index) -> bool override; + auto CheckOutput(const std::string& channelKey, const int index) -> bool override; ~Poller() override; diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 8cee2286..23569160 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -39,15 +39,17 @@ Socket::Socket(Context& context, const string& type, const string& name, const s , fId(id + "." + name + "." + type) , fControlSocket(nullptr) , fMonitorSocket(nullptr) - , fSndTimeout(100) - , fRcvTimeout(100) - , fContext(context) - , fWaitingForControlPeer(false) - , fIoStrand(fContext.GetIoContext()) + , fDataEndpoint(nullptr) + , fId(id + "." + name + "." + type) , fBytesTx(0) , fBytesRx(0) , fMessagesTx(0) , fMessagesRx(0) + , fContext(context) + , fWaitingForControlPeer(false) + , fIoStrand(fContext.GetIoContext()) + , fSndTimeout(100) + , fRcvTimeout(100) { if (type != "pair") { throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")}; @@ -152,29 +154,29 @@ auto Socket::ConnectControlSocket(Context::Address address) -> void auto Socket::InitDataEndpoint() -> void { - if (!fDataEndpoint) { - try { - fDataEndpoint = fContext.CreateOfiEndpoint(); - } catch (ContextError& e) { - throw SocketError(tools::ToString("Failed creating ofi endpoint, reason: ", e.what())); - } + assert(!fDataEndpoint); - if (!fDataCompletionQueueTx) - fDataCompletionQueueTx = fContext.CreateOfiCompletionQueue(Direction::Transmit); - auto ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueTx->fid, FI_TRANSMIT); - if (ret != FI_SUCCESS) - throw SocketError(tools::ToString("Failed binding ofi transmit completion queue to endpoint, reason: ", fi_strerror(ret))); - - if (!fDataCompletionQueueRx) - fDataCompletionQueueRx = fContext.CreateOfiCompletionQueue(Direction::Receive); - ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueRx->fid, FI_RECV); - if (ret != FI_SUCCESS) - throw SocketError(tools::ToString("Failed binding ofi receive completion queue to endpoint, reason: ", fi_strerror(ret))); - - ret = fi_enable(fDataEndpoint); - if (ret != FI_SUCCESS) - throw SocketError(tools::ToString("Failed enabling ofi endpoint, reason: ", fi_strerror(ret))); - } + // try { + // fDataEndpoint = fContext.CreateOfiEndpoint(); + // } catch (ContextError& e) { + // throw SocketError(tools::ToString("Failed creating ofi endpoint, reason: ", e.what())); + // } +// + // if (!fDataCompletionQueueTx) + // fDataCompletionQueueTx = fContext.CreateOfiCompletionQueue(Direction::Transmit); + // auto ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueTx->fid, FI_TRANSMIT); + // if (ret != FI_SUCCESS) + // throw SocketError(tools::ToString("Failed binding ofi transmit completion queue to endpoint, reason: ", fi_strerror(ret))); +// + // if (!fDataCompletionQueueRx) + // fDataCompletionQueueRx = fContext.CreateOfiCompletionQueue(Direction::Receive); + // ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueRx->fid, FI_RECV); + // if (ret != FI_SUCCESS) + // throw SocketError(tools::ToString("Failed binding ofi receive completion queue to endpoint, reason: ", fi_strerror(ret))); +// + // ret = fi_enable(fDataEndpoint); + // if (ret != FI_SUCCESS) + // throw SocketError(tools::ToString("Failed enabling ofi endpoint, reason: ", fi_strerror(ret))); } void free_string(void* /*data*/, void* hint) @@ -184,13 +186,13 @@ void free_string(void* /*data*/, void* hint) auto Socket::AnnounceDataAddress() -> void try { - size_t addrlen = sizeof(sockaddr_in); - auto ret = fi_getname(&fDataEndpoint->fid, &fLocalDataAddr, &addrlen); - if (ret != FI_SUCCESS) - throw SocketError(tools::ToString("Failed retrieving native address from ofi endpoint, reason: ", fi_strerror(ret))); - assert(addrlen == sizeof(sockaddr_in)); - - LOG(debug) << "Address of local ofi endpoint in socket " << fId << ": " << Context::ConvertAddress(fLocalDataAddr); + // size_t addrlen = sizeof(sockaddr_in); + // auto ret = fi_getname(&fDataEndpoint->fid, &fLocalDataAddr, &addrlen); + // if (ret != FI_SUCCESS) + // throw SocketError(tools::ToString("Failed retrieving native address from ofi endpoint, reason: ", fi_strerror(ret))); + // assert(addrlen == sizeof(sockaddr_in)); +// + // LOG(debug) << "Address of local ofi endpoint in socket " << fId << ": " << Context::ConvertAddress(fLocalDataAddr); // Create new control message // auto ctrl = tools::make_unique(); @@ -298,7 +300,7 @@ auto Socket::TryReceive(MessagePtr& msg) -> int { return ReceiveImpl(msg, ZMQ_DO auto Socket::TrySend(std::vector& msgVec) -> int64_t { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); } auto Socket::TryReceive(std::vector& msgVec) -> int64_t { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); } -auto Socket::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int +auto Socket::SendImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*timeout*/) -> int try { if (fWaitingForControlPeer) { WaitForControlPeer(); @@ -323,17 +325,17 @@ try { // assert(ctrl2->post_buffer_acknowledgement().size() == size); // Send data - fi_context ctx; - auto ret = fi_send(fDataEndpoint, msg->GetData(), size, nullptr, fRemoteDataAddr, &ctx); - if (ret < 0) - throw SocketError(tools::ToString("Failed posting ofi send buffer, reason: ", fi_strerror(ret))); + // fi_context ctx; + // auto ret = fi_send(fDataEndpoint, msg->GetData(), size, nullptr, fRemoteDataAddr, &ctx); + // if (ret < 0) + // throw SocketError(tools::ToString("Failed posting ofi send buffer, reason: ", fi_strerror(ret))); } if (size) { - fi_cq_err_entry cqEntry; - auto ret = fi_cq_sread(fDataCompletionQueueTx, &cqEntry, 1, nullptr, -1); - if (ret != 1) - throw SocketError(tools::ToString("Failed reading ofi tx completion queue event, reason: ", fi_strerror(ret))); + // fi_cq_err_entry cqEntry; + // auto ret = fi_cq_sread(fDataCompletionQueueTx, &cqEntry, 1, nullptr, -1); + // if (ret != 1) + // throw SocketError(tools::ToString("Failed reading ofi tx completion queue event, reason: ", fi_strerror(ret))); } msg.reset(nullptr); @@ -352,7 +354,7 @@ catch (const std::exception& e) return -1; } -auto Socket::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int +auto Socket::ReceiveImpl(FairMQMessagePtr& /*msg*/, const int /*flags*/, const int /*timeout*/) -> int try { if (fWaitingForControlPeer) { WaitForControlPeer(); @@ -368,13 +370,13 @@ try { // Receive data // if (size) { - fi_context ctx; + // fi_context ctx; // msg->Rebuild(size); - auto buf = msg->GetData(); - auto size2 = msg->GetSize(); - auto ret = fi_recv(fDataEndpoint, buf, size2, nullptr, fRemoteDataAddr, &ctx); - if (ret < 0) - throw SocketError(tools::ToString("Failed posting ofi receive buffer, reason: ", fi_strerror(ret))); + // auto buf = msg->GetData(); + // auto size2 = msg->GetSize(); + // auto ret = fi_recv(fDataEndpoint, buf, size2, nullptr, fRemoteDataAddr, &ctx); + // if (ret < 0) + // throw SocketError(tools::ToString("Failed posting ofi receive buffer, reason: ", fi_strerror(ret))); // Create and send control message // auto ctrl2 = tools::make_unique(); @@ -384,12 +386,12 @@ try { // assert(ctrl2->IsInitialized()); // SendControlMessage(move(ctrl2)); - fi_cq_err_entry cqEntry; - ret = fi_cq_sread(fDataCompletionQueueRx, &cqEntry, 1, nullptr, -1); - if (ret != 1) - throw SocketError(tools::ToString("Failed reading ofi rx completion queue event, reason: ", fi_strerror(ret))); - assert(cqEntry.len == size2); - assert(cqEntry.buf == buf); + // fi_cq_err_entry cqEntry; + // ret = fi_cq_sread(fDataCompletionQueueRx, &cqEntry, 1, nullptr, -1); + // if (ret != 1) + // throw SocketError(tools::ToString("Failed reading ofi rx completion queue event, reason: ", fi_strerror(ret))); + // assert(cqEntry.len == size2); + // assert(cqEntry.buf == buf); // } // fBytesRx += size; @@ -408,7 +410,7 @@ catch (const std::exception& e) return -1; } -auto Socket::SendImpl(vector& msgVec, const int flags, const int timeout) -> int64_t +auto Socket::SendImpl(vector& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t { throw SocketError{"Not yet implemented."}; // const unsigned int vecSize = msgVec.size(); @@ -492,7 +494,7 @@ auto Socket::SendImpl(vector& msgVec, const int flags, const i // } } -auto Socket::ReceiveImpl(vector& msgVec, const int flags, const int timeout) -> int64_t +auto Socket::ReceiveImpl(vector& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t { throw SocketError{"Not yet implemented."}; // int64_t totalSize = 0; @@ -586,24 +588,6 @@ auto Socket::Close() -> void if (zmq_close(fMonitorSocket) != 0) throw SocketError(tools::ToString("Failed closing zmq monitor socket, reason: ", zmq_strerror(errno))); - - if (fDataEndpoint) { - auto ret = fi_close(&fDataEndpoint->fid); - if (ret != FI_SUCCESS) - throw SocketError(tools::ToString("Failed closing ofi endpoint, reason: ", fi_strerror(ret))); - } - - if (fDataCompletionQueueTx) { - auto ret = fi_close(&fDataCompletionQueueTx->fid); - if (ret != FI_SUCCESS) - throw SocketError(tools::ToString("Failed closing ofi transmit completion queue, reason: ", fi_strerror(ret))); - } - - if (fDataCompletionQueueRx) { - auto ret = fi_close(&fDataCompletionQueueRx->fid); - if (ret != FI_SUCCESS) - throw SocketError(tools::ToString("Failed closing ofi receive completion queue, reason: ", fi_strerror(ret))); - } } auto Socket::SetOption(const string& option, const void* value, size_t valueSize) -> void diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index 46275705..fc083f29 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -13,6 +13,7 @@ #include #include +#include #include #include // unique_ptr #include @@ -84,9 +85,7 @@ class Socket final : public fair::mq::Socket private: void* fControlSocket; void* fMonitorSocket; - fid_ep* fDataEndpoint; - fid_cq* fDataCompletionQueueTx; - fid_cq* fDataCompletionQueueRx; + std::unique_ptr fDataEndpoint; std::string fId; std::atomic fBytesTx; std::atomic fBytesRx; diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx index 8a427b2c..c164bf07 100644 --- a/fairmq/ofi/TransportFactory.cxx +++ b/fairmq/ofi/TransportFactory.cxx @@ -23,12 +23,11 @@ namespace ofi using namespace std; -TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* config) +TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* /*config*/) try : FairMQTransportFactory{id} { - LOG(debug) << "Transport: Using ZeroMQ (" << fContext.GetZmqVersion() << ") & " - << "OFI libfabric (API " << fContext.GetOfiApiVersion() << ") & " - << "Boost.Asio (" << fContext.GetBoostVersion() << ")"; + LOG(debug) << "OFI transport: Using ZeroMQ (" << fContext.GetZmqVersion() << ") & " + << "asiofi (" << fContext.GetAsiofiVersion() << ")"; } catch (ContextError& e) { @@ -62,20 +61,23 @@ auto TransportFactory::CreateSocket(const string& type, const string& name) -> S auto TransportFactory::CreatePoller(const vector& channels) const -> PollerPtr { - return PollerPtr{new Poller(channels)}; + throw runtime_error{"Not yet implemented (Poller)."}; + // return PollerPtr{new Poller(channels)}; } auto TransportFactory::CreatePoller(const vector& channels) const -> PollerPtr { - return PollerPtr{new Poller(channels)}; + throw runtime_error{"Not yet implemented (Poller)."}; + // return PollerPtr{new Poller(channels)}; } auto TransportFactory::CreatePoller(const unordered_map>& channelsMap, const vector& channelList) const -> PollerPtr { - return PollerPtr{new Poller(channelsMap, channelList)}; + throw runtime_error{"Not yet implemented (Poller)."}; + // return PollerPtr{new Poller(channelsMap, channelList)}; } -auto TransportFactory::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const -> UnmanagedRegionPtr +auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/) const -> UnmanagedRegionPtr { throw runtime_error{"Not yet implemented UMR."}; } diff --git a/fairmq/ofi/TransportFactory.h b/fairmq/ofi/TransportFactory.h index f618a07f..ef0f915c 100644 --- a/fairmq/ofi/TransportFactory.h +++ b/fairmq/ofi/TransportFactory.h @@ -22,7 +22,7 @@ namespace ofi /** * @class TransportFactory TransportFactory.h - * @brief FairMQ transport factory for the ofi transport (implemented with ZeroMQ + libfabric) + * @brief FairMQ transport factory for the ofi transport * * @todo TODO insert long description */