diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 7a3cb5fc..46e17f40 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -34,6 +34,7 @@ #include #include #include +#include #include // getenv #include @@ -173,9 +174,6 @@ class Manager Manager(const Manager&) = delete; Manager operator=(const Manager&) = delete; - RBTreeBestFitSegment& Segment() { return fSegments.at(0); } - boost::interprocess::managed_shared_memory& ManagementSegment() { return fManagementSegment; } - static void StartMonitor(const std::string& id) { using namespace boost::interprocess; @@ -464,6 +462,68 @@ class Manager bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; } + boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr) const { return fSegments.at(0).get_handle_from_address(ptr); } + void* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle) const { return fSegments.at(0).get_address_from_handle(handle); } + + char* Allocate(const size_t size, size_t alignment = 0) + { + char* ptr = nullptr; + // tools::RateLimiter rateLimiter(20); + + while (ptr == nullptr) { + try { + // boost::interprocess::managed_shared_memory::size_type actualSize = size; + // char* hint = 0; // unused for boost::interprocess::allocate_new + // ptr = fSegments.at(0).allocation_command(boost::interprocess::allocate_new, size, actualSize, hint); + size_t segmentSize = fSegments.at(0).get_size(); + if (size > segmentSize) { + throw MessageBadAlloc(tools::ToString("Requested message size (", size, ") exceeds segment size (", segmentSize, ")")); + } + if (alignment == 0) { + ptr = reinterpret_cast(fSegments.at(0).allocate(size)); + } else { + ptr = reinterpret_cast(fSegments.at(0).allocate_aligned(size, alignment)); + } + } catch (boost::interprocess::bad_alloc& ba) { + // LOG(warn) << "Shared memory full..."; + if (ThrowingOnBadAlloc()) { + throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", fSegments.at(0).get_free_memory())); + } + // rateLimiter.maybe_sleep(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + if (Interrupted()) { + return ptr; + } else { + continue; + } + } +#ifdef FAIRMQ_DEBUG_MODE + boost::interprocess::scoped_lock lock(fShmMtx); + IncrementShmMsgCounter(); + AddMsgDebug(getpid(), size, static_cast(GetHandleFromAddress(ptr)), std::chrono::system_clock::now().time_since_epoch().count()); +#endif + } + + return ptr; + } + + void Deallocate(boost::interprocess::managed_shared_memory::handle_t handle) + { + fSegments.at(0).deallocate(GetAddressFromHandle(handle)); +#ifdef FAIRMQ_DEBUG_MODE + boost::interprocess::scoped_lock lock(fShmMtx); + DecrementShmMsgCounter(); + RemoveMsgDebug(handle); +#endif + } + + char* ShrinkInPlace(size_t oldSize, size_t newSize, char* localPtr) + { + using namespace boost::interprocess; + managed_shared_memory::size_type shrunkSize = newSize; + return fSegments.at(0).allocation_command(shrink_in_place, oldSize + 128, shrunkSize, localPtr); + } + ~Manager() { using namespace boost::interprocess; diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index 4998f1ff..1cf1e12c 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -169,7 +169,7 @@ class Message final : public fair::mq::Message if (!fLocalPtr) { if (fMeta.fRegionId == 0) { if (fMeta.fSize > 0) { - fLocalPtr = reinterpret_cast(fManager.Segment().get_address_from_handle(fMeta.fHandle)); + fLocalPtr = reinterpret_cast(fManager.GetAddressFromHandle(fMeta.fHandle)); } else { fLocalPtr = nullptr; } @@ -189,15 +189,14 @@ class Message final : public fair::mq::Message size_t GetSize() const override { return fMeta.fSize; } - bool SetUsedSize(const size_t size) override + bool SetUsedSize(const size_t newSize) override { - if (size == fMeta.fSize) { + if (newSize == fMeta.fSize) { return true; - } else if (size <= fMeta.fSize) { + } else if (newSize <= fMeta.fSize) { try { - boost::interprocess::managed_shared_memory::size_type shrunkSize = size; - fLocalPtr = fManager.Segment().allocation_command(boost::interprocess::shrink_in_place, fMeta.fSize + 128, shrunkSize, fLocalPtr); - fMeta.fSize = size; + fLocalPtr = fManager.ShrinkInPlace(fMeta.fSize, newSize, fLocalPtr); + fMeta.fSize = newSize; return true; } catch (boost::interprocess::interprocess_exception& e) { LOG(info) << "could not set used size: " << e.what(); @@ -245,59 +244,21 @@ class Message final : public fair::mq::Message mutable Region* fRegionPtr; mutable char* fLocalPtr; - bool InitializeChunk(const size_t size, size_t alignment = 0) + char* InitializeChunk(const size_t size, size_t alignment = 0) { - // tools::RateLimiter rateLimiter(20); - - while (fMeta.fHandle < 0) { - try { - // boost::interprocess::managed_shared_memory::size_type actualSize = size; - // char* hint = 0; // unused for boost::interprocess::allocate_new - // fLocalPtr = fManager.Segment().allocation_command(boost::interprocess::allocate_new, size, actualSize, hint); - size_t segmentSize = fManager.Segment().get_size(); - if (size > segmentSize) { - throw MessageBadAlloc(tools::ToString("Requested message size (", size, ") exceeds segment size (", segmentSize, ")")); - } - if (alignment == 0) { - fLocalPtr = reinterpret_cast(fManager.Segment().allocate(size)); - } else { - fLocalPtr = reinterpret_cast(fManager.Segment().allocate_aligned(size, alignment)); - } - } catch (boost::interprocess::bad_alloc& ba) { - // LOG(warn) << "Shared memory full..."; - if (fManager.ThrowingOnBadAlloc()) { - throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", fManager.Segment().get_free_memory())); - } - // rateLimiter.maybe_sleep(); - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - if (fManager.Interrupted()) { - return false; - } else { - continue; - } - } - fMeta.fHandle = fManager.Segment().get_handle_from_address(fLocalPtr); -#ifdef FAIRMQ_DEBUG_MODE - boost::interprocess::scoped_lock lock(fManager.GetMtx()); - fManager.IncrementShmMsgCounter(); - fManager.AddMsgDebug(getpid(), size, static_cast(fMeta.fHandle), std::chrono::system_clock::now().time_since_epoch().count()); -#endif + fLocalPtr = fManager.Allocate(size, alignment); + if (fLocalPtr) { + fMeta.fHandle = fManager.GetHandleFromAddress(fLocalPtr); + fMeta.fSize = size; } - - fMeta.fSize = size; - return true; + return fLocalPtr; } void CloseMessage() { if (fMeta.fHandle >= 0 && !fQueued) { if (fMeta.fRegionId == 0) { - fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fMeta.fHandle)); -#ifdef FAIRMQ_DEBUG_MODE - boost::interprocess::scoped_lock lock(fManager.GetMtx()); - fManager.DecrementShmMsgCounter(); - fManager.RemoveMsgDebug(fMeta.fHandle); -#endif + fManager.Deallocate(fMeta.fHandle); fMeta.fHandle = -1; } else { if (!fRegionPtr) { @@ -311,6 +272,8 @@ class Message final : public fair::mq::Message } } } + fLocalPtr = nullptr; + fMeta.fSize = 0; fManager.DecrementMsgCounter(); }