From f278e7e31208d219be519791d8de120956b289d5 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 7 Jun 2023 22:24:42 +0200 Subject: [PATCH] feat: Add new tunable `--shm-metadata-msg-size` The shm metadata msg will be right-padded to the given size. This tunable may be used to saturate the kernel msg buffers more quickly with the effect that the ZeroMQ message queue size - on which the FairMQ shmem transport relies upon - behaves more accurately for very small queue sizes. This introduces a change for the meta msg format in the multipart case: old: | MetaHeader 1 | ... | MetaHeader n | new: | n | MetaHeader 1 | ... | MetaHeader n | padded to fMetadataMsgSize | where `n` is a `size_t` and contains the number of following meta headers. Previously, this number was infered from the msg buffer size itself which is no longer possible due to the potential padding. Implements #432 --- fairmq/devices/startMQBenchmark.sh.in | 1 + fairmq/plugins/config/Config.cxx | 2 + fairmq/shmem/Manager.h | 7 ++- fairmq/shmem/Socket.h | 62 ++++++++++++++------------- 4 files changed, 42 insertions(+), 30 deletions(-) diff --git a/fairmq/devices/startMQBenchmark.sh.in b/fairmq/devices/startMQBenchmark.sh.in index 9813b333..fda789b2 100755 --- a/fairmq/devices/startMQBenchmark.sh.in +++ b/fairmq/devices/startMQBenchmark.sh.in @@ -72,6 +72,7 @@ SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --multipart $multipart" SAMPLER+=" --num-parts $numParts" SAMPLER+=" --shm-throw-bad-alloc false" +# SAMPLER+=" --shm-metadata-msg-size 1024" # SAMPLER+=" --msg-rate 1000" SAMPLER+=" --max-iterations $maxIterations" SAMPLER+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:5555" diff --git a/fairmq/plugins/config/Config.cxx b/fairmq/plugins/config/Config.cxx index 5d470825..0195839c 100644 --- a/fairmq/plugins/config/Config.cxx +++ b/fairmq/plugins/config/Config.cxx @@ -11,6 +11,7 @@ #include #include +#include // for std::size_t #include using namespace std; @@ -72,6 +73,7 @@ Plugin::ProgOptions ConfigPluginProgramOptions() ("shm-zero-segment", po::value()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization (opened or created).") ("shm-zero-segment-on-creation", po::value()->default_value(false), "Shared memory: zero the shared memory segment memory only once when created.") ("shm-throw-bad-alloc", po::value()->default_value(true), "Shared memory: throw fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).") + ("shm-metadata-msg-size", po::value()->default_value(0), "Shared memory: size of the zmq metadata message (values smaller than minimum are clamped to the minimum).") ("bad-alloc-max-attempts", po::value(), "Maximum number of allocation attempts before throwing fair::mq::MessageBadAlloc. -1 is infinite. There is always at least one attempt, so 0 has safe effect as 1.") ("bad-alloc-attempt-interval", po::value()->default_value(50), "Interval between attempts if cannot allocate a message (in ms).") ("shm-monitor", po::value()->default_value(false), "Shared memory: run monitor daemon.") diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 90265a77..8e00e427 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -29,7 +29,7 @@ #include // max #include #include -#include // max_align_t +#include // max_align_t, std::size_t #include // getenv #include // memcpy #include // make_unique @@ -151,6 +151,7 @@ class Manager , fBadAllocMaxAttempts(1) , fBadAllocAttemptIntervalInMs(config ? config->GetProperty("bad-alloc-attempt-interval", 50) : 50) , fNoCleanup(config ? config->GetProperty("shm-no-cleanup", false) : false) + , fMetadataMsgSize(config ? config->GetProperty("shm-metadata-msg-size", 0) : 0) { using namespace boost::interprocess; @@ -828,6 +829,8 @@ class Manager } } + auto GetMetadataMsgSize() const noexcept { return fMetadataMsgSize; } + ~Manager() { fRegionsGen += 1; // signal TL cache invalidation @@ -884,6 +887,8 @@ class Manager int fBadAllocMaxAttempts; int fBadAllocAttemptIntervalInMs; bool fNoCleanup; + + std::size_t fMetadataMsgSize; }; } // namespace fair::mq::shmem diff --git a/fairmq/shmem/Socket.h b/fairmq/shmem/Socket.h index f6d282bb..83b99af3 100644 --- a/fairmq/shmem/Socket.h +++ b/fairmq/shmem/Socket.h @@ -22,8 +22,12 @@ #include +#include // for std::max #include -#include // make_unique +#include // for std::size_t +#include // for std::memcpy +#include // for std::terminate +#include // for std::make_unique namespace fair::mq { class TransportFactory; @@ -47,6 +51,7 @@ class Socket final : public fair::mq::Socket , fMessagesRx(0) , fTimeout(100) , fConnectedPeersCount(0) + , fMetadataMsgSize(manager.GetMetadataMsgSize()) { assert(context); @@ -124,8 +129,8 @@ class Socket final : public fair::mq::Socket } int elapsed = 0; - // make meta msg - zmq::ZMsg zmqMsg(sizeof(MetaHeader)); + // meta msg format: | MetaHeader | padded to fMetadataMsgSize | + zmq::ZMsg zmqMsg(std::max(fMetadataMsgSize, sizeof(MetaHeader))); std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader)); while (true) { @@ -165,11 +170,11 @@ class Socket final : public fair::mq::Socket int nbytes = zmq_recv(fSocket, &(shmMsg->fMeta), sizeof(MetaHeader), flags); if (nbytes > 0) { // check for number of received messages. must be 1 - if (nbytes != sizeof(MetaHeader)) { + if (static_cast(nbytes) < sizeof(MetaHeader)) { throw SocketError( tools::ToString("Received message is not a valid FairMQ shared memory message. ", "Possibly due to a misconfigured transport on the sender side. ", - "Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes)); + "Expected minimum size of ", sizeof(MetaHeader), " bytes, received ", nbytes)); } size_t size = shmMsg->GetSize(); @@ -198,13 +203,14 @@ class Socket final : public fair::mq::Socket } int elapsed = 0; - // put it into zmq message - const unsigned int vecSize = msgVec.size(); - zmq::ZMsg zmqMsg(vecSize * sizeof(MetaHeader)); - - // prepare the message with shm metas - MetaHeader* metas = static_cast(zmqMsg.Data()); + // meta msg format: | n | MetaHeader 1 | ... | MetaHeader n | padded to fMetadataMsgSize | + auto const n = msgVec.size(); + zmq::ZMsg zmqMsg(std::max(fMetadataMsgSize, sizeof(std::size_t) + n * sizeof(MetaHeader))); + auto meta_n = static_cast(zmqMsg.Data()); + *meta_n = n; + ++meta_n; + auto metas = static_cast(static_cast(meta_n)); for (auto& msg : msgVec) { auto msgPtr = msg.get(); if (!msgPtr) { @@ -219,7 +225,7 @@ class Socket final : public fair::mq::Socket int64_t totalSize = 0; int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags); if (nbytes > 0) { - assert(static_cast(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing + assert(static_cast(nbytes) >= sizeof(std::size_t) + (n * sizeof(MetaHeader))); for (auto& msg : msgVec) { Message* shmMsg = static_cast(msg.get()); @@ -259,26 +265,23 @@ class Socket final : public fair::mq::Socket zmq::ZMsg zmqMsg; while (true) { - int64_t totalSize = 0; + std::size_t totalSize = 0; int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags); if (nbytes > 0) { - MetaHeader* hdrVec = static_cast(zmqMsg.Data()); - const auto hdrVecSize = zmqMsg.Size(); + [[maybe_unused]] auto const size = zmqMsg.Size(); + assert(size > sizeof(std::size_t)); + auto meta_n = static_cast(zmqMsg.Data()); + auto const n = *meta_n; + assert(size >= sizeof(std::size_t) + n * sizeof(MetaHeader)); + ++meta_n; + auto metas = static_cast(static_cast(meta_n)); + msgVec.reserve(msgVec.size() + n); + auto const transport = GetTransport(); - assert(hdrVecSize > 0); - if (hdrVecSize % sizeof(MetaHeader) != 0) { - throw SocketError( - tools::ToString("Received message is not a valid FairMQ shared memory message. ", - "Possibly due to a misconfigured transport on the sender side. ", - "Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes)); - } - - const auto numMessages = hdrVecSize / sizeof(MetaHeader); - msgVec.reserve(numMessages); - - for (size_t m = 0; m < numMessages; m++) { - // create new message (part) - msgVec.emplace_back(std::make_unique(fManager, hdrVec[m], GetTransport())); + for (std::size_t i = 0; i < n; ++i) { + msgVec.push_back(std::make_unique(fManager, *metas, transport)); + ++metas; + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-static-cast-downcast) Message* shmMsg = static_cast(msgVec.back().get()); totalSize += shmMsg->GetSize(); } @@ -456,6 +459,7 @@ class Socket final : public fair::mq::Socket int fTimeout; mutable unsigned long fConnectedPeersCount; + std::size_t fMetadataMsgSize; }; } // namespace fair::mq::shmem