mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
FairMQ: Setup ofi objects
FI_MSG API FI_EP_RDM, reliable datagram message (unconnected)
This commit is contained in:
parent
4250e3d45b
commit
630a1dbbce
|
@ -83,6 +83,7 @@ set(FAIRMQ_HEADER_FILES
|
||||||
devices/FairMQProxy.h
|
devices/FairMQProxy.h
|
||||||
devices/FairMQSink.h
|
devices/FairMQSink.h
|
||||||
devices/FairMQSplitter.h
|
devices/FairMQSplitter.h
|
||||||
|
ofi/Context.h
|
||||||
ofi/Message.h
|
ofi/Message.h
|
||||||
ofi/Poller.h
|
ofi/Poller.h
|
||||||
ofi/Socket.h
|
ofi/Socket.h
|
||||||
|
@ -150,6 +151,7 @@ set(FAIRMQ_SOURCE_FILES
|
||||||
devices/FairMQProxy.cxx
|
devices/FairMQProxy.cxx
|
||||||
# devices/FairMQSink.cxx
|
# devices/FairMQSink.cxx
|
||||||
devices/FairMQSplitter.cxx
|
devices/FairMQSplitter.cxx
|
||||||
|
ofi/Context.cxx
|
||||||
ofi/Message.cxx
|
ofi/Message.cxx
|
||||||
ofi/Poller.cxx
|
ofi/Poller.cxx
|
||||||
ofi/Socket.cxx
|
ofi/Socket.cxx
|
||||||
|
|
|
@ -74,7 +74,7 @@ class FairMQTransportFactory
|
||||||
virtual void Interrupt() = 0;
|
virtual void Interrupt() = 0;
|
||||||
virtual void Resume() = 0;
|
virtual void Resume() = 0;
|
||||||
|
|
||||||
virtual ~FairMQTransportFactory() noexcept(false) {};
|
virtual ~FairMQTransportFactory() {};
|
||||||
|
|
||||||
static auto CreateTransportFactory(const std::string& type, const std::string& id = "", const FairMQProgOptions* config = nullptr) -> std::shared_ptr<FairMQTransportFactory>;
|
static auto CreateTransportFactory(const std::string& type, const std::string& id = "", const FairMQProgOptions* config = nullptr) -> std::shared_ptr<FairMQTransportFactory>;
|
||||||
|
|
||||||
|
|
179
fairmq/ofi/Context.cxx
Normal file
179
fairmq/ofi/Context.cxx
Normal file
|
@ -0,0 +1,179 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
|
||||||
|
#include <fairmq/ofi/Context.h>
|
||||||
|
#include <fairmq/Tools.h>
|
||||||
|
#include <FairMQLogger.h>
|
||||||
|
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
#include <memory>
|
||||||
|
#include <rdma/fabric.h>
|
||||||
|
#include <rdma/fi_domain.h>
|
||||||
|
#include <rdma/fi_endpoint.h>
|
||||||
|
#include <rdma/fi_errno.h>
|
||||||
|
#include <regex>
|
||||||
|
#include <string>
|
||||||
|
#include <string.h>
|
||||||
|
#include <zmq.h>
|
||||||
|
|
||||||
|
namespace fair
|
||||||
|
{
|
||||||
|
namespace mq
|
||||||
|
{
|
||||||
|
namespace ofi
|
||||||
|
{
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
|
Context::Context(int numberIoThreads)
|
||||||
|
: fOfiDomain(nullptr)
|
||||||
|
, fOfiFabric(nullptr)
|
||||||
|
, fZmqContext(zmq_ctx_new())
|
||||||
|
{
|
||||||
|
if (!fZmqContext)
|
||||||
|
throw ContextError{tools::ToString("Failed creating zmq context, reason: ", zmq_strerror(errno))};
|
||||||
|
}
|
||||||
|
|
||||||
|
Context::~Context()
|
||||||
|
{
|
||||||
|
if (zmq_ctx_term(fZmqContext) != 0) {
|
||||||
|
LOG(error) << "Failed closing zmq context, reason: " << zmq_strerror(errno);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fOfiFabric) {
|
||||||
|
auto ret = fi_close(&fOfiFabric->fid);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
LOG(error) << "Failed closing ofi fabric, reason: " << fi_strerror(ret);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fOfiDomain) {
|
||||||
|
auto ret = fi_close(&fOfiDomain->fid);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
LOG(error) << "Failed closing ofi domain, reason: " << fi_strerror(ret);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto Context::GetZmqVersion() const -> std::string
|
||||||
|
{
|
||||||
|
int major, minor, patch;
|
||||||
|
zmq_version(&major, &minor, &patch);
|
||||||
|
return tools::ToString(major, ".", minor, ".", patch);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto Context::GetOfiApiVersion() const -> std::string
|
||||||
|
{
|
||||||
|
auto ofi_version{fi_version()};
|
||||||
|
return tools::ToString(FI_MAJOR(ofi_version), ".", FI_MINOR(ofi_version));
|
||||||
|
}
|
||||||
|
|
||||||
|
auto Context::InitOfi(ConnectionType type, std::string address) -> void
|
||||||
|
{
|
||||||
|
// Parse address
|
||||||
|
string protocol, ip;
|
||||||
|
unsigned int port = 0;
|
||||||
|
regex address_regex("^([a-z]+)://([0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+):([0-9]+).*");
|
||||||
|
smatch address_result;
|
||||||
|
if (regex_match(address, address_result, address_regex)) {
|
||||||
|
protocol = address_result[1];
|
||||||
|
ip = address_result[2];
|
||||||
|
port = stoul(address_result[3]);
|
||||||
|
LOG(debug) << "Parsed '" << protocol << "', '" << ip << "', '" << port << "' fields from '" << address << "'";
|
||||||
|
}
|
||||||
|
if (protocol != "tcp") throw ContextError{"Wrong protocol: Supplied address must be in format tcp://ip:port"};
|
||||||
|
|
||||||
|
if (!fOfiInfo) {
|
||||||
|
sockaddr_in* sa = static_cast<sockaddr_in*>(malloc(sizeof(sockaddr_in)));
|
||||||
|
inet_pton(AF_INET, ip.c_str(), &(sa->sin_addr));
|
||||||
|
sa->sin_port = port;
|
||||||
|
sa->sin_family = AF_INET;
|
||||||
|
|
||||||
|
// Prepare fi_getinfo query
|
||||||
|
unique_ptr<fi_info, void(*)(fi_info*)> ofi_hints(fi_allocinfo(), fi_freeinfo);
|
||||||
|
ofi_hints->caps = FI_MSG;
|
||||||
|
ofi_hints->mode = FI_ASYNC_IOV;
|
||||||
|
ofi_hints->addr_format = FI_SOCKADDR_IN;
|
||||||
|
ofi_hints->fabric_attr->prov_name = strdup("sockets");
|
||||||
|
ofi_hints->ep_attr->type = FI_EP_RDM;
|
||||||
|
if (type == ConnectionType::Bind) {
|
||||||
|
ofi_hints->src_addr = sa;
|
||||||
|
ofi_hints->src_addrlen = sizeof(sockaddr_in);
|
||||||
|
} else {
|
||||||
|
ofi_hints->dest_addr = sa;
|
||||||
|
ofi_hints->dest_addrlen = sizeof(sockaddr_in);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query fi_getinfo for fabric to use
|
||||||
|
auto res = fi_getinfo(FI_VERSION(1, 5), nullptr, nullptr, 0, ofi_hints.get(), &fOfiInfo);
|
||||||
|
if (res != 0) throw ContextError{tools::ToString("Failed querying fi_getinfo, reason: ", fi_strerror(res))};
|
||||||
|
if (!fOfiInfo) throw ContextError{"Could not find any ofi compatible fabric."};
|
||||||
|
|
||||||
|
// for(auto cursor{ofi_info}; cursor->next != nullptr; cursor = cursor->next) {
|
||||||
|
// LOG(debug) << fi_tostr(cursor, FI_TYPE_INFO);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
} else {
|
||||||
|
LOG(debug) << "Ofi info already queried. Skipping.";
|
||||||
|
}
|
||||||
|
|
||||||
|
OpenOfiFabric();
|
||||||
|
OpenOfiDomain();
|
||||||
|
}
|
||||||
|
|
||||||
|
auto Context::OpenOfiFabric() -> void
|
||||||
|
{
|
||||||
|
if (!fOfiFabric) {
|
||||||
|
auto ret = fi_fabric(fOfiInfo->fabric_attr, &fOfiFabric, NULL);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
throw ContextError{tools::ToString("Failed opening ofi fabric, reason: ", fi_strerror(ret))};
|
||||||
|
} else {
|
||||||
|
// TODO Check, if requested fabric matches existing one.
|
||||||
|
// TODO Decide, if we want to support more than one fabric simultaneously.
|
||||||
|
LOG(debug) << "Ofi fabric already opened. Skipping.";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto Context::OpenOfiDomain() -> void
|
||||||
|
{
|
||||||
|
if (!fOfiDomain) {
|
||||||
|
auto ret = fi_domain(fOfiFabric, fOfiInfo, &fOfiDomain, NULL);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
throw ContextError{tools::ToString("Failed opening ofi domain, reason: ", fi_strerror(ret))};
|
||||||
|
} else {
|
||||||
|
LOG(debug) << "Ofi domain already opened. Skipping.";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto Context::CreateOfiEndpoint() -> fid_ep*
|
||||||
|
{
|
||||||
|
fid_ep* ep = nullptr;
|
||||||
|
auto ret = fi_endpoint(fOfiDomain, fOfiInfo, &ep, nullptr);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
throw ContextError{tools::ToString("Failed creating ofi endpoint, reason: ", fi_strerror(ret))};
|
||||||
|
return ep;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto Context::CreateOfiCompletionQueue() -> fid_cq*
|
||||||
|
{
|
||||||
|
fid_cq* cq = nullptr;
|
||||||
|
fi_cq_attr attr = {0, 0, FI_CQ_FORMAT_DATA, FI_WAIT_UNSPEC, 0, FI_CQ_COND_NONE, nullptr};
|
||||||
|
// size_t size; [> # entries for CQ <]
|
||||||
|
// uint64_t flags; [> operation flags <]
|
||||||
|
// enum fi_cq_format format; [> completion format <]
|
||||||
|
// enum fi_wait_obj wait_obj; [> requested wait object <]
|
||||||
|
// int signaling_vector; [> interrupt affinity <]
|
||||||
|
// enum fi_cq_wait_cond wait_cond; [> wait condition format <]
|
||||||
|
// struct fid_wait *wait_set; [> optional wait set <]
|
||||||
|
auto ret = fi_cq_open(fOfiDomain, &attr, &cq, nullptr);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
throw ContextError{tools::ToString("Failed creating ofi completion queue, reason: ", fi_strerror(ret))};
|
||||||
|
return cq;
|
||||||
|
}
|
||||||
|
|
||||||
|
} /* namespace ofi */
|
||||||
|
} /* namespace mq */
|
||||||
|
} /* namespace fair */
|
62
fairmq/ofi/Context.h
Normal file
62
fairmq/ofi/Context.h
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
|
||||||
|
#ifndef FAIR_MQ_OFI_CONTEXT_H
|
||||||
|
#define FAIR_MQ_OFI_CONTEXT_H
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
|
#include <rdma/fabric.h>
|
||||||
|
#include <string>
|
||||||
|
#include <stdexcept>
|
||||||
|
|
||||||
|
namespace fair
|
||||||
|
{
|
||||||
|
namespace mq
|
||||||
|
{
|
||||||
|
namespace ofi
|
||||||
|
{
|
||||||
|
|
||||||
|
enum class ConnectionType : bool { Bind, Connect };
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @class Context Context.h <fairmq/ofi/Context.h>
|
||||||
|
* @brief Transport-wide context
|
||||||
|
*
|
||||||
|
* @todo TODO insert long description
|
||||||
|
*/
|
||||||
|
class Context
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
Context(int numberIoThreads = 1);
|
||||||
|
~Context();
|
||||||
|
|
||||||
|
/// Deferred Ofi initialization
|
||||||
|
auto InitOfi(ConnectionType type, std::string address) -> void;
|
||||||
|
auto CreateOfiEndpoint() -> fid_ep*;
|
||||||
|
auto CreateOfiCompletionQueue() -> fid_cq*;
|
||||||
|
auto GetZmqVersion() const -> std::string;
|
||||||
|
auto GetOfiApiVersion() const -> std::string;
|
||||||
|
auto GetZmqContext() const -> void* { return fZmqContext; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
void* fZmqContext;
|
||||||
|
fi_info* fOfiInfo;
|
||||||
|
fid_fabric* fOfiFabric;
|
||||||
|
fid_domain* fOfiDomain;
|
||||||
|
|
||||||
|
auto OpenOfiFabric() -> void;
|
||||||
|
auto OpenOfiDomain() -> void;
|
||||||
|
}; /* class Context */
|
||||||
|
|
||||||
|
struct ContextError : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||||
|
|
||||||
|
} /* namespace ofi */
|
||||||
|
} /* namespace mq */
|
||||||
|
} /* namespace fair */
|
||||||
|
|
||||||
|
#endif /* FAIR_MQ_OFI_CONTEXT_H */
|
|
@ -11,6 +11,9 @@
|
||||||
#include <fairmq/Tools.h>
|
#include <fairmq/Tools.h>
|
||||||
#include <FairMQLogger.h>
|
#include <FairMQLogger.h>
|
||||||
|
|
||||||
|
#include <rdma/fabric.h>
|
||||||
|
#include <rdma/fi_endpoint.h>
|
||||||
|
#include <sstream>
|
||||||
#include <zmq.h>
|
#include <zmq.h>
|
||||||
|
|
||||||
namespace fair
|
namespace fair
|
||||||
|
@ -22,20 +25,17 @@ namespace ofi
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
Socket::Socket(const TransportFactory& factory, const string& type, const string& name, const string& id /*= ""*/)
|
Socket::Socket(Context& context, const string& type, const string& name, const string& id /*= ""*/)
|
||||||
: fId{id + "." + name + "." + type}
|
: fDataEndpoint(nullptr)
|
||||||
, fBytesTx{0}
|
, fId(id + "." + name + "." + type)
|
||||||
, fBytesRx{0}
|
, fSndTimeout(100)
|
||||||
, fMessagesTx{0}
|
, fRcvTimeout(100)
|
||||||
, fMessagesRx{0}
|
, fContext(context)
|
||||||
, fSndTimeout{100}
|
|
||||||
, fRcvTimeout{100}
|
|
||||||
{
|
{
|
||||||
if (type != "pair") {
|
if (type != "pair") {
|
||||||
throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")};
|
throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")};
|
||||||
} else {
|
} else {
|
||||||
fMetaSocket = zmq_socket(factory.fZmqContext, GetConstant(type));
|
fMetaSocket = zmq_socket(fContext.GetZmqContext(), GetConstant(type));
|
||||||
}
|
|
||||||
|
|
||||||
if (fMetaSocket == nullptr) {
|
if (fMetaSocket == nullptr) {
|
||||||
throw SocketError{tools::ToString("Failed creating socket ", fId, ", reason: ", zmq_strerror(errno))};
|
throw SocketError{tools::ToString("Failed creating socket ", fId, ", reason: ", zmq_strerror(errno))};
|
||||||
|
@ -59,6 +59,7 @@ Socket::Socket(const TransportFactory& factory, const string& type, const string
|
||||||
if (zmq_setsockopt(fMetaSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) {
|
if (zmq_setsockopt(fMetaSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) {
|
||||||
throw SocketError{tools::ToString("Failed setting ZMQ_RCVTIMEO socket option, reason: ", zmq_strerror(errno))};
|
throw SocketError{tools::ToString("Failed setting ZMQ_RCVTIMEO socket option, reason: ", zmq_strerror(errno))};
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto Socket::Bind(const string& address) -> bool
|
auto Socket::Bind(const string& address) -> bool
|
||||||
|
@ -72,6 +73,33 @@ auto Socket::Bind(const string& address) -> bool
|
||||||
LOG(error) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno);
|
LOG(error) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fContext.InitOfi(ConnectionType::Bind, address);
|
||||||
|
|
||||||
|
if (!fDataEndpoint) {
|
||||||
|
try {
|
||||||
|
fDataEndpoint = fContext.CreateOfiEndpoint();
|
||||||
|
} catch (ContextError& e) {
|
||||||
|
LOG(error) << "Failed creating ofi endpoint for " << address << ", reason: " << e.what();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!fDataCompletionQueueTx)
|
||||||
|
fDataCompletionQueueTx = fContext.CreateOfiCompletionQueue();
|
||||||
|
auto ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueTx->fid, FI_TRANSMIT);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
LOG(error) << "Failed binding ofi transmit completion queue to endpoint, reason: " << fi_strerror(ret);
|
||||||
|
|
||||||
|
if (!fDataCompletionQueueRx)
|
||||||
|
fDataCompletionQueueRx = fContext.CreateOfiCompletionQueue();
|
||||||
|
ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueRx->fid, FI_RECV);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
LOG(error) << "Failed binding ofi receive completion queue to endpoint, reason: " << fi_strerror(ret);
|
||||||
|
|
||||||
|
ret = fi_enable(fDataEndpoint);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
LOG(error) << "Failed opening ofi fabric, reason: " << fi_strerror(ret);
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,21 +108,48 @@ auto Socket::Connect(const string& address) -> void
|
||||||
if (zmq_connect(fMetaSocket, address.c_str()) != 0) {
|
if (zmq_connect(fMetaSocket, address.c_str()) != 0) {
|
||||||
throw SocketError{tools::ToString("Failed connecting socket ", fId, ", reason: ", zmq_strerror(errno))};
|
throw SocketError{tools::ToString("Failed connecting socket ", fId, ", reason: ", zmq_strerror(errno))};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fContext.InitOfi(ConnectionType::Connect, address);
|
||||||
|
|
||||||
|
if (!fDataEndpoint) {
|
||||||
|
try {
|
||||||
|
fDataEndpoint = fContext.CreateOfiEndpoint();
|
||||||
|
} catch (ContextError& e) {
|
||||||
|
throw SocketError(tools::ToString("Failed creating ofi endpoint for ", address, ", reason: ", e.what()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!fDataCompletionQueueTx)
|
||||||
|
fDataCompletionQueueTx = fContext.CreateOfiCompletionQueue();
|
||||||
|
auto ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueTx->fid, FI_TRANSMIT);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
LOG(error) << "Failed binding ofi transmit completion queue to endpoint, reason: " << fi_strerror(ret);
|
||||||
|
|
||||||
|
if (!fDataCompletionQueueRx)
|
||||||
|
fDataCompletionQueueRx = fContext.CreateOfiCompletionQueue();
|
||||||
|
ret = fi_ep_bind(fDataEndpoint, &fDataCompletionQueueRx->fid, FI_RECV);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
LOG(error) << "Failed binding ofi receive completion queue to endpoint, reason: " << fi_strerror(ret);
|
||||||
|
|
||||||
|
ret = fi_enable(fDataEndpoint);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
throw SocketError{tools::ToString("Failed opening ofi fabric, reason: ", fi_strerror(ret))};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto Socket::Send(FairMQMessagePtr& msg, const int timeout) -> int { return SendImpl(msg, 0, timeout); }
|
auto Socket::Send(MessagePtr& msg, const int timeout) -> int { return SendImpl(msg, 0, timeout); }
|
||||||
auto Socket::Receive(FairMQMessagePtr& msg, const int timeout) -> int { return ReceiveImpl(msg, 0, timeout); }
|
auto Socket::Receive(MessagePtr& msg, const int timeout) -> int { return ReceiveImpl(msg, 0, timeout); }
|
||||||
auto Socket::Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int timeout) -> int64_t { return SendImpl(msgVec, 0, timeout); }
|
auto Socket::Send(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t { return SendImpl(msgVec, 0, timeout); }
|
||||||
auto Socket::Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int timeout) -> int64_t { return ReceiveImpl(msgVec, 0, timeout); }
|
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t { return ReceiveImpl(msgVec, 0, timeout); }
|
||||||
|
|
||||||
auto Socket::TrySend(FairMQMessagePtr& msg) -> int { return SendImpl(msg, ZMQ_DONTWAIT, 0); }
|
auto Socket::TrySend(MessagePtr& msg) -> int { return SendImpl(msg, ZMQ_DONTWAIT, 0); }
|
||||||
auto Socket::TryReceive(FairMQMessagePtr& msg) -> int { return ReceiveImpl(msg, ZMQ_DONTWAIT, 0); }
|
auto Socket::TryReceive(MessagePtr& msg) -> int { return ReceiveImpl(msg, ZMQ_DONTWAIT, 0); }
|
||||||
auto Socket::TrySend(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) -> int64_t { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); }
|
auto Socket::TrySend(std::vector<MessagePtr>& msgVec) -> int64_t { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); }
|
||||||
auto Socket::TryReceive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) -> int64_t { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); }
|
auto Socket::TryReceive(std::vector<MessagePtr>& msgVec) -> int64_t { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); }
|
||||||
|
|
||||||
auto Socket::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int
|
auto Socket::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int
|
||||||
{
|
{
|
||||||
auto ret = zmq_send(fMetaSocket, nullptr, 0, flags);
|
auto metadata = new int;
|
||||||
|
auto ret = zmq_send(fMetaSocket, &metadata, sizeof(int), flags);
|
||||||
if (ret == EAGAIN) {
|
if (ret == EAGAIN) {
|
||||||
return -2;
|
return -2;
|
||||||
} else if (ret < 0) {
|
} else if (ret < 0) {
|
||||||
|
@ -294,6 +349,24 @@ auto Socket::Close() -> void
|
||||||
if (zmq_close(fMetaSocket) != 0) {
|
if (zmq_close(fMetaSocket) != 0) {
|
||||||
throw SocketError{tools::ToString("Failed closing zmq socket, reason: ", zmq_strerror(errno))};
|
throw SocketError{tools::ToString("Failed closing zmq socket, reason: ", zmq_strerror(errno))};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (fDataEndpoint) {
|
||||||
|
auto ret = fi_close(&fDataEndpoint->fid);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
LOG(error) << "Failed closing ofi endpoint, reason: " << fi_strerror(ret);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fDataCompletionQueueTx) {
|
||||||
|
auto ret = fi_close(&fDataCompletionQueueTx->fid);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
LOG(error) << "Failed closing ofi transmit completion queue, reason: " << fi_strerror(ret);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fDataCompletionQueueRx) {
|
||||||
|
auto ret = fi_close(&fDataCompletionQueueRx->fid);
|
||||||
|
if (ret != FI_SUCCESS)
|
||||||
|
LOG(error) << "Failed closing ofi receive completion queue, reason: " << fi_strerror(ret);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto Socket::SetOption(const string& option, const void* value, size_t valueSize) -> void
|
auto Socket::SetOption(const string& option, const void* value, size_t valueSize) -> void
|
||||||
|
|
|
@ -11,8 +11,10 @@
|
||||||
|
|
||||||
#include <FairMQSocket.h>
|
#include <FairMQSocket.h>
|
||||||
#include <FairMQMessage.h>
|
#include <FairMQMessage.h>
|
||||||
|
#include <fairmq/ofi/Context.h>
|
||||||
|
|
||||||
#include <memory> // unique_ptr
|
#include <memory> // unique_ptr
|
||||||
|
#include <rdma/fabric.h>
|
||||||
|
|
||||||
namespace fair
|
namespace fair
|
||||||
{
|
{
|
||||||
|
@ -21,8 +23,6 @@ namespace mq
|
||||||
namespace ofi
|
namespace ofi
|
||||||
{
|
{
|
||||||
|
|
||||||
class TransportFactory;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @class Socket Socket.h <fairmq/ofi/Socket.h>
|
* @class Socket Socket.h <fairmq/ofi/Socket.h>
|
||||||
* @brief
|
* @brief
|
||||||
|
@ -32,7 +32,7 @@ class TransportFactory;
|
||||||
class Socket : public fair::mq::Socket
|
class Socket : public fair::mq::Socket
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
Socket(const TransportFactory& factory, const std::string& type, const std::string& name, const std::string& id = "");
|
Socket(Context& factory, const std::string& type, const std::string& name, const std::string& id = "");
|
||||||
Socket(const Socket&) = delete;
|
Socket(const Socket&) = delete;
|
||||||
Socket operator=(const Socket&) = delete;
|
Socket operator=(const Socket&) = delete;
|
||||||
|
|
||||||
|
@ -75,11 +75,15 @@ class Socket : public fair::mq::Socket
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void* fMetaSocket;
|
void* fMetaSocket;
|
||||||
|
fid_ep* fDataEndpoint;
|
||||||
|
fid_cq* fDataCompletionQueueTx;
|
||||||
|
fid_cq* fDataCompletionQueueRx;
|
||||||
std::string fId;
|
std::string fId;
|
||||||
std::atomic<unsigned long> fBytesTx;
|
std::atomic<unsigned long> fBytesTx;
|
||||||
std::atomic<unsigned long> fBytesRx;
|
std::atomic<unsigned long> fBytesRx;
|
||||||
std::atomic<unsigned long> fMessagesTx;
|
std::atomic<unsigned long> fMessagesTx;
|
||||||
std::atomic<unsigned long> fMessagesRx;
|
std::atomic<unsigned long> fMessagesRx;
|
||||||
|
Context& fContext;
|
||||||
|
|
||||||
int fSndTimeout;
|
int fSndTimeout;
|
||||||
int fRcvTimeout;
|
int fRcvTimeout;
|
||||||
|
|
|
@ -12,10 +12,7 @@
|
||||||
#include <fairmq/ofi/TransportFactory.h>
|
#include <fairmq/ofi/TransportFactory.h>
|
||||||
#include <fairmq/Tools.h>
|
#include <fairmq/Tools.h>
|
||||||
|
|
||||||
#include <rdma/fabric.h> // OFI libfabric
|
|
||||||
#include <rdma/fi_errno.h>
|
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <zmq.h>
|
|
||||||
|
|
||||||
namespace fair
|
namespace fair
|
||||||
{
|
{
|
||||||
|
@ -27,41 +24,12 @@ namespace ofi
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* config)
|
TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* config)
|
||||||
: FairMQTransportFactory{id}
|
try : FairMQTransportFactory{id}
|
||||||
, fZmqContext{zmq_ctx_new()}
|
|
||||||
{
|
{
|
||||||
if (!fZmqContext)
|
LOG(debug) << "Transport: Using ZeroMQ (" << fContext.GetZmqVersion() << ") & "
|
||||||
{
|
<< "OFI libfabric (API " << fContext.GetOfiApiVersion() << ")";
|
||||||
throw TransportFactoryError{tools::ToString("Failed creating zmq context, reason: ", zmq_strerror(errno))};
|
|
||||||
}
|
|
||||||
|
|
||||||
auto ofi_hints = fi_allocinfo();
|
|
||||||
ofi_hints->caps = FI_MSG | FI_RMA;
|
|
||||||
ofi_hints->mode = FI_ASYNC_IOV;
|
|
||||||
ofi_hints->addr_format = FI_SOCKADDR_IN;
|
|
||||||
auto ofi_info = fi_allocinfo();
|
|
||||||
if (ofi_hints == nullptr || ofi_info == nullptr)
|
|
||||||
{
|
|
||||||
throw TransportFactoryError{"Failed allocating fi_info structs"};
|
|
||||||
}
|
|
||||||
auto res = fi_getinfo(FI_VERSION(1, 5), nullptr, nullptr, 0, ofi_hints, &ofi_info);
|
|
||||||
if (res != 0)
|
|
||||||
{
|
|
||||||
throw TransportFactoryError{tools::ToString("Failed querying fi_getinfo, reason: ", fi_strerror(res))};
|
|
||||||
}
|
|
||||||
for(auto cursor{ofi_info}; cursor->next != nullptr; cursor = cursor->next)
|
|
||||||
{
|
|
||||||
// LOG(debug) << fi_tostr(cursor, FI_TYPE_INFO);
|
|
||||||
}
|
|
||||||
fi_freeinfo(ofi_hints);
|
|
||||||
fi_freeinfo(ofi_info);
|
|
||||||
|
|
||||||
int major, minor, patch;
|
|
||||||
zmq_version(&major, &minor, &patch);
|
|
||||||
auto ofi_version{fi_version()};
|
|
||||||
LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & "
|
|
||||||
<< "OFI libfabric (API " << FI_MAJOR(ofi_version) << "." << FI_MINOR(ofi_version) << ")";
|
|
||||||
}
|
}
|
||||||
|
catch (ContextError& e) { throw TransportFactoryError{e.what()}; }
|
||||||
|
|
||||||
auto TransportFactory::CreateMessage() const -> MessagePtr
|
auto TransportFactory::CreateMessage() const -> MessagePtr
|
||||||
{
|
{
|
||||||
|
@ -85,8 +53,7 @@ auto TransportFactory::CreateMessage(UnmanagedRegionPtr& region, void* data, con
|
||||||
|
|
||||||
auto TransportFactory::CreateSocket(const string& type, const string& name) const -> SocketPtr
|
auto TransportFactory::CreateSocket(const string& type, const string& name) const -> SocketPtr
|
||||||
{
|
{
|
||||||
assert(fZmqContext);
|
return SocketPtr{new Socket(fContext, type, name, GetId())};
|
||||||
return SocketPtr{new Socket(*this, type, name, GetId())};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto TransportFactory::CreatePoller(const vector<FairMQChannel>& channels) const -> PollerPtr
|
auto TransportFactory::CreatePoller(const vector<FairMQChannel>& channels) const -> PollerPtr
|
||||||
|
@ -119,13 +86,6 @@ auto TransportFactory::GetType() const -> Transport
|
||||||
return Transport::OFI;
|
return Transport::OFI;
|
||||||
}
|
}
|
||||||
|
|
||||||
TransportFactory::~TransportFactory() noexcept(false)
|
|
||||||
{
|
|
||||||
if (zmq_ctx_term(fZmqContext) != 0) {
|
|
||||||
throw TransportFactoryError{tools::ToString("Failed closing zmq context, reason: ", zmq_strerror(errno))};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} /* namespace ofi */
|
} /* namespace ofi */
|
||||||
} /* namespace mq */
|
} /* namespace mq */
|
||||||
} /* namespace fair */
|
} /* namespace fair */
|
||||||
|
|
|
@ -11,6 +11,7 @@
|
||||||
|
|
||||||
#include <FairMQTransportFactory.h>
|
#include <FairMQTransportFactory.h>
|
||||||
#include <options/FairMQProgOptions.h>
|
#include <options/FairMQProgOptions.h>
|
||||||
|
#include <fairmq/ofi/Context.h>
|
||||||
|
|
||||||
namespace fair
|
namespace fair
|
||||||
{
|
{
|
||||||
|
@ -19,8 +20,6 @@ namespace mq
|
||||||
namespace ofi
|
namespace ofi
|
||||||
{
|
{
|
||||||
|
|
||||||
class Socket;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @class TransportFactory TransportFactory.h <fairmq/ofi/TransportFactory.h>
|
* @class TransportFactory TransportFactory.h <fairmq/ofi/TransportFactory.h>
|
||||||
* @brief FairMQ transport factory for the ofi transport (implemented with ZeroMQ + libfabric)
|
* @brief FairMQ transport factory for the ofi transport (implemented with ZeroMQ + libfabric)
|
||||||
|
@ -29,8 +28,6 @@ class Socket;
|
||||||
*/
|
*/
|
||||||
class TransportFactory : public FairMQTransportFactory
|
class TransportFactory : public FairMQTransportFactory
|
||||||
{
|
{
|
||||||
friend Socket;
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
TransportFactory(const std::string& id = "", const FairMQProgOptions* config = nullptr);
|
TransportFactory(const std::string& id = "", const FairMQProgOptions* config = nullptr);
|
||||||
TransportFactory(const TransportFactory&) = delete;
|
TransportFactory(const TransportFactory&) = delete;
|
||||||
|
@ -55,10 +52,8 @@ class TransportFactory : public FairMQTransportFactory
|
||||||
void Interrupt() override {}
|
void Interrupt() override {}
|
||||||
void Resume() override {}
|
void Resume() override {}
|
||||||
|
|
||||||
~TransportFactory() noexcept(false) override;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void* fZmqContext;
|
mutable Context fContext;
|
||||||
}; /* class TransportFactory */
|
}; /* class TransportFactory */
|
||||||
|
|
||||||
} /* namespace ofi */
|
} /* namespace ofi */
|
||||||
|
|
|
@ -30,8 +30,24 @@ class PairLeft : public FairMQDevice
|
||||||
|
|
||||||
auto Run() -> void override
|
auto Run() -> void override
|
||||||
{
|
{
|
||||||
auto msg{NewMessageFor("data", 0)};
|
int counter{0};
|
||||||
Send(msg, "data");
|
|
||||||
|
// Simple empty message ping pong
|
||||||
|
auto msg1{NewMessageFor("data", 0)};
|
||||||
|
auto msg2{NewMessageFor("data", 0)};
|
||||||
|
auto msg3{NewMessageFor("data", 0)};
|
||||||
|
if (Send(msg1, "data") >= 0) counter++;
|
||||||
|
if (Receive(msg2, "data") >= 0) counter++;
|
||||||
|
if (Send(msg2, "data") >= 0) counter++;
|
||||||
|
if (Receive(msg3, "data") >= 0) counter++;
|
||||||
|
if (counter == 4) LOG(info) << "Simple empty message ping pong successfull";
|
||||||
|
|
||||||
|
// Simple message with short text data
|
||||||
|
auto msg4{NewSimpleMessageFor("data", 0, "testdata1234")};
|
||||||
|
if (Send(msg4, "data") >= 0) counter++;
|
||||||
|
if (counter == 5) LOG(info) << "Simple message with short text data successfull";
|
||||||
|
|
||||||
|
assert(counter == 5);
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
********************************************************************************/
|
********************************************************************************/
|
||||||
|
|
||||||
#include <FairMQDevice.h>
|
#include <FairMQDevice.h>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
namespace fair
|
namespace fair
|
||||||
{
|
{
|
||||||
|
@ -30,11 +31,28 @@ class PairRight : public FairMQDevice
|
||||||
|
|
||||||
auto Run() -> void override
|
auto Run() -> void override
|
||||||
{
|
{
|
||||||
MessagePtr msg{NewMessageFor("data", 0)};
|
int counter{0};
|
||||||
|
|
||||||
if (Receive(msg, "data") >= 0) {
|
// Simple empty message ping pong
|
||||||
LOG(info) << "PAIR test successfull";
|
auto msg1{NewMessageFor("data", 0)};
|
||||||
|
if (Receive(msg1, "data") >= 0) counter++;
|
||||||
|
if (Send(msg1, "data") >= 0) counter++;
|
||||||
|
auto msg2{NewMessageFor("data", 0)};
|
||||||
|
if (Receive(msg2, "data") >= 0) counter++;
|
||||||
|
if (Send(msg2, "data") >= 0) counter++;
|
||||||
|
if (counter == 4) LOG(info) << "Simple empty message ping pong successfull";
|
||||||
|
|
||||||
|
// Simple message with short text data
|
||||||
|
auto msg3{NewMessageFor("data", 0)};
|
||||||
|
auto ret = Receive(msg3, "data");
|
||||||
|
if (ret > 0) {
|
||||||
|
auto content = std::string{static_cast<char*>(msg3->GetData()), msg3->GetSize()};
|
||||||
|
LOG(info) << ret << ", " << msg3->GetSize() << ", '" << content << "'";
|
||||||
|
if (msg3->GetSize() == ret && content == "testdata1234") counter++;
|
||||||
}
|
}
|
||||||
|
if (counter == 5) LOG(info) << "Simple message with short text data successfull";
|
||||||
|
|
||||||
|
if (counter == 5) LOG(info) << "PAIR test successfull.";
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user