diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 7d86581f..d763c0cd 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -83,6 +83,7 @@ set(FAIRMQ_HEADER_FILES devices/FairMQProxy.h devices/FairMQSink.h devices/FairMQSplitter.h + ofi/Context.h ofi/Message.h ofi/Poller.h ofi/Socket.h @@ -150,6 +151,7 @@ set(FAIRMQ_SOURCE_FILES devices/FairMQProxy.cxx # devices/FairMQSink.cxx devices/FairMQSplitter.cxx + ofi/Context.cxx ofi/Message.cxx ofi/Poller.cxx ofi/Socket.cxx diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 54677d8d..50633817 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -74,7 +74,7 @@ class FairMQTransportFactory virtual void Interrupt() = 0; virtual void Resume() = 0; - virtual ~FairMQTransportFactory() noexcept(false) {}; + virtual ~FairMQTransportFactory() {}; static auto CreateTransportFactory(const std::string& type, const std::string& id = "", const FairMQProgOptions* config = nullptr) -> std::shared_ptr; diff --git a/fairmq/ofi/Context.cxx b/fairmq/ofi/Context.cxx new file mode 100644 index 00000000..9924cbaa --- /dev/null +++ b/fairmq/ofi/Context.cxx @@ -0,0 +1,179 @@ +/******************************************************************************** + * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace fair +{ +namespace mq +{ +namespace ofi +{ + +using namespace std; + +Context::Context(int numberIoThreads) + : fOfiDomain(nullptr) + , fOfiFabric(nullptr) + , fZmqContext(zmq_ctx_new()) +{ + if (!fZmqContext) + throw ContextError{tools::ToString("Failed creating zmq context, reason: ", zmq_strerror(errno))}; +} + +Context::~Context() +{ + if (zmq_ctx_term(fZmqContext) != 0) { + LOG(error) << "Failed closing zmq context, reason: " << zmq_strerror(errno); + } + + if (fOfiFabric) { + auto ret = fi_close(&fOfiFabric->fid); + if (ret != FI_SUCCESS) + LOG(error) << "Failed closing ofi fabric, reason: " << fi_strerror(ret); + } + + if (fOfiDomain) { + auto ret = fi_close(&fOfiDomain->fid); + if (ret != FI_SUCCESS) + LOG(error) << "Failed closing ofi domain, reason: " << fi_strerror(ret); + } +} + +auto Context::GetZmqVersion() const -> std::string +{ + int major, minor, patch; + zmq_version(&major, &minor, &patch); + return tools::ToString(major, ".", minor, ".", patch); +} + +auto Context::GetOfiApiVersion() const -> std::string +{ + auto ofi_version{fi_version()}; + return tools::ToString(FI_MAJOR(ofi_version), ".", FI_MINOR(ofi_version)); +} + +auto Context::InitOfi(ConnectionType type, std::string address) -> void +{ + // Parse address + string protocol, ip; + unsigned int port = 0; + regex address_regex("^([a-z]+)://([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+):([0-9]+).*"); + smatch address_result; + if (regex_match(address, address_result, address_regex)) { + protocol = address_result[1]; + ip = address_result[2]; + port = stoul(address_result[3]); + LOG(debug) << "Parsed '" << protocol << "', '" << ip << "', '" << port << "' fields from '" << address << "'"; + } + if (protocol != "tcp") throw ContextError{"Wrong protocol: Supplied address must be in format tcp://ip:port"}; + + if (!fOfiInfo) { + sockaddr_in* sa = static_cast(malloc(sizeof(sockaddr_in))); + inet_pton(AF_INET, ip.c_str(), &(sa->sin_addr)); + sa->sin_port = port; + sa->sin_family = AF_INET; + + // Prepare fi_getinfo query + unique_ptr ofi_hints(fi_allocinfo(), fi_freeinfo); + ofi_hints->caps = FI_MSG; + ofi_hints->mode = FI_ASYNC_IOV; + ofi_hints->addr_format = FI_SOCKADDR_IN; + ofi_hints->fabric_attr->prov_name = strdup("sockets"); + ofi_hints->ep_attr->type = FI_EP_RDM; + if (type == ConnectionType::Bind) { + ofi_hints->src_addr = sa; + ofi_hints->src_addrlen = sizeof(sockaddr_in); + } else { + ofi_hints->dest_addr = sa; + ofi_hints->dest_addrlen = sizeof(sockaddr_in); + } + + // Query fi_getinfo for fabric to use + auto res = fi_getinfo(FI_VERSION(1, 5), nullptr, nullptr, 0, ofi_hints.get(), &fOfiInfo); + if (res != 0) throw ContextError{tools::ToString("Failed querying fi_getinfo, reason: ", fi_strerror(res))}; + if (!fOfiInfo) throw ContextError{"Could not find any ofi compatible fabric."}; + + // for(auto cursor{ofi_info}; cursor->next != nullptr; cursor = cursor->next) { + // LOG(debug) << fi_tostr(cursor, FI_TYPE_INFO); + // } + // + } else { + LOG(debug) << "Ofi info already queried. Skipping."; + } + + OpenOfiFabric(); + OpenOfiDomain(); +} + +auto Context::OpenOfiFabric() -> void +{ + if (!fOfiFabric) { + auto ret = fi_fabric(fOfiInfo->fabric_attr, &fOfiFabric, NULL); + if (ret != FI_SUCCESS) + throw ContextError{tools::ToString("Failed opening ofi fabric, reason: ", fi_strerror(ret))}; + } else { + // TODO Check, if requested fabric matches existing one. + // TODO Decide, if we want to support more than one fabric simultaneously. + LOG(debug) << "Ofi fabric already opened. Skipping."; + } +} + +auto Context::OpenOfiDomain() -> void +{ + if (!fOfiDomain) { + auto ret = fi_domain(fOfiFabric, fOfiInfo, &fOfiDomain, NULL); + if (ret != FI_SUCCESS) + throw ContextError{tools::ToString("Failed opening ofi domain, reason: ", fi_strerror(ret))}; + } else { + LOG(debug) << "Ofi domain already opened. Skipping."; + } +} + +auto Context::CreateOfiEndpoint() -> fid_ep* +{ + fid_ep* ep = nullptr; + auto ret = fi_endpoint(fOfiDomain, fOfiInfo, &ep, nullptr); + if (ret != FI_SUCCESS) + throw ContextError{tools::ToString("Failed creating ofi endpoint, reason: ", fi_strerror(ret))}; + return ep; +} + +auto Context::CreateOfiCompletionQueue() -> fid_cq* +{ + fid_cq* cq = nullptr; + fi_cq_attr attr = {0, 0, FI_CQ_FORMAT_DATA, FI_WAIT_UNSPEC, 0, FI_CQ_COND_NONE, nullptr}; + // size_t size; [> # entries for CQ <] + // uint64_t flags; [> operation flags <] + // enum fi_cq_format format; [> completion format <] + // enum fi_wait_obj wait_obj; [> requested wait object <] + // int signaling_vector; [> interrupt affinity <] + // enum fi_cq_wait_cond wait_cond; [> wait condition format <] + // struct fid_wait *wait_set; [> optional wait set <] + auto ret = fi_cq_open(fOfiDomain, &attr, &cq, nullptr); + if (ret != FI_SUCCESS) + throw ContextError{tools::ToString("Failed creating ofi completion queue, reason: ", fi_strerror(ret))}; + return cq; +} + +} /* namespace ofi */ +} /* namespace mq */ +} /* namespace fair */ diff --git a/fairmq/ofi/Context.h b/fairmq/ofi/Context.h new file mode 100644 index 00000000..75c0c184 --- /dev/null +++ b/fairmq/ofi/Context.h @@ -0,0 +1,62 @@ +/******************************************************************************** + * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_OFI_CONTEXT_H +#define FAIR_MQ_OFI_CONTEXT_H + +#include +#include +#include +#include + +namespace fair +{ +namespace mq +{ +namespace ofi +{ + +enum class ConnectionType : bool { Bind, Connect }; + +/** + * @class Context Context.h + * @brief Transport-wide context + * + * @todo TODO insert long description + */ +class Context +{ + public: + Context(int numberIoThreads = 1); + ~Context(); + + /// Deferred Ofi initialization + auto InitOfi(ConnectionType type, std::string address) -> void; + auto CreateOfiEndpoint() -> fid_ep*; + auto CreateOfiCompletionQueue() -> fid_cq*; + auto GetZmqVersion() const -> std::string; + auto GetOfiApiVersion() const -> std::string; + auto GetZmqContext() const -> void* { return fZmqContext; } + + private: + void* fZmqContext; + fi_info* fOfiInfo; + fid_fabric* fOfiFabric; + fid_domain* fOfiDomain; + + auto OpenOfiFabric() -> void; + auto OpenOfiDomain() -> void; +}; /* class Context */ + +struct ContextError : std::runtime_error { using std::runtime_error::runtime_error; }; + +} /* namespace ofi */ +} /* namespace mq */ +} /* namespace fair */ + +#endif /* FAIR_MQ_OFI_CONTEXT_H */ diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 1d503cf3..a0df61eb 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -11,6 +11,9 @@ #include #include +#include +#include +#include #include namespace fair @@ -22,42 +25,40 @@ namespace ofi using namespace std; -Socket::Socket(const TransportFactory& factory, const string& type, const string& name, const string& id /*= ""*/) - : fId{id + "." + name + "." + type} - , fBytesTx{0} - , fBytesRx{0} - , fMessagesTx{0} - , fMessagesRx{0} - , fSndTimeout{100} - , fRcvTimeout{100} +Socket::Socket(Context& context, const string& type, const string& name, const string& id /*= ""*/) + : fDataEndpoint(nullptr) + , fId(id + "." + name + "." + type) + , fSndTimeout(100) + , fRcvTimeout(100) + , fContext(context) { if (type != "pair") { throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")}; } else { - fMetaSocket = zmq_socket(factory.fZmqContext, GetConstant(type)); - } + fMetaSocket = zmq_socket(fContext.GetZmqContext(), GetConstant(type)); - if (fMetaSocket == nullptr) { - throw SocketError{tools::ToString("Failed creating socket ", fId, ", reason: ", zmq_strerror(errno))}; - } + if (fMetaSocket == nullptr) { + throw SocketError{tools::ToString("Failed creating socket ", fId, ", reason: ", zmq_strerror(errno))}; + } - if (zmq_setsockopt(fMetaSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) { - throw SocketError{tools::ToString("Failed setting ZMQ_IDENTITY socket option, reason: ", zmq_strerror(errno))}; - } + if (zmq_setsockopt(fMetaSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) { + throw SocketError{tools::ToString("Failed setting ZMQ_IDENTITY socket option, reason: ", zmq_strerror(errno))}; + } - // Tell socket to try and send/receive outstanding messages for milliseconds before terminating. - // Default value for ZeroMQ is -1, which is to wait forever. - int linger = 1000; - if (zmq_setsockopt(fMetaSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - throw SocketError{tools::ToString("Failed setting ZMQ_LINGER socket option, reason: ", zmq_strerror(errno))}; - } + // Tell socket to try and send/receive outstanding messages for milliseconds before terminating. + // Default value for ZeroMQ is -1, which is to wait forever. + int linger = 1000; + if (zmq_setsockopt(fMetaSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw SocketError{tools::ToString("Failed setting ZMQ_LINGER socket option, reason: ", zmq_strerror(errno))}; + } - if (zmq_setsockopt(fMetaSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { - throw SocketError{tools::ToString("Failed setting ZMQ_SNDTIMEO socket option, reason: ", zmq_strerror(errno))}; - } + if (zmq_setsockopt(fMetaSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { + throw SocketError{tools::ToString("Failed setting ZMQ_SNDTIMEO socket option, reason: ", zmq_strerror(errno))}; + } - if (zmq_setsockopt(fMetaSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { - throw SocketError{tools::ToString("Failed setting ZMQ_RCVTIMEO socket option, reason: ", zmq_strerror(errno))}; + if (zmq_setsockopt(fMetaSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { + throw SocketError{tools::ToString("Failed setting ZMQ_RCVTIMEO socket option, reason: ", zmq_strerror(errno))}; + } } } @@ -72,6 +73,33 @@ auto Socket::Bind(const string& address) -> bool LOG(error) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno); return false; } + + fContext.InitOfi(ConnectionType::Bind, address); + + if (!fDataEndpoint) { + try { + fDataEndpoint = fContext.CreateOfiEndpoint(); + } catch (ContextError& e) { + LOG(error) << "Failed creating ofi endpoint for " << address << ", reason: " << e.what(); + } + + if (!fDataCompletionQueueTx) + fDataCompletionQueueTx = fContext.CreateOfiCompletionQueue(); + auto ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueTx->fid, FI_TRANSMIT); + if (ret != FI_SUCCESS) + LOG(error) << "Failed binding ofi transmit completion queue to endpoint, reason: " << fi_strerror(ret); + + if (!fDataCompletionQueueRx) + fDataCompletionQueueRx = fContext.CreateOfiCompletionQueue(); + ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueRx->fid, FI_RECV); + if (ret != FI_SUCCESS) + LOG(error) << "Failed binding ofi receive completion queue to endpoint, reason: " << fi_strerror(ret); + + ret = fi_enable(fDataEndpoint); + if (ret != FI_SUCCESS) + LOG(error) << "Failed opening ofi fabric, reason: " << fi_strerror(ret); + } + return true; } @@ -80,21 +108,48 @@ auto Socket::Connect(const string& address) -> void if (zmq_connect(fMetaSocket, address.c_str()) != 0) { throw SocketError{tools::ToString("Failed connecting socket ", fId, ", reason: ", zmq_strerror(errno))}; } + + fContext.InitOfi(ConnectionType::Connect, address); + + if (!fDataEndpoint) { + try { + fDataEndpoint = fContext.CreateOfiEndpoint(); + } catch (ContextError& e) { + throw SocketError(tools::ToString("Failed creating ofi endpoint for ", address, ", reason: ", e.what())); + } + + if (!fDataCompletionQueueTx) + fDataCompletionQueueTx = fContext.CreateOfiCompletionQueue(); + auto ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueTx->fid, FI_TRANSMIT); + if (ret != FI_SUCCESS) + LOG(error) << "Failed binding ofi transmit completion queue to endpoint, reason: " << fi_strerror(ret); + + if (!fDataCompletionQueueRx) + fDataCompletionQueueRx = fContext.CreateOfiCompletionQueue(); + ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueRx->fid, FI_RECV); + if (ret != FI_SUCCESS) + LOG(error) << "Failed binding ofi receive completion queue to endpoint, reason: " << fi_strerror(ret); + + ret = fi_enable(fDataEndpoint); + if (ret != FI_SUCCESS) + throw SocketError{tools::ToString("Failed opening ofi fabric, reason: ", fi_strerror(ret))}; + } } -auto Socket::Send(FairMQMessagePtr& msg, const int timeout) -> int { return SendImpl(msg, 0, timeout); } -auto Socket::Receive(FairMQMessagePtr& msg, const int timeout) -> int { return ReceiveImpl(msg, 0, timeout); } -auto Socket::Send(std::vector>& msgVec, const int timeout) -> int64_t { return SendImpl(msgVec, 0, timeout); } -auto Socket::Receive(std::vector>& msgVec, const int timeout) -> int64_t { return ReceiveImpl(msgVec, 0, timeout); } +auto Socket::Send(MessagePtr& msg, const int timeout) -> int { return SendImpl(msg, 0, timeout); } +auto Socket::Receive(MessagePtr& msg, const int timeout) -> int { return ReceiveImpl(msg, 0, timeout); } +auto Socket::Send(std::vector& msgVec, const int timeout) -> int64_t { return SendImpl(msgVec, 0, timeout); } +auto Socket::Receive(std::vector& msgVec, const int timeout) -> int64_t { return ReceiveImpl(msgVec, 0, timeout); } -auto Socket::TrySend(FairMQMessagePtr& msg) -> int { return SendImpl(msg, ZMQ_DONTWAIT, 0); } -auto Socket::TryReceive(FairMQMessagePtr& msg) -> int { return ReceiveImpl(msg, ZMQ_DONTWAIT, 0); } -auto Socket::TrySend(std::vector>& msgVec) -> int64_t { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); } -auto Socket::TryReceive(std::vector>& msgVec) -> int64_t { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); } +auto Socket::TrySend(MessagePtr& msg) -> int { return SendImpl(msg, ZMQ_DONTWAIT, 0); } +auto Socket::TryReceive(MessagePtr& msg) -> int { return ReceiveImpl(msg, ZMQ_DONTWAIT, 0); } +auto Socket::TrySend(std::vector& msgVec) -> int64_t { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); } +auto Socket::TryReceive(std::vector& msgVec) -> int64_t { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); } auto Socket::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int { - auto ret = zmq_send(fMetaSocket, nullptr, 0, flags); + auto metadata = new int; + auto ret = zmq_send(fMetaSocket, &metadata, sizeof(int), flags); if (ret == EAGAIN) { return -2; } else if (ret < 0) { @@ -294,6 +349,24 @@ auto Socket::Close() -> void if (zmq_close(fMetaSocket) != 0) { throw SocketError{tools::ToString("Failed closing zmq socket, reason: ", zmq_strerror(errno))}; } + + if (fDataEndpoint) { + auto ret = fi_close(&fDataEndpoint->fid); + if (ret != FI_SUCCESS) + LOG(error) << "Failed closing ofi endpoint, reason: " << fi_strerror(ret); + } + + if (fDataCompletionQueueTx) { + auto ret = fi_close(&fDataCompletionQueueTx->fid); + if (ret != FI_SUCCESS) + LOG(error) << "Failed closing ofi transmit completion queue, reason: " << fi_strerror(ret); + } + + if (fDataCompletionQueueRx) { + auto ret = fi_close(&fDataCompletionQueueRx->fid); + if (ret != FI_SUCCESS) + LOG(error) << "Failed closing ofi receive completion queue, reason: " << fi_strerror(ret); + } } auto Socket::SetOption(const string& option, const void* value, size_t valueSize) -> void diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index 3cd2637f..dcf8b264 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -11,8 +11,10 @@ #include #include +#include #include // unique_ptr +#include namespace fair { @@ -21,8 +23,6 @@ namespace mq namespace ofi { -class TransportFactory; - /** * @class Socket Socket.h * @brief @@ -32,7 +32,7 @@ class TransportFactory; class Socket : public fair::mq::Socket { public: - Socket(const TransportFactory& factory, const std::string& type, const std::string& name, const std::string& id = ""); + Socket(Context& factory, const std::string& type, const std::string& name, const std::string& id = ""); Socket(const Socket&) = delete; Socket operator=(const Socket&) = delete; @@ -75,11 +75,15 @@ class Socket : public fair::mq::Socket private: void* fMetaSocket; + fid_ep* fDataEndpoint; + fid_cq* fDataCompletionQueueTx; + fid_cq* fDataCompletionQueueRx; std::string fId; std::atomic fBytesTx; std::atomic fBytesRx; std::atomic fMessagesTx; std::atomic fMessagesRx; + Context& fContext; int fSndTimeout; int fRcvTimeout; diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx index 9138a95a..5be1ab3e 100644 --- a/fairmq/ofi/TransportFactory.cxx +++ b/fairmq/ofi/TransportFactory.cxx @@ -12,10 +12,7 @@ #include #include -#include // OFI libfabric -#include #include -#include namespace fair { @@ -27,41 +24,12 @@ namespace ofi using namespace std; TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* config) - : FairMQTransportFactory{id} - , fZmqContext{zmq_ctx_new()} +try : FairMQTransportFactory{id} { - if (!fZmqContext) - { - throw TransportFactoryError{tools::ToString("Failed creating zmq context, reason: ", zmq_strerror(errno))}; - } - - auto ofi_hints = fi_allocinfo(); - ofi_hints->caps = FI_MSG | FI_RMA; - ofi_hints->mode = FI_ASYNC_IOV; - ofi_hints->addr_format = FI_SOCKADDR_IN; - auto ofi_info = fi_allocinfo(); - if (ofi_hints == nullptr || ofi_info == nullptr) - { - throw TransportFactoryError{"Failed allocating fi_info structs"}; - } - auto res = fi_getinfo(FI_VERSION(1, 5), nullptr, nullptr, 0, ofi_hints, &ofi_info); - if (res != 0) - { - throw TransportFactoryError{tools::ToString("Failed querying fi_getinfo, reason: ", fi_strerror(res))}; - } - for(auto cursor{ofi_info}; cursor->next != nullptr; cursor = cursor->next) - { - // LOG(debug) << fi_tostr(cursor, FI_TYPE_INFO); - } - fi_freeinfo(ofi_hints); - fi_freeinfo(ofi_info); - - int major, minor, patch; - zmq_version(&major, &minor, &patch); - auto ofi_version{fi_version()}; - LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & " - << "OFI libfabric (API " << FI_MAJOR(ofi_version) << "." << FI_MINOR(ofi_version) << ")"; + LOG(debug) << "Transport: Using ZeroMQ (" << fContext.GetZmqVersion() << ") & " + << "OFI libfabric (API " << fContext.GetOfiApiVersion() << ")"; } +catch (ContextError& e) { throw TransportFactoryError{e.what()}; } auto TransportFactory::CreateMessage() const -> MessagePtr { @@ -85,8 +53,7 @@ auto TransportFactory::CreateMessage(UnmanagedRegionPtr& region, void* data, con auto TransportFactory::CreateSocket(const string& type, const string& name) const -> SocketPtr { - assert(fZmqContext); - return SocketPtr{new Socket(*this, type, name, GetId())}; + return SocketPtr{new Socket(fContext, type, name, GetId())}; } auto TransportFactory::CreatePoller(const vector& channels) const -> PollerPtr @@ -119,13 +86,6 @@ auto TransportFactory::GetType() const -> Transport return Transport::OFI; } -TransportFactory::~TransportFactory() noexcept(false) -{ - if (zmq_ctx_term(fZmqContext) != 0) { - throw TransportFactoryError{tools::ToString("Failed closing zmq context, reason: ", zmq_strerror(errno))}; - } -} - } /* namespace ofi */ } /* namespace mq */ } /* namespace fair */ diff --git a/fairmq/ofi/TransportFactory.h b/fairmq/ofi/TransportFactory.h index 7bc26e52..9f1e3ff5 100644 --- a/fairmq/ofi/TransportFactory.h +++ b/fairmq/ofi/TransportFactory.h @@ -11,6 +11,7 @@ #include #include +#include namespace fair { @@ -19,8 +20,6 @@ namespace mq namespace ofi { -class Socket; - /** * @class TransportFactory TransportFactory.h * @brief FairMQ transport factory for the ofi transport (implemented with ZeroMQ + libfabric) @@ -29,8 +28,6 @@ class Socket; */ class TransportFactory : public FairMQTransportFactory { - friend Socket; - public: TransportFactory(const std::string& id = "", const FairMQProgOptions* config = nullptr); TransportFactory(const TransportFactory&) = delete; @@ -55,10 +52,8 @@ class TransportFactory : public FairMQTransportFactory void Interrupt() override {} void Resume() override {} - ~TransportFactory() noexcept(false) override; - private: - void* fZmqContext; + mutable Context fContext; }; /* class TransportFactory */ } /* namespace ofi */ diff --git a/fairmq/test/helper/devices/TestPairLeft.cxx b/fairmq/test/helper/devices/TestPairLeft.cxx index ee38aabc..d2f24b81 100644 --- a/fairmq/test/helper/devices/TestPairLeft.cxx +++ b/fairmq/test/helper/devices/TestPairLeft.cxx @@ -30,8 +30,24 @@ class PairLeft : public FairMQDevice auto Run() -> void override { - auto msg{NewMessageFor("data", 0)}; - Send(msg, "data"); + int counter{0}; + + // Simple empty message ping pong + auto msg1{NewMessageFor("data", 0)}; + auto msg2{NewMessageFor("data", 0)}; + auto msg3{NewMessageFor("data", 0)}; + if (Send(msg1, "data") >= 0) counter++; + if (Receive(msg2, "data") >= 0) counter++; + if (Send(msg2, "data") >= 0) counter++; + if (Receive(msg3, "data") >= 0) counter++; + if (counter == 4) LOG(info) << "Simple empty message ping pong successfull"; + + // Simple message with short text data + auto msg4{NewSimpleMessageFor("data", 0, "testdata1234")}; + if (Send(msg4, "data") >= 0) counter++; + if (counter == 5) LOG(info) << "Simple message with short text data successfull"; + + assert(counter == 5); }; }; diff --git a/fairmq/test/helper/devices/TestPairRight.cxx b/fairmq/test/helper/devices/TestPairRight.cxx index 5dfec498..e657476d 100644 --- a/fairmq/test/helper/devices/TestPairRight.cxx +++ b/fairmq/test/helper/devices/TestPairRight.cxx @@ -7,6 +7,7 @@ ********************************************************************************/ #include +#include namespace fair { @@ -30,11 +31,28 @@ class PairRight : public FairMQDevice auto Run() -> void override { - MessagePtr msg{NewMessageFor("data", 0)}; + int counter{0}; - if (Receive(msg, "data") >= 0) { - LOG(info) << "PAIR test successfull"; + // Simple empty message ping pong + auto msg1{NewMessageFor("data", 0)}; + if (Receive(msg1, "data") >= 0) counter++; + if (Send(msg1, "data") >= 0) counter++; + auto msg2{NewMessageFor("data", 0)}; + if (Receive(msg2, "data") >= 0) counter++; + if (Send(msg2, "data") >= 0) counter++; + if (counter == 4) LOG(info) << "Simple empty message ping pong successfull"; + + // Simple message with short text data + auto msg3{NewMessageFor("data", 0)}; + auto ret = Receive(msg3, "data"); + if (ret > 0) { + auto content = std::string{static_cast(msg3->GetData()), msg3->GetSize()}; + LOG(info) << ret << ", " << msg3->GetSize() << ", '" << content << "'"; + if (msg3->GetSize() == ret && content == "testdata1234") counter++; } + if (counter == 5) LOG(info) << "Simple message with short text data successfull"; + + if (counter == 5) LOG(info) << "PAIR test successfull."; }; };