From 53a4d17f8b498a056199ce4b665dbeed847be6b2 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 19 May 2020 20:52:21 +0200 Subject: [PATCH] Alignment part I - Interface and shmem send --- fairmq/FairMQMessage.h | 15 +++++++++++++ fairmq/FairMQTransportFactory.h | 13 +++++++++-- fairmq/ofi/Message.cxx | 24 ++++++++++++++++++++ fairmq/ofi/Message.h | 2 ++ fairmq/ofi/TransportFactory.cxx | 12 ++++++++++ fairmq/ofi/TransportFactory.h | 2 ++ fairmq/shmem/Message.h | 38 +++++++++++++++++++++++++++----- fairmq/shmem/TransportFactory.h | 10 +++++++++ fairmq/zeromq/Message.h | 23 +++++++++++++++++++ fairmq/zeromq/TransportFactory.h | 10 +++++++++ test/message/_message.cxx | 19 ++++++++++++++++ 11 files changed, 161 insertions(+), 7 deletions(-) diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index cb4b043c..4f0e3ccd 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -17,11 +17,26 @@ using fairmq_free_fn = void(void* data, void* hint); class FairMQTransportFactory; +namespace fair +{ +namespace mq +{ + +struct Alignment +{ + size_t alignment; + explicit operator size_t() const { return alignment; } +}; + +} /* namespace mq */ +} /* namespace fair */ + class FairMQMessage { public: FairMQMessage() = default; FairMQMessage(FairMQTransportFactory* factory) : fTransport(factory) {} + virtual void Rebuild() = 0; virtual void Rebuild(const size_t size) = 0; virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0; diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 9d0719e0..16c2da39 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -18,7 +18,7 @@ #include #include -#include +#include // shared_ptr #include #include #include @@ -47,13 +47,22 @@ class FairMQTransportFactory fair::mq::ChannelResource* GetMemoryResource() { return &fMemoryResource; } operator fair::mq::ChannelResource*() { return &fMemoryResource; } - /// @brief Create empty FairMQMessage + /// @brief Create empty FairMQMessage (for receiving) /// @return pointer to FairMQMessage virtual FairMQMessagePtr CreateMessage() = 0; + /// @brief Create empty FairMQMessage (for receiving), align received buffer to specified alignment + /// @param alignment alignment to align received buffer to + /// @return pointer to FairMQMessage + virtual FairMQMessagePtr CreateMessage(fair::mq::Alignment alignment) = 0; /// @brief Create new FairMQMessage of specified size /// @param size message size /// @return pointer to FairMQMessage virtual FairMQMessagePtr CreateMessage(const size_t size) = 0; + /// @brief Create new FairMQMessage of specified size and alignment + /// @param size message size + /// @param alignment message alignment + /// @return pointer to FairMQMessage + virtual FairMQMessagePtr CreateMessage(const size_t size, fair::mq::Alignment alignment) = 0; /// @brief Create new FairMQMessage with user provided buffer and size /// @param data pointer to user provided buffer /// @param size size of the user provided buffer diff --git a/fairmq/ofi/Message.cxx b/fairmq/ofi/Message.cxx index 8411a682..d27d5924 100644 --- a/fairmq/ofi/Message.cxx +++ b/fairmq/ofi/Message.cxx @@ -34,6 +34,16 @@ Message::Message(boost::container::pmr::memory_resource* pmr) { } +Message::Message(boost::container::pmr::memory_resource* pmr, Alignment /* alignment */) + : fInitialSize(0) + , fSize(0) + , fData(nullptr) + , fFreeFunction(nullptr) + , fHint(nullptr) + , fPmr(pmr) +{ +} + Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size) : fInitialSize(size) , fSize(size) @@ -48,6 +58,20 @@ Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size) } } +Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size, Alignment /* alignment */) + : fInitialSize(size) + , fSize(size) + , fData(nullptr) + , fFreeFunction(nullptr) + , fHint(nullptr) + , fPmr(pmr) +{ + if (size) { + fData = fPmr->allocate(size); + assert(fData); + } +} + Message::Message(boost::container::pmr::memory_resource* pmr, void* data, const size_t size, diff --git a/fairmq/ofi/Message.h b/fairmq/ofi/Message.h index 5a79926b..8af0b499 100644 --- a/fairmq/ofi/Message.h +++ b/fairmq/ofi/Message.h @@ -34,7 +34,9 @@ class Message final : public fair::mq::Message { public: Message(boost::container::pmr::memory_resource* pmr); + Message(boost::container::pmr::memory_resource* pmr, Alignment alignment); Message(boost::container::pmr::memory_resource* pmr, const size_t size); + Message(boost::container::pmr::memory_resource* pmr, const size_t size, Alignment alignment); Message(boost::container::pmr::memory_resource* pmr, void* data, const size_t size, diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx index 2e71c55d..4e400e6b 100644 --- a/fairmq/ofi/TransportFactory.cxx +++ b/fairmq/ofi/TransportFactory.cxx @@ -41,11 +41,23 @@ auto TransportFactory::CreateMessage() -> MessagePtr return MessagePtr{new Message(&fMemoryResource)}; } +auto TransportFactory::CreateMessage(Alignment /* alignment */) -> MessagePtr +{ + // TODO Do not ignore alignment + return MessagePtr{new Message(&fMemoryResource)}; +} + auto TransportFactory::CreateMessage(const size_t size) -> MessagePtr { return MessagePtr{new Message(&fMemoryResource, size)}; } +auto TransportFactory::CreateMessage(const size_t size, Alignment /* alignment */) -> MessagePtr +{ + // TODO Do not ignore alignment + return MessagePtr{new Message(&fMemoryResource, size)}; +} + auto TransportFactory::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, diff --git a/fairmq/ofi/TransportFactory.h b/fairmq/ofi/TransportFactory.h index 1fdc9bc2..b0d55930 100644 --- a/fairmq/ofi/TransportFactory.h +++ b/fairmq/ofi/TransportFactory.h @@ -36,7 +36,9 @@ class TransportFactory final : public FairMQTransportFactory TransportFactory operator=(const TransportFactory&) = delete; auto CreateMessage() -> MessagePtr override; + auto CreateMessage(Alignment alignment) -> MessagePtr override; auto CreateMessage(const std::size_t size) -> MessagePtr override; + auto CreateMessage(const std::size_t size, Alignment alignment) -> MessagePtr override; auto CreateMessage(void* data, const std::size_t size, fairmq_free_fn* ffn, void* hint = nullptr) -> MessagePtr override; auto CreateMessage(UnmanagedRegionPtr& region, void* data, const std::size_t size, void* hint = nullptr) -> MessagePtr override; diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index 6fd7a4a0..95b1dd50 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -47,6 +47,17 @@ class Message final : public fair::mq::Message fManager.IncrementMsgCounter(); } + Message(Manager& manager, Alignment /* alignment */, FairMQTransportFactory* factory = nullptr) + : fair::mq::Message(factory) + , fManager(manager) + , fQueued(false) + , fMeta{0, 0, 0, -1} + , fRegionPtr(nullptr) + , fLocalPtr(nullptr) + { + fManager.IncrementMsgCounter(); + } + Message(Manager& manager, const size_t size, FairMQTransportFactory* factory = nullptr) : fair::mq::Message(factory) , fManager(manager) @@ -59,6 +70,18 @@ class Message final : public fair::mq::Message fManager.IncrementMsgCounter(); } + Message(Manager& manager, const size_t size, Alignment alignment, FairMQTransportFactory* factory = nullptr) + : fair::mq::Message(factory) + , fManager(manager) + , fQueued(false) + , fMeta{0, 0, 0, -1} + , fRegionPtr(nullptr) + , fLocalPtr(nullptr) + { + InitializeChunk(size, static_cast(alignment)); + fManager.IncrementMsgCounter(); + } + Message(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr) : fair::mq::Message(factory) , fManager(manager) @@ -219,19 +242,24 @@ class Message final : public fair::mq::Message mutable Region* fRegionPtr; mutable char* fLocalPtr; - bool InitializeChunk(const size_t size) + bool InitializeChunk(const size_t size, size_t alignment = 0) { tools::RateLimiter rateLimiter(20); while (fMeta.fHandle < 0) { try { - boost::interprocess::managed_shared_memory::size_type actualSize = size; - char* hint = 0; // unused for boost::interprocess::allocate_new - fLocalPtr = fManager.Segment().allocation_command(boost::interprocess::allocate_new, size, actualSize, hint); + // boost::interprocess::managed_shared_memory::size_type actualSize = size; + // char* hint = 0; // unused for boost::interprocess::allocate_new + // fLocalPtr = fManager.Segment().allocation_command(boost::interprocess::allocate_new, size, actualSize, hint); + if (alignment == 0) { + fLocalPtr = reinterpret_cast(fManager.Segment().allocate(size)); + } else { + fLocalPtr = reinterpret_cast(fManager.Segment().allocate_aligned(size, alignment)); + } } catch (boost::interprocess::bad_alloc& ba) { // LOG(warn) << "Shared memory full..."; if (fManager.ThrowingOnBadAlloc()) { - throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size)); + throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default")); } rateLimiter.maybe_sleep(); if (fManager.Interrupted()) { diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index 42d1bde6..4066a892 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -101,11 +101,21 @@ class TransportFactory final : public fair::mq::TransportFactory return tools::make_unique(*fManager, this); } + MessagePtr CreateMessage(Alignment alignment) override + { + return tools::make_unique(*fManager, alignment, this); + } + MessagePtr CreateMessage(const size_t size) override { return tools::make_unique(*fManager, size, this); } + MessagePtr CreateMessage(const size_t size, Alignment alignment) override + { + return tools::make_unique(*fManager, size, alignment, this); + } + MessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override { return tools::make_unique(*fManager, data, size, ffn, hint, this); diff --git a/fairmq/zeromq/Message.h b/fairmq/zeromq/Message.h index a8cc1cd6..9e16e85a 100644 --- a/fairmq/zeromq/Message.h +++ b/fairmq/zeromq/Message.h @@ -47,6 +47,17 @@ class Message final : public fair::mq::Message LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); } } + Message(Alignment /* alignment */, FairMQTransportFactory* factory = nullptr) + : fair::mq::Message(factory) + , fUsedSizeModified(false) + , fUsedSize() + , fMsg(tools::make_unique()) + , fViewMsg(nullptr) + { + if (zmq_msg_init(fMsg.get()) != 0) { + LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); + } + } Message(const size_t size, FairMQTransportFactory* factory = nullptr) : fair::mq::Message(factory) @@ -60,6 +71,18 @@ class Message final : public fair::mq::Message } } + Message(const size_t size, Alignment /* alignment */, FairMQTransportFactory* factory = nullptr) + : fair::mq::Message(factory) + , fUsedSizeModified(false) + , fUsedSize(size) + , fMsg(tools::make_unique()) + , fViewMsg(nullptr) + { + if (zmq_msg_init_size(fMsg.get(), size) != 0) { + LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno); + } + } + Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr) : fair::mq::Message(factory) , fUsedSizeModified(false) diff --git a/fairmq/zeromq/TransportFactory.h b/fairmq/zeromq/TransportFactory.h index f4fc1f34..64716a88 100644 --- a/fairmq/zeromq/TransportFactory.h +++ b/fairmq/zeromq/TransportFactory.h @@ -55,12 +55,22 @@ class TransportFactory final : public FairMQTransportFactory { return tools::make_unique(this); } + + MessagePtr CreateMessage(Alignment alignment) override + { + return tools::make_unique(alignment, this); + } MessagePtr CreateMessage(const size_t size) override { return tools::make_unique(size, this); } + MessagePtr CreateMessage(const size_t size, Alignment alignment) override + { + return tools::make_unique(size, alignment, this); + } + MessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override { return tools::make_unique(data, size, ffn, hint, this); diff --git a/test/message/_message.cxx b/test/message/_message.cxx index 0c2f8d80..c9d4b01b 100644 --- a/test/message/_message.cxx +++ b/test/message/_message.cxx @@ -15,6 +15,7 @@ #include #include +#include namespace { @@ -77,6 +78,19 @@ void RunMsgRebuild(const string& transport) EXPECT_EQ(string(static_cast(msg->GetData()), msg->GetSize()), string("asdf")); } +void Alignment(const string& transport) +{ + size_t session{fair::mq::tools::UuidHash()}; + + fair::mq::ProgOptions config; + config.SetProperty("session", to_string(session)); + + auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); + + FairMQMessagePtr msg(factory->CreateMessage(100, fair::mq::Alignment{64})); + ASSERT_EQ(reinterpret_cast(msg->GetData()) % 64, 0); +} + TEST(Resize, zeromq) { RunPushPullWithMsgResize("zeromq", "ipc://test_message_resize"); @@ -97,4 +111,9 @@ TEST(Rebuild, shmem) RunMsgRebuild("shmem"); } +TEST(Alignment, shmem) // TODO: add test for ZeroMQ once it is implemented +{ + Alignment("shmem"); +} + } // namespace