FairMQ: Implement simple ofi Send/Receive with meta com only

This commit is contained in:
Dennis Klein 2018-02-20 19:25:55 +01:00 committed by Mohammad Al-Turany
parent eaebfc6933
commit 4250e3d45b
8 changed files with 47 additions and 143 deletions

View File

@ -63,8 +63,8 @@ namespace fair
namespace mq
{
using SocketPtr = std::unique_ptr<FairMQSocket>;
using Socket = FairMQSocket;
using SocketPtr = FairMQSocketPtr;
struct SocketError : std::runtime_error { using std::runtime_error::runtime_error; };
} /* namespace mq */

View File

@ -23,9 +23,6 @@ using namespace std;
Message::Message()
{
// if (zmq_msg_init(&fMessage) != 0) {
// throw MessageError{tools::ToString("Failed initializing meta message, reason: ", zmq_strerror(errno))};
// }
}
Message::Message(const size_t size)
@ -92,9 +89,6 @@ auto Message::Copy(const fair::mq::MessagePtr& msg) -> void
Message::~Message() noexcept(false)
{
// if (zmq_msg_close(&fMessage) != 0) {
// throw MessageError{tools::ToString("Failed closing meta message, reason: ", zmq_strerror(errno))};
// }
}
} /* namespace ofi */

View File

@ -7,6 +7,7 @@
********************************************************************************/
#include <fairmq/ofi/Socket.h>
#include <fairmq/ofi/TransportFactory.h>
#include <fairmq/Tools.h>
#include <FairMQLogger.h>
@ -21,7 +22,7 @@ namespace ofi
using namespace std;
Socket::Socket(const string& type, const string& name, const string& id /*= ""*/, void* zmqContext)
Socket::Socket(const TransportFactory& factory, const string& type, const string& name, const string& id /*= ""*/)
: fId{id + "." + name + "." + type}
, fBytesTx{0}
, fBytesRx{0}
@ -30,12 +31,10 @@ Socket::Socket(const string& type, const string& name, const string& id /*= ""*/
, fSndTimeout{100}
, fRcvTimeout{100}
{
assert(zmqContext);
if (type != "pair") {
throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")};
} else {
fMetaSocket = zmq_socket(zmqContext, GetConstant(type));
fMetaSocket = zmq_socket(factory.fZmqContext, GetConstant(type));
}
if (fMetaSocket == nullptr) {
@ -95,122 +94,28 @@ auto Socket::TryReceive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) ->
auto Socket::SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int
{
throw SocketError{"Not yet implemented."};
// int nbytes = -1;
// int elapsed = 0;
//
// while (true && !fInterrupted)
// {
// nbytes = zmq_msg_send(static_cast<FairMQMessageSHM*>(msg.get())->GetMessage(), fSocket, flags);
// if (nbytes == 0)
// {
// return nbytes;
// }
// else if (nbytes > 0)
// {
// static_cast<FairMQMessageSHM*>(msg.get())->fQueued = true;
//
// size_t size = msg->GetSize();
// fBytesTx += size;
// ++fMessagesTx;
//
// return size;
// }
// else if (zmq_errno() == EAGAIN)
// {
// if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
// {
// if (timeout)
// {
// elapsed += fSndTimeout;
// if (elapsed >= timeout)
// {
// return -2;
// }
// }
// continue;
// }
// else
// {
// return -2;
// }
// }
// else if (zmq_errno() == ETERM)
// {
// LOG(info) << "terminating socket " << fId;
// return -1;
// }
// else
// {
// LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
// return nbytes;
// }
// }
//
// return -1;
auto ret = zmq_send(fMetaSocket, nullptr, 0, flags);
if (ret == EAGAIN) {
return -2;
} else if (ret < 0) {
LOG(error) << "Failed sending meta message on socket " << fId << ", reason: " << zmq_strerror(errno);
return -1;
} else {
return ret;
}
}
auto Socket::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int
{
throw SocketError{"Not yet implemented."};
// int nbytes = -1;
// int elapsed = 0;
//
// zmq_msg_t* msgPtr = static_cast<FairMQMessageSHM*>(msg.get())->GetMessage();
// while (true)
// {
// nbytes = zmq_msg_recv(msgPtr, fSocket, flags);
// if (nbytes == 0)
// {
// ++fMessagesRx;
//
// return nbytes;
// }
// else if (nbytes > 0)
// {
// MetaHeader* hdr = static_cast<MetaHeader*>(zmq_msg_data(msgPtr));
// size_t size = 0;
// static_cast<FairMQMessageSHM*>(msg.get())->fHandle = hdr->fHandle;
// static_cast<FairMQMessageSHM*>(msg.get())->fSize = hdr->fSize;
// static_cast<FairMQMessageSHM*>(msg.get())->fRegionId = hdr->fRegionId;
// static_cast<FairMQMessageSHM*>(msg.get())->fHint = hdr->fHint;
// size = msg->GetSize();
//
// fBytesRx += size;
// ++fMessagesRx;
//
// return size;
// }
// else if (zmq_errno() == EAGAIN)
// {
// if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
// {
// if (timeout)
// {
// elapsed += fSndTimeout;
// if (elapsed >= timeout)
// {
// return -2;
// }
// }
// continue;
// }
// else
// {
// return -2;
// }
// }
// else if (zmq_errno() == ETERM)
// {
// LOG(info) << "terminating socket " << fId;
// return -1;
// }
// else
// {
// LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno);
// return nbytes;
// }
// }
auto ret = zmq_recv(fMetaSocket, nullptr, 0, flags);
if (ret == EAGAIN) {
return -2;
} else if (ret < 0) {
LOG(error) << "Failed receiving meta message on socket " << fId << ", reason: " << zmq_strerror(errno);
return -1;
} else {
return ret;
}
}
auto Socket::SendImpl(vector<FairMQMessagePtr>& msgVec, const int flags, const int timeout) -> int64_t

View File

@ -21,16 +21,18 @@ namespace mq
namespace ofi
{
class TransportFactory;
/**
* @class Socket Socket.h <fairmq/ofi/Socket.h>
* @brief
*
* @todo TODO insert long description
*/
class Socket : public FairMQSocket
class Socket : public fair::mq::Socket
{
public:
Socket(const std::string& type, const std::string& name, const std::string& id = "", void* zmqContext = nullptr);
Socket(const TransportFactory& factory, const std::string& type, const std::string& name, const std::string& id = "");
Socket(const Socket&) = delete;
Socket operator=(const Socket&) = delete;
@ -39,15 +41,15 @@ class Socket : public FairMQSocket
auto Bind(const std::string& address) -> bool override;
auto Connect(const std::string& address) -> void override;
auto Send(FairMQMessagePtr& msg, int timeout = 0) -> int override;
auto Receive(FairMQMessagePtr& msg, int timeout = 0) -> int override;
auto Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int timeout = 0) -> int64_t override;
auto Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int timeout = 0) -> int64_t override;
auto Send(MessagePtr& msg, int timeout = 0) -> int override;
auto Receive(MessagePtr& msg, int timeout = 0) -> int override;
auto Send(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
auto Receive(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
auto TrySend(FairMQMessagePtr& msg) -> int override;
auto TryReceive(FairMQMessagePtr& msg) -> int override;
auto TrySend(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) -> int64_t override;
auto TryReceive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) -> int64_t override;
auto TrySend(MessagePtr& msg) -> int override;
auto TryReceive(MessagePtr& msg) -> int override;
auto TrySend(std::vector<MessagePtr>& msgVec) -> int64_t override;
auto TryReceive(std::vector<MessagePtr>& msgVec) -> int64_t override;
auto GetSocket() const -> void* override { return fMetaSocket; }
auto GetSocket(int nothing) const -> int override { return -1; }
@ -82,10 +84,10 @@ class Socket : public FairMQSocket
int fSndTimeout;
int fRcvTimeout;
auto SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int;
auto ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) -> int;
auto SendImpl(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags, const int timeout) -> int64_t;
auto ReceiveImpl(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags, const int timeout) -> int64_t;
auto SendImpl(MessagePtr& msg, const int flags, const int timeout) -> int;
auto ReceiveImpl(MessagePtr& msg, const int flags, const int timeout) -> int;
auto SendImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
auto ReceiveImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
}; /* class Socket */
} /* namespace ofi */

View File

@ -86,7 +86,7 @@ auto TransportFactory::CreateMessage(UnmanagedRegionPtr& region, void* data, con
auto TransportFactory::CreateSocket(const string& type, const string& name) const -> SocketPtr
{
assert(fZmqContext);
return SocketPtr{new Socket(type, name, GetId(), fZmqContext)};
return SocketPtr{new Socket(*this, type, name, GetId())};
}
auto TransportFactory::CreatePoller(const vector<FairMQChannel>& channels) const -> PollerPtr

View File

@ -19,6 +19,8 @@ namespace mq
namespace ofi
{
class Socket;
/**
* @class TransportFactory TransportFactory.h <fairmq/ofi/TransportFactory.h>
* @brief FairMQ transport factory for the ofi transport (implemented with ZeroMQ + libfabric)
@ -27,6 +29,8 @@ namespace ofi
*/
class TransportFactory : public FairMQTransportFactory
{
friend Socket;
public:
TransportFactory(const std::string& id = "", const FairMQProgOptions* config = nullptr);
TransportFactory(const TransportFactory&) = delete;

View File

@ -30,7 +30,7 @@ class PairLeft : public FairMQDevice
auto Run() -> void override
{
auto msg = FairMQMessagePtr{NewMessage()};
auto msg{NewMessageFor("data", 0)};
Send(msg, "data");
};
};

View File

@ -30,10 +30,9 @@ class PairRight : public FairMQDevice
auto Run() -> void override
{
auto msg = FairMQMessagePtr{NewMessage()};
MessagePtr msg{NewMessageFor("data", 0)};
if (Receive(msg, "data") >= 0)
{
if (Receive(msg, "data") >= 0) {
LOG(info) << "PAIR test successfull";
}
};