diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index f4056534..cb4b043c 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -21,7 +21,7 @@ class FairMQMessage { public: FairMQMessage() = default; - FairMQMessage(FairMQTransportFactory* factory):fTransport{factory} {} +/* Stores the given transport factory pointer in fTransport. */ FairMQMessage(FairMQTransportFactory* factory) : fTransport(factory) {} virtual void Rebuild() = 0; virtual void Rebuild(const size_t size) = 0; virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0; @@ -33,7 +33,7 @@ class FairMQMessage virtual fair::mq::Transport GetType() const = 0; FairMQTransportFactory* GetTransport() { return fTransport; } - //void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; } +/* Replaces the stored transport factory pointer. */ void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; } virtual void Copy(const FairMQMessage& msg) = 0; @@ -53,6 +53,7 @@ namespace mq using Message = FairMQMessage; using MessagePtr = FairMQMessagePtr; struct MessageError : std::runtime_error { using std::runtime_error::runtime_error; }; +/* Thrown by the shmem Message::InitializeChunk when boost::interprocess::bad_alloc is caught and Manager::ThrowingOnBadAlloc() returns true. */ struct MessageBadAlloc : std::runtime_error { using std::runtime_error::runtime_error; }; } /* namespace mq */ } /* namespace fair */ diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index f6e22ec4..0905cd70 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -20,7 +20,7 @@ class FairMQSocket { public: FairMQSocket() {} - FairMQSocket(FairMQTransportFactory* fac): fTransport(fac) {} +/* Stores the given transport factory pointer in fTransport. */ FairMQSocket(FairMQTransportFactory* fac) : fTransport(fac) {} virtual std::string GetId() const = 0; @@ -54,7 +54,7 @@ class FairMQSocket virtual unsigned long GetMessagesRx() const = 0; FairMQTransportFactory* GetTransport() { return fTransport; } - void SetTransport(FairMQTransportFactory* transport) { fTransport=transport; } +/* Replaces the stored transport factory pointer. */ void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; } virtual ~FairMQSocket() {}; diff --git a/fairmq/plugins/config/Config.cxx 
b/fairmq/plugins/config/Config.cxx index f4f577f7..bad43b18 100644 --- a/fairmq/plugins/config/Config.cxx +++ b/fairmq/plugins/config/Config.cxx @@ -61,21 +61,22 @@ Plugin::ProgOptions ConfigPluginProgramOptions() namespace po = boost::program_options; auto pluginOptions = po::options_description{"FairMQ device options"}; pluginOptions.add_options() - ("id", po::value()->default_value(""), "Device ID.") - ("io-threads", po::value()->default_value(1), "Number of I/O threads.") - ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'shmem').") - ("network-interface", po::value()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") - ("init-timeout", po::value()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") - ("max-run-time", po::value()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).") - ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") - ("shm-segment-size", po::value()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).") - ("shm-monitor", po::value()->default_value(true), "Shared memory: run monitor daemon.") - ("ofi-size-hint", po::value()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.") - ("rate", po::value()->default_value(0.), "Rate for conditional run loop (Hz).") - ("session", po::value()->default_value("default"), "Session name.") - ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from JSON file.") - ("mq-config", po::value(), "JSON input as file.") - ("channel-config", po::value>()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list"); + ("id", 
po::value()->default_value(""), "Device ID.") + ("io-threads", po::value()->default_value(1), "Number of I/O threads.") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'shmem').") + ("network-interface", po::value()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") + ("init-timeout", po::value()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") + ("max-run-time", po::value()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).") + ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") + ("shm-segment-size", po::value()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).") +/* New option: read by the shmem TransportFactory via GetProperty and forwarded to the shmem Manager constructor as throwOnBadAlloc. */ ("shm-throw-bad-alloc", po::value()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).") + ("shm-monitor", po::value()->default_value(true), "Shared memory: run monitor daemon.") + ("ofi-size-hint", po::value()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.") + ("rate", po::value()->default_value(0.), "Rate for conditional run loop (Hz).") + ("session", po::value()->default_value("default"), "Session name.") + ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from JSON file.") + ("mq-config", po::value(), "JSON input as file.") + ("channel-config", po::value>()->multitoken()->composing(), "Configuration of single or multiple channel(s) by comma separated key=value list"); return pluginOptions; } diff --git a/fairmq/run/startMQBenchmark.sh.in b/fairmq/run/startMQBenchmark.sh.in index 62105aed..21eeac8b 100755 --- a/fairmq/run/startMQBenchmark.sh.in +++ b/fairmq/run/startMQBenchmark.sh.in @@ -70,6 +70,7 @@ 
SAMPLER+=" --severity debug" SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --multipart $multipart" SAMPLER+=" --num-parts $numParts" +SAMPLER+=" --shm-throw-bad-alloc false" # SAMPLER+=" --msg-rate 1000" SAMPLER+=" --max-iterations $maxIterations" SAMPLER+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:5555" diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 3ad3c64c..35856302 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -52,7 +52,7 @@ struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtim class Manager { public: - Manager(std::string id, std::string deviceId, size_t size) +/* throwOnBadAlloc is stored in fThrowOnBadAlloc and returned unchanged by ThrowingOnBadAlloc(); the shmem Message::InitializeChunk consults it when an allocation fails. */ Manager(std::string id, std::string deviceId, size_t size, bool throwOnBadAlloc) : fShmId(std::move(id)) , fDeviceId(std::move(deviceId)) , fSegmentName("fmq_" + fShmId + "_main") @@ -69,6 +69,7 @@ class Manager , fMsgCounter(0) , fHeartbeatThread() , fSendHeartbeats(true) + , fThrowOnBadAlloc(throwOnBadAlloc) { using namespace boost::interprocess; LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << size << " bytes. 
Available are " << fSegment.get_free_memory() << " bytes."; @@ -415,6 +416,8 @@ class Manager } } + bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; } + private: std::string fShmId; std::string fDeviceId; @@ -440,6 +443,7 @@ class Manager std::thread fHeartbeatThread; std::atomic fSendHeartbeats; + bool fThrowOnBadAlloc; }; } // namespace shmem diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index 759537db..6fd7a4a0 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -12,7 +12,7 @@ #include "Manager.h" #include "Region.h" #include "UnmanagedRegion.h" - +#include #include #include #include @@ -37,7 +37,7 @@ class Message final : public fair::mq::Message public: Message(Manager& manager, FairMQTransportFactory* factory = nullptr) - : fair::mq::Message{factory} + : fair::mq::Message(factory) , fManager(manager) , fQueued(false) , fMeta{0, 0, 0, -1} @@ -48,7 +48,7 @@ class Message final : public fair::mq::Message } Message(Manager& manager, const size_t size, FairMQTransportFactory* factory = nullptr) - : fair::mq::Message{factory} + : fair::mq::Message(factory) , fManager(manager) , fQueued(false) , fMeta{0, 0, 0, -1} @@ -60,7 +60,7 @@ class Message final : public fair::mq::Message } Message(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr) - : fair::mq::Message{factory} + : fair::mq::Message(factory) , fManager(manager) , fQueued(false) , fMeta{0, 0, 0, -1} @@ -79,7 +79,7 @@ class Message final : public fair::mq::Message } Message(Manager& manager, UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr) - : fair::mq::Message{factory} + : fair::mq::Message(factory) , fManager(manager) , fQueued(false) , fMeta{size, static_cast(region.get())->fRegionId, reinterpret_cast(hint), -1} @@ -97,7 +97,7 @@ class Message final : public fair::mq::Message } Message(Manager& manager, MetaHeader& 
hdr, FairMQTransportFactory* factory = nullptr) - : fair::mq::Message{factory} +/* Forwards the factory pointer to the fair::mq::Message base-class constructor. */ : fair::mq::Message(factory) , fManager(manager) , fQueued(false) , fMeta{hdr} @@ -221,6 +221,8 @@ class Message final : public fair::mq::Message bool InitializeChunk(const size_t size) { +/* Paces the allocation retries below through maybe_sleep(); presumably the argument is a rate in Hz - TODO confirm against tools::RateLimiter. */ tools::RateLimiter rateLimiter(20); + while (fMeta.fHandle < 0) { try { boost::interprocess::managed_shared_memory::size_type actualSize = size; @@ -228,7 +230,10 @@ class Message final : public fair::mq::Message fLocalPtr = fManager.Segment().allocation_command(boost::interprocess::allocate_new, size, actualSize, hint); } catch (boost::interprocess::bad_alloc& ba) { // LOG(warn) << "Shared memory full..."; - std::this_thread::sleep_for(std::chrono::milliseconds(50)); +/* When the manager is configured to throw, surface the failed allocation to the caller instead of retrying. */ if (fManager.ThrowingOnBadAlloc()) { + throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size)); + } +/* Replaces the previous fixed 50 ms sleep between retries. */ rateLimiter.maybe_sleep(); if (fManager.Interrupted()) { return false; } else { diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index 39efff11..42d1bde6 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -59,11 +59,13 @@ class TransportFactory final : public fair::mq::TransportFactory std::string sessionName = "default"; size_t segmentSize = 2000000000; bool autolaunchMonitor = false; +/* Default value; reassigned from the shm-throw-bad-alloc property when config is non-null. */ bool throwOnBadAlloc = true; if (config) { numIoThreads = config->GetProperty("io-threads", numIoThreads); sessionName = config->GetProperty("session", sessionName); segmentSize = config->GetProperty("shm-segment-size", segmentSize); autolaunchMonitor = config->GetProperty("shm-monitor", autolaunchMonitor); + throwOnBadAlloc = config->GetProperty("shm-throw-bad-alloc", throwOnBadAlloc); } else { LOG(debug) << "ProgOptions not available! 
Using defaults."; } @@ -84,8 +86,7 @@ class TransportFactory final : public fair::mq::TransportFactory Manager::StartMonitor(fShmId); } - fManager = tools::make_unique(fShmId, fDeviceId, segmentSize); - + fManager = tools::make_unique(fShmId, fDeviceId, segmentSize, throwOnBadAlloc); } catch (boost::interprocess::interprocess_exception& e) { LOG(error) << "Could not initialize shared memory transport: " << e.what(); throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));