feat(ofi): Require asiofi 0.5

* Modernize some ofi transport code along the way
  * Replace Boost.Container with `<memory_resource>`
  * Introduce namespaced headers
    * `<fairmq/Channel.h>`
    * `<fairmq/Message.h>`
    * `<fairmq/Poller.h>`
    * `<fairmq/Socket.h>`
    * `<fairmq/TransportFactory.h>`
    * `<fairmq/UnmanagedRegion.h>`
* Compile-firewall Boost.Process in `shmem::Manager` because it conflicts
with standalone asio
This commit is contained in:
Dennis Klein 2021-06-06 06:53:31 +02:00 committed by Dennis Klein
parent 9585c20b7f
commit 1007de8e49
23 changed files with 422 additions and 506 deletions

View File

@ -19,7 +19,7 @@ if(BUILD_FAIRMQ OR BUILD_SDK)
endif() endif()
if(BUILD_OFI_TRANSPORT) if(BUILD_OFI_TRANSPORT)
find_package2(PRIVATE asiofi REQUIRED VERSION 0.3.1) find_package2(PRIVATE asiofi REQUIRED VERSION 0.5)
find_package2(PRIVATE OFI REQUIRED) find_package2(PRIVATE OFI REQUIRED)
endif() endif()

View File

@ -143,6 +143,7 @@ if(BUILD_FAIRMQ)
# libFairMQ header files # # libFairMQ header files #
########################## ##########################
set(FAIRMQ_PUBLIC_HEADER_FILES set(FAIRMQ_PUBLIC_HEADER_FILES
Channel.h
Device.h Device.h
DeviceRunner.h DeviceRunner.h
EventManager.h EventManager.h
@ -152,22 +153,26 @@ if(BUILD_FAIRMQ)
FairMQMessage.h FairMQMessage.h
FairMQParts.h FairMQParts.h
FairMQPoller.h FairMQPoller.h
FairMQUnmanagedRegion.h
FairMQSocket.h FairMQSocket.h
FairMQTransportFactory.h FairMQTransportFactory.h
MemoryResources.h FairMQUnmanagedRegion.h
MemoryResourceTools.h
Transports.h
options/FairMQProgOptions.h
JSONParser.h JSONParser.h
ProgOptionsFwd.h MemoryResourceTools.h
ProgOptions.h MemoryResources.h
Properties.h Message.h
PropertyOutput.h
SuboptParser.h
Plugin.h Plugin.h
PluginManager.h PluginManager.h
PluginServices.h PluginServices.h
Poller.h
ProgOptions.h
ProgOptionsFwd.h
Properties.h
PropertyOutput.h
SuboptParser.h
TransportFactory.h
Transports.h
UnmanagedRegion.h
options/FairMQProgOptions.h
runDevice.h runDevice.h
runFairMQDevice.h runFairMQDevice.h
shmem/Monitor.h shmem/Monitor.h
@ -202,8 +207,8 @@ if(BUILD_FAIRMQ)
if(BUILD_OFI_TRANSPORT) if(BUILD_OFI_TRANSPORT)
set(FAIRMQ_PRIVATE_HEADER_FILES ${FAIRMQ_PRIVATE_HEADER_FILES} set(FAIRMQ_PRIVATE_HEADER_FILES ${FAIRMQ_PRIVATE_HEADER_FILES}
ofi/Context.h ofi/Context.h
ofi/ControlMessages.h
ofi/Message.h ofi/Message.h
ofi/Poller.h
ofi/Socket.h ofi/Socket.h
ofi/TransportFactory.h ofi/TransportFactory.h
) )
@ -231,6 +236,7 @@ if(BUILD_FAIRMQ)
plugins/config/Config.cxx plugins/config/Config.cxx
plugins/control/Control.cxx plugins/control/Control.cxx
MemoryResources.cxx MemoryResources.cxx
shmem/Manager.cxx
shmem/Monitor.cxx shmem/Monitor.cxx
) )
@ -238,9 +244,7 @@ if(BUILD_FAIRMQ)
set(FAIRMQ_SOURCE_FILES ${FAIRMQ_SOURCE_FILES} set(FAIRMQ_SOURCE_FILES ${FAIRMQ_SOURCE_FILES}
ofi/Context.cxx ofi/Context.cxx
ofi/Message.cxx ofi/Message.cxx
ofi/Poller.cxx
ofi/Socket.cxx ofi/Socket.cxx
ofi/TransportFactory.cxx
) )
endif() endif()
@ -295,7 +299,6 @@ if(BUILD_FAIRMQ)
if(BUILD_OFI_TRANSPORT) if(BUILD_OFI_TRANSPORT)
set(OFI_DEPS set(OFI_DEPS
asiofi::asiofi asiofi::asiofi
Boost::container
) )
endif() endif()

20
fairmq/Channel.h Normal file
View File

@ -0,0 +1,20 @@
/********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_CHANNEL_H
#define FAIR_MQ_CHANNEL_H
#include <FairMQChannel.h>
namespace fair::mq {
using Channel = FairMQChannel;
} // namespace fair::mq
#endif // FAIR_MQ_CHANNEL_H

View File

@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@ -9,11 +9,12 @@
#ifndef FAIRMQUNMANAGEDREGION_H_ #ifndef FAIRMQUNMANAGEDREGION_H_
#define FAIRMQUNMANAGEDREGION_H_ #define FAIRMQUNMANAGEDREGION_H_
#include <cstddef> // size_t #include <cstddef> // size_t
#include <cstdint> // uint32_t #include <cstdint> // uint32_t
#include <memory> // std::unique_ptr #include <fairmq/Transports.h>
#include <functional> // std::function #include <functional> // std::function
#include <ostream> // std::ostream #include <memory> // std::unique_ptr
#include <ostream> // std::ostream
#include <vector> #include <vector>
class FairMQTransportFactory; class FairMQTransportFactory;

14
fairmq/Message.h Normal file
View File

@ -0,0 +1,14 @@
/********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_MESSAGE_H
#define FAIR_MQ_MESSAGE_H
#include <FairMQMessage.h>
#endif // FAIR_MQ_MESSAGE_H

14
fairmq/Poller.h Normal file
View File

@ -0,0 +1,14 @@
/********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_POLLER_H
#define FAIR_MQ_POLLER_H
#include <FairMQPoller.h>
#endif // FAIR_MQ_POLLER_H

14
fairmq/Socket.h Normal file
View File

@ -0,0 +1,14 @@
/********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_SOCKET_H
#define FAIR_MQ_SOCKET_H
#include <FairMQSocket.h>
#endif // FAIR_MQ_SOCKET_H

14
fairmq/TransportFactory.h Normal file
View File

@ -0,0 +1,14 @@
/********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_TRANSPORTFACTORY_H
#define FAIR_MQ_TRANSPORTFACTORY_H
#include <FairMQTransportFactory.h>
#endif // FAIR_MQ_TRANSPORTFACTORY_H

14
fairmq/UnmanagedRegion.h Normal file
View File

@ -0,0 +1,14 @@
/********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_UNMANAGEDREGION_H
#define FAIR_MQ_UNMANAGEDREGION_H
#include <FairMQUnmanagedRegion.h>
#endif // FAIR_MQ_UNMANAGEDREGION_H

View File

@ -1,25 +1,22 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
#include <fairmq/ofi/Context.h>
#include <fairmq/tools/Strings.h>
#include <FairMQLogger.h>
#include <asiofi/version.hpp>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <boost/version.hpp> #include <asiofi/version.hpp>
#include <cassert> #include <cassert>
#include <cstring> #include <cstring>
#include <fairlogger/Logger.h>
#include <fairmq/ofi/Context.h>
#include <fairmq/tools/Strings.h>
#include <memory> #include <memory>
#include <netinet/in.h> #include <netinet/in.h>
#include <regex> #include <regex>
#include <string> #include <string>
#include <string.h>
#include <sys/socket.h> #include <sys/socket.h>
namespace fair::mq::ofi namespace fair::mq::ofi

View File

@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@ -9,13 +9,12 @@
#ifndef FAIR_MQ_OFI_CONTEXT_H #ifndef FAIR_MQ_OFI_CONTEXT_H
#define FAIR_MQ_OFI_CONTEXT_H #define FAIR_MQ_OFI_CONTEXT_H
#include <FairMQLogger.h> #include <asio/io_context.hpp>
#include <FairMQTransportFactory.h>
#include <asiofi/domain.hpp> #include <asiofi/domain.hpp>
#include <asiofi/fabric.hpp> #include <asiofi/fabric.hpp>
#include <asiofi/info.hpp> #include <asiofi/info.hpp>
#include <boost/asio/io_context.hpp> #include <fairlogger/Logger.h>
#include <fairmq/TransportFactory.h>
#include <memory> #include <memory>
#include <netinet/in.h> #include <netinet/in.h>
#include <ostream> #include <ostream>
@ -58,7 +57,7 @@ class Context
~Context(); ~Context();
auto GetAsiofiVersion() const -> std::string; auto GetAsiofiVersion() const -> std::string;
auto GetIoContext() -> boost::asio::io_context& { return fIoContext; } auto GetIoContext() -> asio::io_context& { return fIoContext; }
static auto ConvertAddress(std::string address) -> Address; static auto ConvertAddress(std::string address) -> Address;
static auto ConvertAddress(Address address) -> sockaddr_in; static auto ConvertAddress(Address address) -> sockaddr_in;
static auto ConvertAddress(sockaddr_in address) -> Address; static auto ConvertAddress(sockaddr_in address) -> Address;
@ -72,8 +71,8 @@ class Context
auto SetSizeHint(size_t size) -> void { fSizeHint = size; } auto SetSizeHint(size_t size) -> void { fSizeHint = size; }
private: private:
boost::asio::io_context fIoContext; asio::io_context fIoContext;
boost::asio::io_context::work fIoWork; asio::io_context::work fIoWork;
std::vector<std::thread> fThreadPool; std::vector<std::thread> fThreadPool;
FairMQTransportFactory& fReceiveFactory; FairMQTransportFactory& fReceiveFactory;
FairMQTransportFactory& fSendFactory; FairMQTransportFactory& fSendFactory;

View File

@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@ -9,24 +9,24 @@
#ifndef FAIR_MQ_OFI_CONTROLMESSAGES_H #ifndef FAIR_MQ_OFI_CONTROLMESSAGES_H
#define FAIR_MQ_OFI_CONTROLMESSAGES_H #define FAIR_MQ_OFI_CONTROLMESSAGES_H
#include <FairMQLogger.h> #include <asio/buffer.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/container/pmr/memory_resource.hpp>
#include <cstdint> #include <cstdint>
#include <fairlogger/Logger.h>
#include <functional> #include <functional>
#include <memory> #include <memory>
#include <memory_resource>
#include <type_traits> #include <type_traits>
namespace boost::asio namespace asio
{ {
template<typename PodType> template<typename PodType>
auto buffer(const PodType& obj) -> boost::asio::const_buffer auto buffer(const PodType& obj) -> asio::const_buffer
{ {
return boost::asio::const_buffer(static_cast<const void*>(&obj), sizeof(PodType)); return asio::const_buffer(static_cast<const void*>(&obj), sizeof(PodType));
} }
} // namespace boost::asio } // namespace asio
namespace fair::mq::ofi namespace fair::mq::ofi
{ {
@ -68,7 +68,7 @@ template<typename T>
using unique_ptr = std::unique_ptr<T, std::function<void(T*)>>; using unique_ptr = std::unique_ptr<T, std::function<void(T*)>>;
template<typename T, typename... Args> template<typename T, typename... Args>
auto MakeControlMessageWithPmr(boost::container::pmr::memory_resource& pmr, Args&&... args) auto MakeControlMessageWithPmr(std::pmr::memory_resource& pmr, Args&&... args)
-> ofi::unique_ptr<ControlMessage> -> ofi::unique_ptr<ControlMessage>
{ {
void* mem = pmr.allocate(sizeof(ControlMessage)); void* mem = pmr.allocate(sizeof(ControlMessage));

View File

@ -1,17 +1,16 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
#include <fairmq/ofi/Message.h>
#include <FairMQLogger.h>
#include <asiofi.hpp> #include <asiofi.hpp>
#include <cassert> #include <cassert>
#include <cstdlib> #include <cstdlib>
#include <fairlogger/Logger.h>
#include <fairmq/ofi/Message.h>
#include <zmq.h> #include <zmq.h>
namespace fair::mq::ofi namespace fair::mq::ofi
@ -19,7 +18,7 @@ namespace fair::mq::ofi
using namespace std; using namespace std;
Message::Message(boost::container::pmr::memory_resource* pmr) Message::Message(pmr::memory_resource* pmr)
: fInitialSize(0) : fInitialSize(0)
, fSize(0) , fSize(0)
, fData(nullptr) , fData(nullptr)
@ -29,7 +28,7 @@ Message::Message(boost::container::pmr::memory_resource* pmr)
{ {
} }
Message::Message(boost::container::pmr::memory_resource* pmr, Alignment /* alignment */) Message::Message(pmr::memory_resource* pmr, Alignment /* alignment */)
: fInitialSize(0) : fInitialSize(0)
, fSize(0) , fSize(0)
, fData(nullptr) , fData(nullptr)
@ -39,7 +38,7 @@ Message::Message(boost::container::pmr::memory_resource* pmr, Alignment /* align
{ {
} }
Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size) Message::Message(pmr::memory_resource* pmr, const size_t size)
: fInitialSize(size) : fInitialSize(size)
, fSize(size) , fSize(size)
, fData(nullptr) , fData(nullptr)
@ -53,7 +52,7 @@ Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size)
} }
} }
Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size, Alignment /* alignment */) Message::Message(pmr::memory_resource* pmr, const size_t size, Alignment /* alignment */)
: fInitialSize(size) : fInitialSize(size)
, fSize(size) , fSize(size)
, fData(nullptr) , fData(nullptr)
@ -67,7 +66,7 @@ Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size,
} }
} }
Message::Message(boost::container::pmr::memory_resource* pmr, Message::Message(pmr::memory_resource* pmr,
void* data, void* data,
const size_t size, const size_t size,
fairmq_free_fn* ffn, fairmq_free_fn* ffn,
@ -80,8 +79,8 @@ Message::Message(boost::container::pmr::memory_resource* pmr,
, fPmr(pmr) , fPmr(pmr)
{} {}
Message::Message(boost::container::pmr::memory_resource* /*pmr*/, Message::Message(pmr::memory_resource* /*pmr*/,
FairMQUnmanagedRegionPtr& /*region*/, fair::mq::UnmanagedRegionPtr& /*region*/,
void* /*data*/, void* /*data*/,
const size_t /*size*/, const size_t /*size*/,
void* /*hint*/) void* /*hint*/)

View File

@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@ -9,12 +9,13 @@
#ifndef FAIR_MQ_OFI_MESSAGE_H #ifndef FAIR_MQ_OFI_MESSAGE_H
#define FAIR_MQ_OFI_MESSAGE_H #define FAIR_MQ_OFI_MESSAGE_H
#include <FairMQMessage.h>
#include <FairMQUnmanagedRegion.h>
#include <asiofi.hpp> #include <asiofi.hpp>
#include <atomic> #include <atomic>
#include <cstddef> // size_t #include <cstddef> // size_t
#include <fairmq/Message.h>
#include <fairmq/Transports.h>
#include <fairmq/UnmanagedRegion.h>
#include <memory_resource>
#include <zmq.h> #include <zmq.h>
namespace fair::mq::ofi namespace fair::mq::ofi
@ -29,17 +30,17 @@ namespace fair::mq::ofi
class Message final : public fair::mq::Message class Message final : public fair::mq::Message
{ {
public: public:
Message(boost::container::pmr::memory_resource* pmr); Message(std::pmr::memory_resource* pmr);
Message(boost::container::pmr::memory_resource* pmr, Alignment alignment); Message(std::pmr::memory_resource* pmr, Alignment alignment);
Message(boost::container::pmr::memory_resource* pmr, const size_t size); Message(std::pmr::memory_resource* pmr, const size_t size);
Message(boost::container::pmr::memory_resource* pmr, const size_t size, Alignment alignment); Message(std::pmr::memory_resource* pmr, const size_t size, Alignment alignment);
Message(boost::container::pmr::memory_resource* pmr, Message(std::pmr::memory_resource* pmr,
void* data, void* data,
const size_t size, const size_t size,
fairmq_free_fn* ffn, fairmq_free_fn* ffn,
void* hint = nullptr); void* hint = nullptr);
Message(boost::container::pmr::memory_resource* pmr, Message(std::pmr::memory_resource* pmr,
FairMQUnmanagedRegionPtr& region, fair::mq::UnmanagedRegionPtr& region,
void* data, void* data,
const size_t size, const size_t size,
void* hint = 0); void* hint = 0);
@ -70,7 +71,7 @@ class Message final : public fair::mq::Message
void* fData; void* fData;
fairmq_free_fn* fFreeFunction; fairmq_free_fn* fFreeFunction;
void* fHint; void* fHint;
boost::container::pmr::memory_resource* fPmr; std::pmr::memory_resource* fPmr;
}; /* class Message */ }; /* class Message */
} // namespace fair::mq::ofi } // namespace fair::mq::ofi

View File

@ -1,152 +0,0 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/ofi/Poller.h>
#include <fairmq/ofi/Socket.h>
#include <fairmq/tools/Strings.h>
#include <FairMQLogger.h>
#include <zmq.h>
namespace fair::mq::ofi
{
using namespace std;
Poller::Poller(const vector<FairMQChannel>& channels)
{
fNumItems = channels.size();
fItems = new zmq_pollitem_t[fNumItems];
for (int i = 0; i < fNumItems; ++i) {
fItems[i].socket = static_cast<Socket*>(&(channels.at(i).GetSocket()))->GetSocket();
fItems[i].fd = 0;
fItems[i].revents = 0;
int type = 0;
size_t size = sizeof(type);
zmq_getsockopt(static_cast<Socket*>(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[i], type);
}
}
Poller::Poller(const vector<const FairMQChannel*>& channels)
{
fNumItems = channels.size();
fItems = new zmq_pollitem_t[fNumItems];
for (int i = 0; i < fNumItems; ++i) {
fItems[i].socket = static_cast<Socket*>(&(channels.at(i)->GetSocket()))->GetSocket();
fItems[i].fd = 0;
fItems[i].revents = 0;
int type = 0;
size_t size = sizeof(type);
zmq_getsockopt(static_cast<Socket*>(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[i], type);
}
}
Poller::Poller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList)
{
try {
int offset = 0;
// calculate offsets and the total size of the poll item set
for (string channel : channelList) {
fOffsetMap[channel] = offset;
offset += channelsMap.at(channel).size();
fNumItems += channelsMap.at(channel).size();
}
fItems = new zmq_pollitem_t[fNumItems];
int index = 0;
for (string channel : channelList) {
for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) {
index = fOffsetMap[channel] + i;
fItems[index].socket = static_cast<Socket*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket();
fItems[index].fd = 0;
fItems[index].revents = 0;
int type = 0;
size_t size = sizeof(type);
zmq_getsockopt(static_cast<Socket*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[index], type);
}
}
}
catch (const std::out_of_range& oor) {
throw PollerError{tools::ToString("At least one of the provided channel keys for poller initialization is invalid. ",
"Out of range error: ", oor.what())};
}
}
auto Poller::SetItemEvents(zmq_pollitem_t& item, const int type) -> void
{
if (type == ZMQ_PAIR) {
item.events = ZMQ_POLLIN|ZMQ_POLLOUT;
} else {
throw PollerError{"Invalid poller configuration."};
}
}
auto Poller::Poll(const int timeout) -> void
{
if (zmq_poll(fItems, fNumItems, timeout) < 0) {
if (errno == ETERM) {
LOG(debug) << "polling exited, reason: " << zmq_strerror(errno);
} else {
throw PollerError{tools::ToString("Polling failed, reason: ", zmq_strerror(errno))};
}
}
}
auto Poller::CheckInput(const int index) -> bool
{
return fItems[index].revents & ZMQ_POLLIN;
}
auto Poller::CheckOutput(const int index) -> bool
{
return fItems[index].revents & ZMQ_POLLOUT;
}
auto Poller::CheckInput(const string& channelKey, const int index) -> bool
{
try {
return fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN;
} catch (const std::out_of_range& oor) {
throw PollerError{tools::ToString(
"Invalid channel key: '", channelKey, "', ",
"Out of range error: ", oor.what()
)};
}
}
auto Poller::CheckOutput(const string& channelKey, const int index) -> bool
{
try {
return fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT;
} catch (const std::out_of_range& oor) {
throw PollerError{tools::ToString(
"Invalid channel key: '", channelKey, "', ",
"Out of range error: ", oor.what()
)};
}
}
Poller::~Poller()
{
delete[] fItems;
}
} // namespace fair::mq::ofi

View File

@ -1,64 +0,0 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#ifndef FAIR_MQ_OFI_POLLER_H
#define FAIR_MQ_OFI_POLLER_H
#include <FairMQChannel.h>
#include <FairMQPoller.h>
#include <FairMQSocket.h>
#include <vector>
#include <unordered_map>
#include <zmq.h>
namespace fair::mq::ofi
{
class TransportFactory;
/**
* @class Poller Poller.h <fairmq/ofi/Poller.h>
* @brief
*
* @todo TODO insert long description
*/
class Poller final : public FairMQPoller
{
friend class FairMQChannel;
friend class TransportFactory;
public:
Poller(const std::vector<FairMQChannel>& channels);
Poller(const std::vector<const FairMQChannel*>& channels);
Poller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList);
Poller(const Poller&) = delete;
Poller operator=(const Poller&) = delete;
auto SetItemEvents(zmq_pollitem_t& item, const int type) -> void;
auto Poll(const int timeout) -> void override;
auto CheckInput(const int index) -> bool override;
auto CheckOutput(const int index) -> bool override;
auto CheckInput(const std::string& channelKey, const int index) -> bool override;
auto CheckOutput(const std::string& channelKey, const int index) -> bool override;
~Poller() override;
private:
zmq_pollitem_t* fItems;
int fNumItems;
std::unordered_map<std::string, int> fOffsetMap;
}; /* class Poller */
} // namespace fair::mq::ofi
#endif /* FAIR_MQ_OFI_POLLER_H */

View File

@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@ -13,9 +13,9 @@
#include <FairMQLogger.h> #include <FairMQLogger.h>
#include <asiofi.hpp> #include <asiofi.hpp>
#include <boost/asio/buffer.hpp> #include <asio/buffer.hpp>
#include <boost/asio/dispatch.hpp> #include <asio/dispatch.hpp>
#include <boost/asio/post.hpp> #include <asio/post.hpp>
#include <chrono> #include <chrono>
#include <cstring> #include <cstring>
#include <functional> #include <functional>
@ -151,14 +151,14 @@ auto Socket::BindDataEndpoint() -> void
LOG(debug) << "OFI transport (" << fId << "): data band connection accepted."; LOG(debug) << "OFI transport (" << fId << "): data band connection accepted.";
if (fContext.GetSizeHint()) { if (fContext.GetSizeHint()) {
boost::asio::post(fContext.GetIoContext(), asio::post(fContext.GetIoContext(),
std::bind(&Socket::SendQueueReaderStatic, this)); std::bind(&Socket::SendQueueReaderStatic, this));
boost::asio::post(fContext.GetIoContext(), asio::post(fContext.GetIoContext(),
std::bind(&Socket::RecvQueueReaderStatic, this)); std::bind(&Socket::RecvQueueReaderStatic, this));
} else { } else {
boost::asio::post(fContext.GetIoContext(), asio::post(fContext.GetIoContext(),
std::bind(&Socket::SendQueueReader, this)); std::bind(&Socket::SendQueueReader, this));
boost::asio::post(fContext.GetIoContext(), asio::post(fContext.GetIoContext(),
std::bind(&Socket::RecvControlQueueReader, this)); std::bind(&Socket::RecvControlQueueReader, this));
} }
}); });
@ -180,11 +180,11 @@ try {
ConnectEndpoint(fDataEndpoint, Band::Data); ConnectEndpoint(fDataEndpoint, Band::Data);
if (fContext.GetSizeHint()) { if (fContext.GetSizeHint()) {
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this)); asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this));
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvQueueReaderStatic, this)); asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvQueueReaderStatic, this));
} else { } else {
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
} }
return true; return true;
@ -307,7 +307,7 @@ auto Socket::SendQueueReader() -> void
} }
// Send control message // Send control message
boost::asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage)); asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage));
if (fNeedOfiMemoryRegistration) { if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::send); asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::send);
@ -315,17 +315,17 @@ auto Socket::SendQueueReader() -> void
fControlEndpoint->send(ctrlMsg, fControlEndpoint->send(ctrlMsg,
desc, desc,
[&, ctrl2 = std::move(ctrlMsg), mr2 = std::move(mr)]( [&, ctrl2 = std::move(ctrlMsg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable {}); asio::mutable_buffer) mutable {});
} else { } else {
fControlEndpoint->send( fControlEndpoint->send(
ctrlMsg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {}); ctrlMsg, [&, ctrl2 = std::move(ctrl)](asio::mutable_buffer) mutable {});
} }
// Send data message // Send data message
const auto size = msg->GetSize(); const auto size = msg->GetSize();
if (size) { if (size) {
boost::asio::mutable_buffer buffer(msg->GetData(), size); asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) { if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send); asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
@ -334,14 +334,14 @@ auto Socket::SendQueueReader() -> void
fDataEndpoint->send(buffer, fDataEndpoint->send(buffer,
desc, desc,
[&, size, msg2 = std::move(msg), mr2 = std::move(mr)]( [&, size, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable { asio::mutable_buffer) mutable {
fBytesTx += size; fBytesTx += size;
fMessagesTx++; fMessagesTx++;
fSendPushSem.signal(); fSendPushSem.signal();
}); });
} else { } else {
fDataEndpoint->send( fDataEndpoint->send(
buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { buffer, [&, size, msg2 = std::move(msg)](asio::mutable_buffer) mutable {
fBytesTx += size; fBytesTx += size;
fMessagesTx++; fMessagesTx++;
fSendPushSem.signal(); fSendPushSem.signal();
@ -353,7 +353,7 @@ auto Socket::SendQueueReader() -> void
} }
} }
boost::asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
}); });
} }
@ -377,7 +377,7 @@ auto Socket::SendQueueReaderStatic() -> void
const auto size = msg->GetSize(); const auto size = msg->GetSize();
if (size) { if (size) {
boost::asio::mutable_buffer buffer(msg->GetData(), size); asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) { if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send); asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
@ -386,14 +386,14 @@ auto Socket::SendQueueReaderStatic() -> void
fDataEndpoint->send(buffer, fDataEndpoint->send(buffer,
desc, desc,
[&, size, msg2 = std::move(msg), mr2 = std::move(mr)]( [&, size, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable { asio::mutable_buffer) mutable {
fBytesTx += size; fBytesTx += size;
fMessagesTx++; fMessagesTx++;
fSendPushSem.signal(); fSendPushSem.signal();
}); });
} else { } else {
fDataEndpoint->send( fDataEndpoint->send(
buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { buffer, [&, size, msg2 = std::move(msg)](asio::mutable_buffer) mutable {
fBytesTx += size; fBytesTx += size;
fMessagesTx++; fMessagesTx++;
fSendPushSem.signal(); fSendPushSem.signal();
@ -404,7 +404,7 @@ auto Socket::SendQueueReaderStatic() -> void
fSendPushSem.signal(); fSendPushSem.signal();
} }
boost::asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this)); asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this));
}); });
} }
@ -460,7 +460,7 @@ auto Socket::RecvControlQueueReader() -> void
fRecvPushSem.async_wait([&] { fRecvPushSem.async_wait([&] {
// Receive control message // Receive control message
ofi::unique_ptr<ControlMessage> ctrl(MakeControlMessageWithPmr<Empty>(fControlMemPool)); ofi::unique_ptr<ControlMessage> ctrl(MakeControlMessageWithPmr<Empty>(fControlMemPool));
boost::asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage)); asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage));
if (fNeedOfiMemoryRegistration) { if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::recv); asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::recv);
@ -470,10 +470,10 @@ auto Socket::RecvControlQueueReader() -> void
ctrlMsg, ctrlMsg,
desc, desc,
[&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)]( [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); }); asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); });
} else { } else {
fControlEndpoint->recv( fControlEndpoint->recv(
ctrlMsg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable { ctrlMsg, [&, ctrl2 = std::move(ctrl)](asio::mutable_buffer) mutable {
OnRecvControl(std::move(ctrl2)); OnRecvControl(std::move(ctrl2));
}); });
} }
@ -507,7 +507,7 @@ auto Socket::OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void
auto msg = fContext.MakeReceiveMessage(size); auto msg = fContext.MakeReceiveMessage(size);
if (size) { if (size) {
boost::asio::mutable_buffer buffer(msg->GetData(), size); asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) { if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv); asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv);
@ -517,11 +517,11 @@ auto Socket::OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void
buffer, buffer,
desc, desc,
[&, msg2 = std::move(msg), mr2 = std::move(mr)]( [&, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable { DataMessageReceived(std::move(msg2)); }); asio::mutable_buffer) mutable { DataMessageReceived(std::move(msg2)); });
} else { } else {
fDataEndpoint->recv(buffer, fDataEndpoint->recv(buffer,
[&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { [&, msg2 = std::move(msg)](asio::mutable_buffer) mutable {
DataMessageReceived(std::move(msg2)); DataMessageReceived(std::move(msg2));
}); });
} }
@ -529,7 +529,7 @@ auto Socket::OnRecvControl(ofi::unique_ptr<ControlMessage> ctrl) -> void
DataMessageReceived(std::move(msg)); DataMessageReceived(std::move(msg));
} }
boost::asio::dispatch(fContext.GetIoContext(), asio::dispatch(fContext.GetIoContext(),
std::bind(&Socket::RecvControlQueueReader, this)); std::bind(&Socket::RecvControlQueueReader, this));
} }
@ -541,7 +541,7 @@ auto Socket::RecvQueueReaderStatic() -> void
auto msg = fContext.MakeReceiveMessage(size); auto msg = fContext.MakeReceiveMessage(size);
if (size) { if (size) {
boost::asio::mutable_buffer buffer(msg->GetData(), size); asio::mutable_buffer buffer(msg->GetData(), size);
if (fNeedOfiMemoryRegistration) { if (fNeedOfiMemoryRegistration) {
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv); asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv);
@ -550,13 +550,13 @@ auto Socket::RecvQueueReaderStatic() -> void
fDataEndpoint->recv(buffer, fDataEndpoint->recv(buffer,
desc, desc,
[&, msg2 = std::move(msg), mr2 = std::move(mr)]( [&, msg2 = std::move(msg), mr2 = std::move(mr)](
boost::asio::mutable_buffer) mutable { asio::mutable_buffer) mutable {
DataMessageReceived(std::move(msg2)); DataMessageReceived(std::move(msg2));
}); });
} else { } else {
fDataEndpoint->recv( fDataEndpoint->recv(
buffer, [&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { buffer, [&, msg2 = std::move(msg)](asio::mutable_buffer) mutable {
DataMessageReceived(std::move(msg2)); DataMessageReceived(std::move(msg2));
}); });
} }
@ -564,7 +564,7 @@ auto Socket::RecvQueueReaderStatic() -> void
DataMessageReceived(std::move(msg)); DataMessageReceived(std::move(msg));
} }
boost::asio::dispatch(fContext.GetIoContext(), asio::dispatch(fContext.GetIoContext(),
std::bind(&Socket::RecvQueueReaderStatic, this)); std::bind(&Socket::RecvQueueReaderStatic, this));
}); });
} }

View File

@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@ -18,7 +18,6 @@
#include <asiofi/memory_resources.hpp> #include <asiofi/memory_resources.hpp>
#include <asiofi/passive_endpoint.hpp> #include <asiofi/passive_endpoint.hpp>
#include <asiofi/semaphore.hpp> #include <asiofi/semaphore.hpp>
#include <boost/asio.hpp>
#include <memory> // unique_ptr #include <memory> // unique_ptr
#include <mutex> #include <mutex>

View File

@ -1,120 +0,0 @@
/********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/ofi/Message.h>
#include <fairmq/ofi/Poller.h>
#include <fairmq/ofi/Socket.h>
#include <fairmq/ofi/TransportFactory.h>
#include <stdexcept>
namespace fair::mq::ofi
{
using namespace std;
TransportFactory::TransportFactory(const string& id, const fair::mq::ProgOptions* config)
try : FairMQTransportFactory(id)
, fContext(*this, *this, 1)
{
LOG(debug) << "OFI transport: asiofi (" << fContext.GetAsiofiVersion() << ")";
if (config) {
fContext.SetSizeHint(config->GetProperty<size_t>("ofi-size-hint", 0));
}
} catch (ContextError& e) {
throw TransportFactoryError{e.what()};
}
auto TransportFactory::CreateMessage() -> MessagePtr
{
return MessagePtr{new Message(&fMemoryResource)};
}
auto TransportFactory::CreateMessage(Alignment /* alignment */) -> MessagePtr
{
// TODO Do not ignore alignment
return MessagePtr{new Message(&fMemoryResource)};
}
auto TransportFactory::CreateMessage(const size_t size) -> MessagePtr
{
return MessagePtr{new Message(&fMemoryResource, size)};
}
auto TransportFactory::CreateMessage(const size_t size, Alignment /* alignment */) -> MessagePtr
{
// TODO Do not ignore alignment
return MessagePtr{new Message(&fMemoryResource, size)};
}
auto TransportFactory::CreateMessage(void* data,
const size_t size,
fairmq_free_fn* ffn,
void* hint) -> MessagePtr
{
return MessagePtr{new Message(&fMemoryResource, data, size, ffn, hint)};
}
auto TransportFactory::CreateMessage(UnmanagedRegionPtr& region,
void* data,
const size_t size,
void* hint) -> MessagePtr
{
return MessagePtr{new Message(&fMemoryResource, region, data, size, hint)};
}
auto TransportFactory::CreateSocket(const string& type, const string& name) -> SocketPtr
{
return SocketPtr{new Socket(fContext, type, name, GetId())};
}
auto TransportFactory::CreatePoller(const vector<FairMQChannel>& /*channels*/) const -> PollerPtr
{
throw runtime_error{"Not yet implemented (Poller)."};
// return PollerPtr{new Poller(channels)};
}
auto TransportFactory::CreatePoller(const vector<FairMQChannel*>& /*channels*/) const -> PollerPtr
{
throw runtime_error{"Not yet implemented (Poller)."};
// return PollerPtr{new Poller(channels)};
}
auto TransportFactory::CreatePoller(const unordered_map<string, vector<FairMQChannel>>& /*channelsMap*/, const vector<string>& /*channelList*/) const -> PollerPtr
{
throw runtime_error{"Not yet implemented (Poller)."};
// return PollerPtr{new Poller(channelsMap, channelList)};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}
auto TransportFactory::GetType() const -> Transport
{
return Transport::OFI;
}
} // namespace fair::mq::ofi

View File

@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@ -9,14 +9,28 @@
#ifndef FAIR_MQ_OFI_TRANSPORTFACTORY_H #ifndef FAIR_MQ_OFI_TRANSPORTFACTORY_H
#define FAIR_MQ_OFI_TRANSPORTFACTORY_H #define FAIR_MQ_OFI_TRANSPORTFACTORY_H
#include <FairMQTransportFactory.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/ofi/Context.h>
#include <asiofi.hpp> #include <asiofi.hpp>
#include <cstddef>
#include <cstdint>
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <fairmq/Message.h>
#include <fairmq/Poller.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/Socket.h>
#include <fairmq/TransportFactory.h>
#include <fairmq/Transports.h>
#include <fairmq/ofi/Context.h>
#include <fairmq/ofi/Message.h>
#include <fairmq/ofi/Socket.h>
#include <fairmq/ofi/TransportFactory.h>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>
namespace fair::mq::ofi namespace fair::mq::ofi {
{
/** /**
* @class TransportFactory TransportFactory.h <fairmq/ofi/TransportFactory.h> * @class TransportFactory TransportFactory.h <fairmq/ofi/TransportFactory.h>
@ -24,37 +38,155 @@ namespace fair::mq::ofi
* *
* @todo TODO insert long description * @todo TODO insert long description
*/ */
class TransportFactory final : public FairMQTransportFactory struct TransportFactory final : mq::TransportFactory
{ {
public: TransportFactory(std::string const& id = "", ProgOptions const* config = nullptr)
TransportFactory(const std::string& id = "", const fair::mq::ProgOptions* config = nullptr); : mq::TransportFactory(id)
TransportFactory(const TransportFactory&) = delete; , fContext(*this, *this, 1)
TransportFactory operator=(const TransportFactory&) = delete; {
try {
LOG(debug) << "OFI transport: asiofi (" << fContext.GetAsiofiVersion() << ")";
auto CreateMessage() -> MessagePtr override; if (config) {
auto CreateMessage(Alignment alignment) -> MessagePtr override; fContext.SetSizeHint(config->GetProperty<size_t>("ofi-size-hint", 0));
auto CreateMessage(const std::size_t size) -> MessagePtr override; }
auto CreateMessage(const std::size_t size, Alignment alignment) -> MessagePtr override; } catch (ContextError& e) {
auto CreateMessage(void* data, const std::size_t size, fairmq_free_fn* ffn, void* hint = nullptr) -> MessagePtr override; throw TransportFactoryError(e.what());
auto CreateMessage(UnmanagedRegionPtr& region, void* data, const std::size_t size, void* hint = nullptr) -> MessagePtr override; }
}
auto CreateSocket(const std::string& type, const std::string& name) -> SocketPtr override; TransportFactory(TransportFactory const&) = delete;
TransportFactory& operator=(TransportFactory const&) = delete;
TransportFactory(TransportFactory&&) = default;
TransportFactory& operator=(TransportFactory&&) = default;
auto CreatePoller(const std::vector<FairMQChannel>& channels) const -> PollerPtr override; auto CreateMessage() -> std::unique_ptr<mq::Message> override
auto CreatePoller(const std::vector<FairMQChannel*>& channels) const -> PollerPtr override; {
auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override; return std::make_unique<Message>(&fMemoryResource);
}
auto CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override; auto CreateMessage(Alignment /*alignment*/) -> std::unique_ptr<mq::Message> override
auto CreateUnmanagedRegion(const size_t size, RegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override; {
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override; // TODO Do not ignore alignment
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override; return std::make_unique<Message>(&fMemoryResource);
}
void SubscribeToRegionEvents(RegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for OFI"; } auto CreateMessage(std::size_t size) -> std::unique_ptr<mq::Message> override
bool SubscribedToRegionEvents() override { LOG(error) << "Region event subscriptions not yet implemented for OFI"; return false; } {
void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for OFI"; } return std::make_unique<Message>(&fMemoryResource, size);
std::vector<FairMQRegionInfo> GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for OFI, returning empty vector"; return std::vector<FairMQRegionInfo>(); } }
auto GetType() const -> Transport override; auto CreateMessage(std::size_t size, Alignment /*alignment*/)
-> std::unique_ptr<mq::Message> override
{
// TODO Do not ignore alignment
return std::make_unique<Message>(&fMemoryResource, size);
}
auto CreateMessage(void* data, std::size_t size, fairmq_free_fn* ffn, void* hint = nullptr)
-> std::unique_ptr<mq::Message> override
{
return std::make_unique<Message>(&fMemoryResource, data, size, ffn, hint);
}
auto CreateMessage(std::unique_ptr<mq::UnmanagedRegion>& region,
void* data,
std::size_t size,
void* hint = nullptr) -> std::unique_ptr<mq::Message> override
{
return std::make_unique<Message>(&fMemoryResource, region, data, size, hint);
}
auto CreateSocket(std::string const& type, std::string const& name)
-> std::unique_ptr<mq::Socket> override
{
return std::make_unique<Socket>(fContext, type, name, GetId());
}
auto CreatePoller(std::vector<mq::Channel> const& /*channels*/) const
-> std::unique_ptr<mq::Poller> override
{
throw std::runtime_error("Not yet implemented (Poller).");
}
auto CreatePoller(std::vector<mq::Channel*> const& /*channels*/) const
-> std::unique_ptr<mq::Poller> override
{
throw std::runtime_error("Not yet implemented (Poller).");
}
auto CreatePoller(
std::unordered_map<std::string, std::vector<FairMQChannel>> const& /*channelsMap*/,
std::vector<std::string> const& /*channelList*/) const
-> std::unique_ptr<mq::Poller> override
{
throw std::runtime_error("Not yet implemented (Poller).");
}
auto CreateUnmanagedRegion(std::size_t /*size*/,
RegionCallback /*callback = nullptr*/,
std::string const& /*path = ""*/,
int /*flags = 0*/,
RegionConfig /*cfg = RegionConfig()*/)
-> std::unique_ptr<mq::UnmanagedRegion> override
{
throw std::runtime_error("Not yet implemented UMR.");
}
auto CreateUnmanagedRegion(std::size_t /*size*/,
RegionBulkCallback /*callback = nullptr*/,
std::string const& /*path = ""*/,
int /*flags = 0*/,
RegionConfig /*cfg = RegionConfig()*/)
-> std::unique_ptr<mq::UnmanagedRegion> override
{
throw std::runtime_error("Not yet implemented UMR.");
}
auto CreateUnmanagedRegion(std::size_t /*size*/,
int64_t /*userFlags*/,
RegionCallback /*callback = nullptr*/,
std::string const& /*path = ""*/,
int /*flags = 0*/,
RegionConfig /*cfg = RegionConfig()*/)
-> std::unique_ptr<mq::UnmanagedRegion> override
{
throw std::runtime_error("Not yet implemented UMR.");
}
auto CreateUnmanagedRegion(std::size_t /*size*/,
int64_t /*userFlags*/,
RegionBulkCallback /*callback = nullptr*/,
std::string const& /*path = ""*/,
int /*flags = 0*/,
RegionConfig /*cfg = RegionConfig()*/)
-> std::unique_ptr<mq::UnmanagedRegion> override
{
throw std::runtime_error("Not yet implemented UMR.");
}
auto SubscribeToRegionEvents(RegionEventCallback /*callback*/) -> void override
{
throw std::runtime_error("Not yet implemented.");
}
auto SubscribedToRegionEvents() -> bool override
{
throw std::runtime_error("Not yet implemented.");
}
auto UnsubscribeFromRegionEvents() -> void override
{
throw std::runtime_error("Not yet implemented.");
}
auto GetRegionInfo() -> std::vector<RegionInfo> override
{
LOG(error) << "GetRegionInfo not yet implemented for OFI, returning empty vector";
return std::vector<RegionInfo>();
}
auto GetType() const -> Transport override { return Transport::OFI; }
void Interrupt() override { fContext.Interrupt(); } void Interrupt() override { fContext.Interrupt(); }
void Resume() override { fContext.Resume(); } void Resume() override { fContext.Resume(); }
@ -65,6 +197,6 @@ class TransportFactory final : public FairMQTransportFactory
asiofi::allocated_pool_resource fMemoryResource; asiofi::allocated_pool_resource fMemoryResource;
}; /* class TransportFactory */ }; /* class TransportFactory */
} // namespace fair::mq::ofi } // namespace fair::mq::ofi
#endif /* FAIR_MQ_OFI_TRANSPORTFACTORY_H */ #endif /* FAIR_MQ_OFI_TRANSPORTFACTORY_H */

48
fairmq/shmem/Manager.cxx Normal file
View File

@ -0,0 +1,48 @@
/********************************************************************************
* Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Manager.h"
// Needed to compile-firewall the <boost/process/async.hpp> header because it
// interferes with the <asio/buffer.hpp> header. So, let's factor
// the whole dependency to Boost.Process out of the header.
#include <boost/process.hpp>
#include <fairlogger/Logger.h>
namespace fair::mq::shmem {
bool Manager::SpawnShmMonitor(const std::string& id)
{
auto const env(boost::this_process::environment());
std::string const fairmq_path_key("FAIRMQ_PATH");
std::string const shmmonitor_exe_name("fairmq-shmmonitor");
std::string const shmmonitor_verbose_key("FAIRMQ_SHMMONITOR_VERBOSE");
auto path(boost::this_process::path());
if (env.count(fairmq_path_key)) {
path.emplace(path.begin(), env.at(fairmq_path_key).to_string());
}
auto exe(boost::process::search_path(shmmonitor_exe_name, path));
if (exe.empty()) {
LOG(warn) << "could not find " << shmmonitor_exe_name << " in \"$" << fairmq_path_key
<< ":$PATH\"";
return false;
}
// TODO Move this to fairmq-shmmonitor itself ?
bool verbose(env.count(shmmonitor_verbose_key)
&& env.at(shmmonitor_verbose_key).to_string() == "true");
boost::process::spawn(
exe, "-x", "-m", "--shmid", id, "-d", "-t", "2000", (verbose ? "--verbose" : ""), env);
return true;
}
} // namespace fair::mq::shmem

View File

@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@ -31,7 +31,6 @@
#include <boost/interprocess/managed_shared_memory.hpp> #include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/named_condition.hpp> #include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/sync/named_mutex.hpp> #include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/process.hpp>
#include <boost/variant.hpp> #include <boost/variant.hpp>
#include <condition_variable> #include <condition_variable>
@ -240,6 +239,10 @@ class Manager
LOG(debug) << "Successfully locked the managed segment memory pages."; LOG(debug) << "Successfully locked the managed segment memory pages.";
} }
private:
static bool SpawnShmMonitor(const std::string& id);
public:
static void StartMonitor(const std::string& id) static void StartMonitor(const std::string& id)
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
@ -248,25 +251,8 @@ class Manager
LOG(debug) << "Found fairmq-shmmonitor for shared memory id " << id; LOG(debug) << "Found fairmq-shmmonitor for shared memory id " << id;
} catch (interprocess_exception&) { } catch (interprocess_exception&) {
LOG(debug) << "no fairmq-shmmonitor found for shared memory id " << id << ", starting..."; LOG(debug) << "no fairmq-shmmonitor found for shared memory id " << id << ", starting...";
auto env = boost::this_process::environment();
std::vector<boost::filesystem::path> ownPath = boost::this_process::path(); if (SpawnShmMonitor(id)) {
if (const char* fmqp = getenv("FAIRMQ_PATH")) {
ownPath.insert(ownPath.begin(), boost::filesystem::path(fmqp));
}
boost::filesystem::path p = boost::process::search_path("fairmq-shmmonitor", ownPath);
bool verbose = false;
if (const char* verboseEnv = getenv("FAIRMQ_SHMMONITOR_VERBOSE")) {
if (std::string(verboseEnv) == "true") {
verbose = true;
}
}
if (!p.empty()) {
boost::process::spawn(p, "-x", "-m", "--shmid", id, "-d", "-t", "2000", (verbose ? "--verbose" : ""), env);
int numTries = 0; int numTries = 0;
do { do {
try { try {
@ -281,8 +267,6 @@ class Manager
} }
} }
} while (true); } while (true);
} else {
LOG(warn) << "could not find fairmq-shmmonitor in the path";
} }
} }
} }

View File

@ -1,10 +1,10 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
/** /**
* Region.h * Region.h
* *
@ -22,7 +22,6 @@
#include <fairmq/tools/Strings.h> #include <fairmq/tools/Strings.h>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <boost/process.hpp>
#include <boost/date_time/posix_time/posix_time.hpp> #include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/interprocess/managed_shared_memory.hpp> #include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/file_mapping.hpp> #include <boost/interprocess/file_mapping.hpp>