feat: add options to control allocation attempts

This commit is contained in:
Alexey Rybalchenko 2021-09-24 09:46:46 +02:00
parent 55a2cfcc37
commit 1449166d44
2 changed files with 23 additions and 8 deletions

View File

@ -71,7 +71,9 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("shm-mlock-segment-on-creation", po::value<bool >()->default_value(false), "Shared memory: mlock the shared memory segment only once when created.") ("shm-mlock-segment-on-creation", po::value<bool >()->default_value(false), "Shared memory: mlock the shared memory segment only once when created.")
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization (opened or created).") ("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization (opened or created).")
("shm-zero-segment-on-creation", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory only once when created.") ("shm-zero-segment-on-creation", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory only once when created.")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).") ("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Shared memory: throw fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("bad-alloc-max-attempts", po::value<int >(), "Maximum number of allocation attempts before throwing fair::mq::MessageBadAlloc. -1 is infinite. There is always at least one attempt, so 0 has safe effect as 1.")
("bad-alloc-attempt-interval", po::value<int >()->default_value(50), "Interval between attempts if cannot allocate a message (in ms).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.") ("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("shm-no-cleanup", po::value<bool >()->default_value(false), "Shared memory: do not cleanup the memory when last device leaves.") ("shm-no-cleanup", po::value<bool >()->default_value(false), "Shared memory: do not cleanup the memory when last device leaves.")
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.") ("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")

View File

@ -150,13 +150,26 @@ class Manager
, fBeatTheHeart(true) , fBeatTheHeart(true)
, fRegionEventsSubscriptionActive(false) , fRegionEventsSubscriptionActive(false)
, fInterrupted(false) , fInterrupted(false)
, fThrowOnBadAlloc(config ? config->GetProperty<bool>("shm-throw-bad-alloc", true) : true) , fBadAllocMaxAttempts(1)
, fBadAllocAttemptIntervalInMs(config ? config->GetProperty<int>("bad-alloc-attempt-interval", 50) : 50)
, fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false) , fNoCleanup(config ? config->GetProperty<bool>("shm-no-cleanup", false) : false)
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'."; LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";
if (config) {
// if 'shm-throw-bad-alloc' is explicitly set to false (true is default), ignore other settings
if (config->Count("shm-throw-bad-alloc") && config->GetProperty<bool>("shm-throw-bad-alloc") == false) {
fBadAllocMaxAttempts = -1;
} else if (config->Count("bad-alloc-max-attempts")) {
// if 'bad-alloc-max-attempts' is provided, use it
// this can override the default 'shm-throw-bad-alloc'==true if the value of 'bad-alloc-max-attempts' is -1
fBadAllocMaxAttempts = config->GetProperty<int>("bad-alloc-max-attempts");
}
// otherwise leave fBadAllocMaxAttempts at 1 (the original default, set in the initializer list)
}
bool mlockSegment = false; bool mlockSegment = false;
bool mlockSegmentOnCreation = false; bool mlockSegmentOnCreation = false;
bool zeroSegment = false; bool zeroSegment = false;
@ -653,8 +666,6 @@ class Manager
} }
} }
bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; }
void GetSegment(uint16_t id) void GetSegment(uint16_t id)
{ {
auto it = fSegments.find(id); auto it = fSegments.find(id);
@ -693,9 +704,10 @@ class Manager
alignment = std::max(alignment, alignof(std::max_align_t)); alignment = std::max(alignment, alignof(std::max_align_t));
char* ptr = nullptr; char* ptr = nullptr;
int numAttempts = 0;
size_t fullSize = ShmHeader::FullSize(size, alignment); size_t fullSize = ShmHeader::FullSize(size, alignment);
while (ptr == nullptr) { while (!ptr) {
try { try {
size_t segmentSize = boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)); size_t segmentSize = boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId));
if (fullSize > segmentSize) { if (fullSize > segmentSize) {
@ -706,10 +718,10 @@ class Manager
ShmHeader::Construct(ptr, alignment); ShmHeader::Construct(ptr, alignment);
} catch (boost::interprocess::bad_alloc& ba) { } catch (boost::interprocess::bad_alloc& ba) {
// LOG(warn) << "Shared memory full..."; // LOG(warn) << "Shared memory full...";
if (ThrowingOnBadAlloc()) { if (fBadAllocMaxAttempts >= 0 && ++numAttempts >= fBadAllocMaxAttempts) {
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)))); throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId))));
} }
std::this_thread::sleep_for(std::chrono::milliseconds(50)); std::this_thread::sleep_for(std::chrono::milliseconds(fBadAllocAttemptIntervalInMs));
if (Interrupted()) { if (Interrupted()) {
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)))); throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId))));
} else { } else {
@ -832,7 +844,8 @@ class Manager
bool fRegionEventsSubscriptionActive; bool fRegionEventsSubscriptionActive;
std::atomic<bool> fInterrupted; std::atomic<bool> fInterrupted;
bool fThrowOnBadAlloc; int fBadAllocMaxAttempts;
int fBadAllocAttemptIntervalInMs;
bool fNoCleanup; bool fNoCleanup;
}; };