diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 73563b4b..7d86581f 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -83,6 +83,7 @@ set(FAIRMQ_HEADER_FILES devices/FairMQProxy.h devices/FairMQSink.h devices/FairMQSplitter.h + ofi/Message.h ofi/Poller.h ofi/Socket.h ofi/TransportFactory.h @@ -149,6 +150,7 @@ set(FAIRMQ_SOURCE_FILES devices/FairMQProxy.cxx # devices/FairMQSink.cxx devices/FairMQSplitter.cxx + ofi/Message.cxx ofi/Poller.cxx ofi/Socket.cxx ofi/TransportFactory.cxx diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index d0b0a70a..78431d96 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -33,7 +33,7 @@ class FairMQMessage virtual void Copy(const std::unique_ptr& msg) __attribute__((deprecated("Use 'Copy(const FairMQMessage& msg)'"))) = 0; virtual void Copy(const FairMQMessage& msg) = 0; - virtual ~FairMQMessage() {}; + virtual ~FairMQMessage() noexcept(false) {}; }; using FairMQMessagePtr = std::unique_ptr; @@ -43,7 +43,9 @@ namespace fair namespace mq { -using MessagePtr = std::unique_ptr; +using Message = FairMQMessage; +using MessagePtr = FairMQMessagePtr; +struct MessageError : std::runtime_error { using std::runtime_error::runtime_error; }; } /* namespace mq */ } /* namespace fair */ diff --git a/fairmq/ofi/Message.cxx b/fairmq/ofi/Message.cxx new file mode 100644 index 00000000..167b3745 --- /dev/null +++ b/fairmq/ofi/Message.cxx @@ -0,0 +1,102 @@ +/******************************************************************************** + * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include +#include + +#include + +namespace fair +{ +namespace mq +{ +namespace ofi +{ + +using namespace std; + +Message::Message() +{ + // if (zmq_msg_init(&fMessage) != 0) { + // throw MessageError{tools::ToString("Failed initializing meta message, reason: ", zmq_strerror(errno))}; + // } +} + +Message::Message(const size_t size) + : fSize{size} +{ + throw MessageError{"Not yet implemented."}; +} + +Message::Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) +{ + throw MessageError{"Not yet implemented."}; +} + +Message::Message(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) +{ + throw MessageError{"Not yet implemented."}; +} + +auto Message::Rebuild() -> void +{ + throw MessageError{"Not implemented."}; +} + +auto Message::Rebuild(const size_t size) -> void +{ + throw MessageError{"Not implemented."}; +} + +auto Message::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) -> void +{ + throw MessageError{"Not implemented."}; +} + +auto Message::GetData() const -> void* +{ + throw MessageError{"Not implemented."}; +} + +auto Message::GetSize() const -> size_t +{ + return fSize; +} + +auto Message::SetUsedSize(const size_t size) -> bool +{ + if (size == fSize) { + return true; + } else if (size <= fSize) { + throw MessageError{"Message size shrinking not yet implemented."}; + } else { + throw MessageError{"Cannot grow message size."}; + } +} + +auto Message::Copy(const fair::mq::Message& msg) -> void +{ + throw MessageError{"Not yet implemented."}; +} + +auto Message::Copy(const fair::mq::MessagePtr& msg) -> void +{ + throw MessageError{"Not yet implemented."}; +} + +Message::~Message() noexcept(false) +{ + // if (zmq_msg_close(&fMessage) != 0) { + // throw MessageError{tools::ToString("Failed closing meta message, reason: ", zmq_strerror(errno))}; + // } +} + +} /* namespace ofi */ +} /* namespace mq */ +} /* namespace fair */ diff --git a/fairmq/ofi/Message.h b/fairmq/ofi/Message.h new file mode 100644 index 00000000..efd63d2b --- /dev/null +++ b/fairmq/ofi/Message.h @@ -0,0 +1,68 @@ +/******************************************************************************** + * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_OFI_MESSAGE_H +#define FAIR_MQ_OFI_MESSAGE_H + +#include +#include + +#include + +#include // size_t +#include + +namespace fair +{ +namespace mq +{ +namespace ofi +{ + +/** + * @class Message Message.h + * @brief + * + * @todo TODO insert long description + */ +class Message : public fair::mq::Message +{ + public: + Message(); + Message(const size_t size); + Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); + Message(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0); + + Message(const Message&) = delete; + Message operator=(const Message&) = delete; + + auto Rebuild() -> void override; + auto Rebuild(const size_t size) -> void override; + auto Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) -> void override; + + auto GetData() const -> void* override; + auto GetSize() const -> size_t override; + + auto SetUsedSize(const size_t size) -> bool override; + + auto GetType() const -> fair::mq::Transport override { return fair::mq::Transport::OFI; } + + auto Copy(const fair::mq::Message& msg) -> void override; + auto Copy(const fair::mq::MessagePtr& msg) -> void override; + + ~Message() noexcept(false) override; + + private: + size_t fSize; +}; /* class Message */ + +} /* namespace ofi */ +} /* namespace mq */ +} /* namespace fair */ + +#endif /* FAIR_MQ_OFI_MESSAGE_H */ diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx index 31468a63..2ab89071 100644 --- a/fairmq/ofi/TransportFactory.cxx +++ b/fairmq/ofi/TransportFactory.cxx @@ -6,6 +6,7 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ +#include #include #include #include @@ -50,7 +51,7 @@ TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* co } for(auto cursor{ofi_info}; cursor->next != nullptr; cursor = cursor->next) { - LOG(debug) << fi_tostr(cursor, FI_TYPE_INFO); + // LOG(debug) << fi_tostr(cursor, FI_TYPE_INFO); } fi_freeinfo(ofi_hints); fi_freeinfo(ofi_info); @@ -64,48 +65,48 @@ TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* co auto TransportFactory::CreateMessage() const -> MessagePtr { - throw runtime_error{"Not yet implemented Msg1."}; + return MessagePtr{new Message()}; } auto TransportFactory::CreateMessage(const size_t size) const -> MessagePtr { - throw runtime_error{"Not yet implemented Msg2."}; + return MessagePtr{new Message(size)}; } auto TransportFactory::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) const -> MessagePtr { - throw runtime_error{"Not yet implemented Msg3."}; + return MessagePtr{new Message(data, size, ffn, hint)}; } auto TransportFactory::CreateMessage(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint) const -> MessagePtr { - throw runtime_error{"Not yet implemented Msg4."}; + return MessagePtr{new Message(region, data, size, hint)}; } auto TransportFactory::CreateSocket(const string& type, const string& name) const -> SocketPtr { assert(fZmqContext); - return unique_ptr{new Socket(type, name, GetId(), fZmqContext)}; + return SocketPtr{new Socket(type, name, GetId(), fZmqContext)}; } auto TransportFactory::CreatePoller(const vector& channels) const -> PollerPtr { - return unique_ptr(new Poller(channels)); + return PollerPtr{new Poller(channels)}; } auto TransportFactory::CreatePoller(const vector& channels) const -> PollerPtr { - return unique_ptr(new Poller(channels)); + return PollerPtr{new Poller(channels)}; } auto TransportFactory::CreatePoller(const unordered_map>& channelsMap, const vector& channelList) const -> PollerPtr { - return unique_ptr(new Poller(channelsMap, channelList)); + return PollerPtr{new Poller(channelsMap, channelList)}; } auto TransportFactory::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const -> PollerPtr { - return unique_ptr(new Poller(cmdSocket, dataSocket)); + return PollerPtr{new Poller(cmdSocket, dataSocket)}; } auto TransportFactory::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const -> UnmanagedRegionPtr @@ -118,7 +119,7 @@ auto TransportFactory::GetType() const -> Transport return Transport::OFI; } -TransportFactory::~TransportFactory() +TransportFactory::~TransportFactory() noexcept(false) { if (zmq_ctx_term(fZmqContext) != 0) { throw TransportFactoryError{tools::ToString("Failed closing zmq context, reason: ", zmq_strerror(errno))};