diff --git a/CMakeLists.txt b/CMakeLists.txt index 563fef29..28ce56f5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -49,7 +49,7 @@ find_package(Threads REQUIRED) if(BUILD_FAIRMQ) find_package2(PUBLIC Boost VERSION 1.64 REQUIRED - COMPONENTS program_options thread system filesystem regex date_time signals + COMPONENTS container program_options thread system filesystem regex date_time signals ) find_package2(PUBLIC FairLogger VERSION 1.2.0 REQUIRED) find_package2(PRIVATE ZeroMQ VERSION 4.1.5 REQUIRED) diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index fc7743ca..7728f8fe 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -49,6 +49,8 @@ set(FAIRMQ_PUBLIC_HEADER_FILES FairMQSocket.h FairMQStateMachine.h FairMQTransportFactory.h + MemoryResources.h + MemoryResourceTools.h Tools.h Transports.h options/FairMQProgOptions.h @@ -155,6 +157,7 @@ set(FAIRMQ_SOURCE_FILES zeromq/FairMQUnmanagedRegionZMQ.cxx zeromq/FairMQSocketZMQ.cxx zeromq/FairMQTransportFactoryZMQ.cxx + MemoryResources.cxx ) if(BUILD_NANOMSG_TRANSPORT) @@ -232,6 +235,7 @@ endif() target_link_libraries(${_target} INTERFACE # only consumers link against interface dependencies + Boost::container PUBLIC # libFairMQ AND consumers of libFairMQ link aginst public dependencies Threads::Threads diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 730f334b..32065c30 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -9,11 +9,12 @@ #ifndef FAIRMQTRANSPORTFACTORY_H_ #define FAIRMQTRANSPORTFACTORY_H_ -#include -#include -#include -#include #include +#include +#include +#include +#include +#include #include #include @@ -30,6 +31,9 @@ class FairMQTransportFactory /// Topology wide unique id const std::string fkId; + /// The polymorphic memory resource associated with the transport + fair::mq::ChannelResource fMemoryResource{this}; + public: /// ctor /// @param id Topology wide unique id, usually the device id. @@ -37,6 +41,9 @@ class FairMQTransportFactory auto GetId() const -> const std::string { return fkId; }; + /// Get a pointer to the associated polymorphic memory resource + fair::mq::ChannelResource* GetMemoryResource() { return &fMemoryResource; } + /// @brief Create empty FairMQMessage /// @return pointer to FairMQMessage virtual FairMQMessagePtr CreateMessage() const = 0; diff --git a/fairmq/MemoryResourceTools.h b/fairmq/MemoryResourceTools.h new file mode 100644 index 00000000..6dfbf36f --- /dev/null +++ b/fairmq/MemoryResourceTools.h @@ -0,0 +1,119 @@ +/******************************************************************************** + * Copyright (C) 2018 CERN and copyright holders of ALICE O2 * + * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +/// @brief Tools for interfacing containers to the transport via polymorphic +/// allocators +/// +/// @author Mikolaj Krzewicki, mkrzewic@cern.ch + +#include +#include + +namespace fair { +namespace mq { + +using ByteSpectatorAllocator = SpectatorAllocator; +using BytePmrAllocator = boost::container::pmr::polymorphic_allocator; + +//_________________________________________________________________________________________________ +// return the message associated with the container or nullptr if it does not +// make sense (e.g. when +// we are just watching an existing message or when the container is not using +// FairMQMemoryResource +// as backend). +template +// typename std::enable_if< +// std::is_base_of< +// boost::container::pmr::polymorphic_allocator, +// typename ContainerT::allocator_type>::value == true, +// FairMQMessagePtr>::type +FairMQMessagePtr getMessage(ContainerT &&container_, FairMQMemoryResource *targetResource = nullptr) +{ + auto container = std::move(container_); + auto alloc = container.get_allocator(); + + auto resource = dynamic_cast(alloc.resource()); + if (!resource && !targetResource) { + throw std::runtime_error("Neither the container or target resource specified"); + } + size_t containerSizeBytes = container.size() * sizeof(typename ContainerT::value_type); + if ((!targetResource && resource) + || (resource && targetResource && resource->is_equal(*targetResource))) { + auto message = resource->getMessage(static_cast( + const_cast::type *>( + container.data()))); + if (message) + message->SetUsedSize(containerSizeBytes); + return std::move(message); + } else { + auto message = targetResource->getTransportFactory()->CreateMessage(containerSizeBytes); + std::memcpy(static_cast(message->GetData()), + container.data(), + containerSizeBytes); + return std::move(message); + } +}; + +//_________________________________________________________________________________________________ +/// Return a vector of const ElemT, resource must be kept alive throughout the +/// lifetime of the +/// container and associated message. +template +std::vector> adoptVector( + size_t nelem, + SpectatorMessageResource *resource) +{ + return std::vector>( + nelem, SpectatorAllocator(resource)); +}; + +//_________________________________________________________________________________________________ +/// Return a vector of const ElemT, takes ownership of the message, needs an +/// upstream global +/// ChannelResource to register the message. +template +std::vector> + adoptVector(size_t nelem, ChannelResource *upstream, FairMQMessagePtr message) +{ + return std::vector>( + nelem, + OwningMessageSpectatorAllocator( + MessageResource{std::move(message), upstream})); +}; + +//_________________________________________________________________________________________________ +// TODO: this is C++14, converting it down to C++11 is too much work atm +// This returns a unique_ptr of const vector, does not allow modifications at +// the cost of pointer +// semantics for access. +// use auto or decltype to catch the return value (or use span) +// template +// auto adoptVector(size_t nelem, FairMQMessage* message) +//{ +// using DataType = std::vector; +// +// struct doubleDeleter +// { +// // kids: don't do this at home! (but here it's OK) +// // this stateful deleter allows a single unique_ptr to manage 2 +// resources at the same time. +// std::unique_ptr extra; +// void operator()(const DataType* ptr) { delete ptr; } +// }; +// +// using OutputType = std::unique_ptr; +// +// auto resource = std::make_unique(message); +// auto output = new DataType(nelem, ByteSpectatorAllocator{resource.get()}); +// return OutputType(output, doubleDeleter{std::move(resource)}); +//} + +} /* namespace mq */ +} /* namespace fair */ diff --git a/fairmq/MemoryResources.cxx b/fairmq/MemoryResources.cxx new file mode 100644 index 00000000..62b4ab9c --- /dev/null +++ b/fairmq/MemoryResources.cxx @@ -0,0 +1,25 @@ +/******************************************************************************** + * Copyright (C) 2018 CERN and copyright holders of ALICE O2 * + * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +/// @brief Memory allocators and interfaces related to managing memory via the +/// trasport layer +/// +/// @author Mikolaj Krzewicki, mkrzewic@cern.ch + +#include +#include + +void *fair::mq::ChannelResource::do_allocate(std::size_t bytes, std::size_t /*alignment*/) +{ + FairMQMessagePtr message; + message = factory->CreateMessage(bytes); + void *addr = message->GetData(); + messageMap[addr] = std::move(message); + return addr; +}; diff --git a/fairmq/MemoryResources.h b/fairmq/MemoryResources.h new file mode 100644 index 00000000..bd3f84cf --- /dev/null +++ b/fairmq/MemoryResources.h @@ -0,0 +1,342 @@ +/******************************************************************************** + * Copyright (C) 2018 CERN and copyright holders of ALICE O2 * + * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +/// @brief Memory allocators and interfaces related to managing memory via the +/// trasport layer +/// +/// @author Mikolaj Krzewicki, mkrzewic@cern.ch + +#ifndef FAIR_MQ_MEMORY_RESOURCES_H +#define FAIR_MQ_MEMORY_RESOURCES_H + +#include +class FairMQTransportFactory; + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace fair { +namespace mq { + +using byte = unsigned char; + +/// All FairMQ related memory resources need to inherit from this interface +/// class for the +/// getMessage() api. +class FairMQMemoryResource : public boost::container::pmr::memory_resource +{ + public: + /// return the message containing data associated with the pointer (to start + /// of + /// buffer), e.g. pointer returned by std::vector::data() return nullptr if + /// returning + /// a message does not make sense! + virtual FairMQMessagePtr getMessage(void *p) = 0; + virtual void *setMessage(FairMQMessagePtr) = 0; + virtual const FairMQTransportFactory *getTransportFactory() const noexcept = 0; + virtual size_t getNumberOfMessages() const noexcept = 0; +}; + +/// This is the allocator that interfaces to FairMQ memory management. All +/// allocations are +/// delegated to FairMQ so standard (e.g. STL) containers can construct their +/// stuff in +/// memory regions appropriate for the data channel configuration. +class ChannelResource : public FairMQMemoryResource +{ + protected: + const FairMQTransportFactory *factory{nullptr}; + // TODO: for now a map to keep track of allocations, something else would + // probably be + // faster, but for now this does not need to be fast. + boost::container::flat_map messageMap; + + public: + ChannelResource() = delete; + + ChannelResource(const FairMQTransportFactory *_factory) + : FairMQMemoryResource() + , factory(_factory) + , messageMap() + { + if (!_factory) { + throw std::runtime_error("Tried to construct from a nullptr FairMQTransportFactory"); + } + }; + + FairMQMessagePtr getMessage(void *p) override + { + auto mes = std::move(messageMap[p]); + messageMap.erase(p); + return mes; + } + + void *setMessage(FairMQMessagePtr message) override + { + void *addr = message->GetData(); + messageMap[addr] = std::move(message); + return addr; + } + + const FairMQTransportFactory *getTransportFactory() const noexcept override { return factory; } + + size_t getNumberOfMessages() const noexcept override { return messageMap.size(); } + + protected: + void *do_allocate(std::size_t bytes, std::size_t alignment) override; + void do_deallocate(void *p, std::size_t /*bytes*/, std::size_t /*alignment*/) override + { + messageMap.erase(p); + }; + + bool do_is_equal(const boost::container::pmr::memory_resource &other) const noexcept override + { + return this == &other; + }; +}; + +/// This memory resource only watches, does not allocate/deallocate anything. +/// In combination with the ByteSpectatorAllocator this is an alternative to +/// using span, as raw +/// memory (e.g. an existing buffer message) will be accessible with appropriate +/// container. +class SpectatorMessageResource : public FairMQMemoryResource +{ + public: + SpectatorMessageResource() = default; + + SpectatorMessageResource(const FairMQMessage *_message) + : message(_message) + { + } + + FairMQMessagePtr getMessage(void * /*p*/) override { return nullptr; } + const FairMQTransportFactory *getTransportFactory() const noexcept override { return nullptr; } + size_t getNumberOfMessages() const noexcept override { return 0; } + void *setMessage(FairMQMessagePtr) override { return nullptr; } + + protected: + const FairMQMessage *message; + + void *do_allocate(std::size_t bytes, std::size_t /*alignment*/) override + { + if (message) { + if (bytes > message->GetSize()) { + throw std::bad_alloc(); + } + return message->GetData(); + } else { + return nullptr; + } + } + + void do_deallocate(void * /*p*/, std::size_t /*bytes*/, std::size_t /*alignment*/) override + { + message = nullptr; + return; + } + + bool do_is_equal(const memory_resource &other) const noexcept override + { + const SpectatorMessageResource *that = + dynamic_cast(&other); + if (!that) { + return false; + } + if (that->message == message) { + return true; + } + return false; + } +}; + +/// This memory resource only watches, does not allocate/deallocate anything. +/// Ownership of the message is taken. Meant to be used for transparent data +/// adoption in containers. +/// In combination with the SpectatorAllocator this is an alternative to using +/// span, as raw memory +/// (e.g. an existing buffer message) will be accessible with appropriate +/// container. +class MessageResource : public FairMQMemoryResource +{ + public: + MessageResource() noexcept = delete; + MessageResource(const MessageResource &) noexcept = default; + MessageResource(MessageResource &&) noexcept = default; + MessageResource &operator=(const MessageResource &) = default; + MessageResource &operator=(MessageResource &&) = default; + + MessageResource(FairMQMessagePtr message, FairMQMemoryResource *upstream) + : mUpstream{upstream} + , mMessageSize{message->GetSize()} + , mMessageData{mUpstream ? mUpstream->setMessage(std::move(message)) + : throw std::runtime_error( + "MessageResource::MessageResource upstream is nullptr")} + { + } + + FairMQMessagePtr getMessage(void *p) override { return mUpstream->getMessage(p); } + void *setMessage(FairMQMessagePtr message) override + { + return mUpstream->setMessage(std::move(message)); + } + + const FairMQTransportFactory *getTransportFactory() const noexcept override { return nullptr; } + size_t getNumberOfMessages() const noexcept override { return mMessageData ? 1 : 0; } + + protected: + FairMQMemoryResource *mUpstream{nullptr}; + size_t mMessageSize{0}; + void *mMessageData{nullptr}; + + void *do_allocate(std::size_t bytes, std::size_t /*alignment*/) override + { + if (bytes > mMessageSize) { + throw std::bad_alloc(); + } + return mMessageData; + } + + void do_deallocate(void * /*p*/, std::size_t /*bytes*/, std::size_t /*alignment*/) override + { + getMessage(mMessageData); // let the message die. + return; + } + + bool do_is_equal(const memory_resource & /*other*/) const noexcept override + { + // since this uniquely owns the message it can never be equal to anybody + // else + return false; + } +}; + +// This in general (as in STL) is a bad idea, but here it is safe to inherit +// from an allocator since +// we have no additional data and only override some methods so we don't get +// into slicing and other +// problems. +template +class SpectatorAllocator : public boost::container::pmr::polymorphic_allocator +{ + public: + using boost::container::pmr::polymorphic_allocator::polymorphic_allocator; + + // skip default construction of empty elements + // this is important for two reasons: one: it allows us to adopt an existing + // buffer (e.g. + // incoming message) and quickly construct large vectors while skipping the + // element + // initialization. + template + void construct(U *) + { + } + + // dont try to call destructors, makes no sense since resource is managed + // externally AND allowed + // types cannot have side effects + template + void destroy(U *) + { + } + + T *allocate(size_t size) + { + return reinterpret_cast(this->resource()->allocate(size * sizeof(T), 0)); + } + + void deallocate(T *ptr, size_t size) + { + this->resource()->deallocate(const_cast::type *>(ptr), size); + } +}; + +/// This allocator has a pmr-like interface, but keeps the unique +/// MessageResource as internal state, +/// allowing full resource (associated message) management internally without +/// any global state. +template +class OwningMessageSpectatorAllocator +{ + public: + using value_type = T; + + MessageResource mResource; + + OwningMessageSpectatorAllocator() noexcept = default; + OwningMessageSpectatorAllocator(const OwningMessageSpectatorAllocator &) noexcept = default; + OwningMessageSpectatorAllocator(OwningMessageSpectatorAllocator &&) noexcept = default; + + OwningMessageSpectatorAllocator(MessageResource &&resource) noexcept + : mResource{resource} + { + } + + template + OwningMessageSpectatorAllocator(const OwningMessageSpectatorAllocator &other) noexcept + : mResource(other.mResource) + { + } + + OwningMessageSpectatorAllocator &operator=(const OwningMessageSpectatorAllocator &other) + { + mResource = other.mResource; + return *this; + } + + OwningMessageSpectatorAllocator select_on_container_copy_construction() const + { + return OwningMessageSpectatorAllocator(); + } + + boost::container::pmr::memory_resource *resource() { return &mResource; } + + // skip default construction of empty elements + // this is important for two reasons: one: it allows us to adopt an existing + // buffer (e.g. + // incoming message) and quickly construct large vectors while skipping the + // element + // initialization. + template + void construct(U *) + { + } + + // dont try to call destructors, makes no sense since resource is managed + // externally AND allowed + // types cannot have side effects + template + void destroy(U *) + { + } + + T *allocate(size_t size) + { + return reinterpret_cast(mResource.allocate(size * sizeof(T), 0)); + } + + void deallocate(T *ptr, size_t size) + { + mResource.deallocate(const_cast::type *>(ptr), size); + } +}; + +} /* namespace mq */ +} /* namespace fair */ + +#endif /* FAIR_MQ_MEMORY_RESOURCES_H */ diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index efb604e0..7ff12aba 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -222,3 +222,14 @@ add_testsuite(FairMQ.Poller TIMEOUT 10 ${definitions} ) + +add_testsuite(FairMQ.MemoryResources + SOURCES + memory_resources/runner.cxx + memory_resources/_memory_resources.cxx + + LINKS FairMQ + INCLUDES ${CMAKE_CURRENT_BINARY_DIR} + TIMEOUT 10 +) + diff --git a/test/memory_resources/_memory_resources.cxx b/test/memory_resources/_memory_resources.cxx new file mode 100644 index 00000000..b9b65c7e --- /dev/null +++ b/test/memory_resources/_memory_resources.cxx @@ -0,0 +1,179 @@ +/******************************************************************************** + * Copyright (C) 2018 Goethe University Frankfurt * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include +#include +#include +#include + +namespace { + +using namespace std; +using namespace fair::mq; +using factoryType = std::shared_ptr; + +factoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq"); +factoryType factorySHM = FairMQTransportFactory::CreateTransportFactory("shmem"); + +struct testData +{ + int i{1}; + static int nallocated; + static int nallocations; + static int ndeallocations; + testData() + { + ++nallocated; + ++nallocations; + } + testData(const testData& in) + : i{in.i} + { + ++nallocated; + ++nallocations; + } + testData(const testData&& in) + : i{in.i} + { + ++nallocated; + ++nallocations; + } + testData(int in) + : i{in} + { + ++nallocated; + ++nallocations; + } + ~testData() + { + --nallocated; + ++ndeallocations; + } +}; + +int testData::nallocated = 0; +int testData::nallocations = 0; +int testData::ndeallocations = 0; + +auto allocZMQ = factoryZMQ -> GetMemoryResource(); +auto allocSHM = factorySHM -> GetMemoryResource(); + +TEST(MemoryResources, transportallocatormap_test) +{ + EXPECT_TRUE(allocZMQ != nullptr && allocSHM != allocZMQ); + auto _tmp = factoryZMQ->GetMemoryResource(); + EXPECT_TRUE(_tmp == allocZMQ); +} + +using namespace boost::container::pmr; + +TEST(MemoryResources, allocator_test) +{ + testData::nallocations = 0; + testData::ndeallocations = 0; + + { + std::vector> v( + polymorphic_allocator{allocZMQ}); + v.reserve(3); + EXPECT_TRUE(v.capacity() == 3); + EXPECT_TRUE(allocZMQ->getNumberOfMessages() == 1); + v.emplace_back(1); + v.emplace_back(2); + v.emplace_back(3); + EXPECT_TRUE((byte*)&(*v.end()) - (byte*)&(*v.begin()) == 3 * sizeof(testData)); + EXPECT_TRUE(testData::nallocated == 3); + } + EXPECT_TRUE(testData::nallocated == 0); + EXPECT_TRUE(testData::nallocations == testData::ndeallocations); + + testData::nallocations = 0; + testData::ndeallocations = 0; + { + std::vector> v( + SpectatorAllocator{allocZMQ}); + v.reserve(3); + EXPECT_TRUE(allocZMQ->getNumberOfMessages() == 1); + v.emplace_back(1); + v.emplace_back(2); + v.emplace_back(3); + EXPECT_TRUE(testData::nallocated == 3); + } + EXPECT_TRUE(testData::nallocated + == 3); // ByteSpectatorAllocator does not call dtors so nallocated remains at 3; + EXPECT_TRUE(allocZMQ->getNumberOfMessages() == 0); +} + +TEST(MemoryResources, getMessage_test) +{ + testData::nallocations = 0; + testData::ndeallocations = 0; + + FairMQMessagePtr message{nullptr}; + + int* messageArray{nullptr}; + + // test message creation on the same channel it was allocated with + { + std::vector> v( + polymorphic_allocator{allocZMQ}); + v.emplace_back(1); + v.emplace_back(2); + v.emplace_back(3); + void* vectorBeginPtr = &v[0]; + message = getMessage(std::move(v)); + EXPECT_TRUE(message != nullptr); + EXPECT_TRUE(message->GetData() == vectorBeginPtr); + } + EXPECT_TRUE(message->GetSize() == 3 * sizeof(testData)); + messageArray = static_cast(message->GetData()); + EXPECT_TRUE(messageArray[0] == 1 && messageArray[1] == 2 && messageArray[2] == 3); + + // test message creation on a different channel than it was allocated with + { + std::vector> v( + polymorphic_allocator{allocZMQ}); + v.emplace_back(4); + v.emplace_back(5); + v.emplace_back(6); + void* vectorBeginPtr = &v[0]; + message = getMessage(std::move(v), allocSHM); + EXPECT_TRUE(message != nullptr); + EXPECT_TRUE(message->GetData() != vectorBeginPtr); + } + EXPECT_TRUE(message->GetSize() == 3 * sizeof(testData)); + messageArray = static_cast(message->GetData()); + EXPECT_TRUE(messageArray[0] == 4 && messageArray[1] == 5 && messageArray[2] == 6); + + { + std::vector> v( + SpectatorAllocator{allocSHM}); + } +} + +TEST(MemoryResources, adoptVector_test) +{ + // Create a bogus message + auto message = factoryZMQ->CreateMessage(3 * sizeof(testData)); + auto messageAddr = message.get(); + testData tmpBuf[3] = {3, 2, 1}; + std::memcpy(message->GetData(), tmpBuf, 3 * sizeof(testData)); + + auto adoptedOwner = + adoptVector(3, factoryZMQ->GetMemoryResource(), std::move(message)); + EXPECT_TRUE(adoptedOwner[0].i == 3); + EXPECT_TRUE(adoptedOwner[1].i == 2); + EXPECT_TRUE(adoptedOwner[2].i == 1); + + auto reclaimedMessage = getMessage(std::move(adoptedOwner)); + EXPECT_TRUE(reclaimedMessage.get() == messageAddr); + EXPECT_TRUE(adoptedOwner.size() == 0); +} + +} // namespace diff --git a/test/memory_resources/runner.cxx b/test/memory_resources/runner.cxx new file mode 100644 index 00000000..ebabfc9a --- /dev/null +++ b/test/memory_resources/runner.cxx @@ -0,0 +1,19 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include + +#include + +auto main(int argc, char** argv) -> int +{ + ::testing::InitGoogleTest(&argc, argv); + ::testing::FLAGS_gtest_death_test_style = "threadsafe"; + setenv("FAIRMQ_PATH", FAIRMQ_TEST_ENVIRONMENT, 0); + return RUN_ALL_TESTS(); +}