diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index 4026fbc9..bd9d5a31 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -21,7 +21,7 @@ #include #include #include -#include +#include #include @@ -292,73 +292,7 @@ std::string makeShmIdStr(const std::string& sessionId); std::string makeShmIdStr(uint64_t val); uint64_t makeShmIdUint64(const std::string& sessionId); - -struct SegmentSize : public boost::static_visitor -{ - template - size_t operator()(S& s) const { return s.get_size(); } -}; - -struct SegmentAddress : public boost::static_visitor -{ - template - void* operator()(S& s) const { return s.get_address(); } -}; - -struct SegmentMemoryZeroer : public boost::static_visitor<> -{ - template - void operator()(S& s) const { s.zero_free_memory(); } -}; - -struct SegmentFreeMemory : public boost::static_visitor -{ - template - size_t operator()(S& s) const { return s.get_free_memory(); } -}; - -struct SegmentHandleFromAddress : public boost::static_visitor -{ - SegmentHandleFromAddress(const void* _ptr) : ptr(_ptr) {} - - template - boost::interprocess::managed_shared_memory::handle_t operator()(S& s) const { return s.get_handle_from_address(ptr); } - - const void* ptr; -}; - -struct SegmentAddressFromHandle : public boost::static_visitor -{ - SegmentAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t _handle) : handle(_handle) {} - - template - char* operator()(S& s) const { return reinterpret_cast(s.get_address_from_handle(handle)); } - - const boost::interprocess::managed_shared_memory::handle_t handle; -}; - -struct SegmentAllocate : public boost::static_visitor -{ - SegmentAllocate(const size_t _size) : size(_size) {} - - template - char* operator()(S& s) const { return reinterpret_cast(s.allocate(size)); } - - const size_t size; -}; - -struct SegmentAllocateAligned : public boost::static_visitor -{ - SegmentAllocateAligned(const size_t _size, const size_t _alignment) : size(_size), alignment(_alignment) {} - - template - void* operator()(S& s) const { return s.allocate_aligned(size, alignment); } - - const size_t size; - const size_t alignment; -}; - -struct SegmentBufferShrink : public boost::static_visitor +struct SegmentBufferShrink { SegmentBufferShrink(const size_t _new_size, char* _local_ptr) : new_size(_new_size) @@ -376,15 +310,28 @@ struct SegmentBufferShrink : public boost::static_visitor mutable char* local_ptr; }; -struct SegmentDeallocate : public boost::static_visitor<> -{ - SegmentDeallocate(char* _ptr) : ptr(_ptr) {} +// struct SegmentWrapper +// { +// SegmentWrapper(boost::variant&& _segment) +// : segment(std::move(_segment)) +// , refCountPool(nullptr) +// {} - template - void operator()(S& s) const { return s.deallocate(ptr); } +// void InitRefCountPoolSSF() +// { +// refCountPool = std::make_unique>( +// RefCountPoolSSF(boost::get(segment).get_segment_manager())); +// } - char* ptr; -}; +// void InitRefCountPoolRBT() +// { +// refCountPool = std::make_unique>( +// RefCountPoolRBT(boost::get(segment).get_segment_manager())); +// } + +// boost::variant segment; +// std::unique_ptr> refCountPool; +// }; } // namespace fair::mq::shmem diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index ec325eb5..fae96108 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -24,7 +24,6 @@ #include #include #include -#include #include // max #include @@ -42,6 +41,7 @@ #include #include #include // pair +#include #include #include // getuid @@ -193,8 +193,8 @@ class Manager } } LOG(debug) << (createdSegment ? "Created" : "Opened") << " managed shared memory segment " << "fmq_" << fShmId << "_m_" << fSegmentId - << ". Size: " << boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)) << " bytes." - << " Available: " << boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)) << " bytes." + << ". Size: " << std::visit([](auto& s) { return s.get_size(); }, fSegments.at(fSegmentId)) << " bytes." + << " Available: " << std::visit([](auto& s) { return s.get_free_memory(); }, fSegments.at(fSegmentId)) << " bytes." << " Allocation algorithm: " << allocationAlgorithm; } catch (interprocess_exception& bie) { LOG(error) << "Failed to create/open shared memory segment '" << "fmq_" << fShmId << "_m_" << fSegmentId << "': " << bie.what(); @@ -232,14 +232,16 @@ class Manager void ZeroSegment(uint16_t id) { LOG(debug) << "Zeroing the managed segment free memory..."; - boost::apply_visitor(SegmentMemoryZeroer(), fSegments.at(id)); + std::visit([](auto& s) { return s.zero_free_memory(); }, fSegments.at(id)); LOG(debug) << "Successfully zeroed the managed segment free memory."; } void MlockSegment(uint16_t id) { LOG(debug) << "Locking the managed segment memory pages..."; - if (mlock(boost::apply_visitor(SegmentAddress(), fSegments.at(id)), boost::apply_visitor(SegmentSize(), fSegments.at(id))) == -1) { + if (mlock( + std::visit([](auto& s) { return s.get_address(); }, fSegments.at(id)), + std::visit([](auto& s) { return s.get_size(); }, fSegments.at(id))) == -1) { LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno); throw TransportError(tools::ToString("Could not lock the managed segment memory: ", strerror(errno))); } @@ -456,8 +458,8 @@ class Manager info.managed = true; info.id = segmentId; info.event = RegionEvent::created; - info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(segmentId)); - info.size = boost::apply_visitor(SegmentSize(), fSegments.at(segmentId)); + info.ptr = std::visit([](auto& s) { return s.get_address(); }, fSegments.at(segmentId)); + info.size = std::visit([](auto& s) { return s.get_size(); }, fSegments.at(segmentId)); result.push_back(info); } catch (const std::out_of_range& oor) { LOG(error) << "could not find segment with id " << segmentId; @@ -651,11 +653,11 @@ class Manager boost::interprocess::managed_shared_memory::handle_t GetHandleFromAddress(const void* ptr, uint16_t segmentId) const { - return boost::apply_visitor(SegmentHandleFromAddress(ptr), fSegments.at(segmentId)); + return std::visit([ptr](auto& s) { return s.get_handle_from_address(ptr); }, fSegments.at(segmentId)); } char* GetAddressFromHandle(const boost::interprocess::managed_shared_memory::handle_t handle, uint16_t segmentId) const { - return boost::apply_visitor(SegmentAddressFromHandle(handle), fSegments.at(segmentId)); + return std::visit([handle](auto& s) { return reinterpret_cast(s.get_address_from_handle(handle)); }, fSegments.at(segmentId)); } char* Allocate(size_t size, size_t alignment = 0) @@ -668,24 +670,32 @@ class Manager while (!ptr) { try { - size_t segmentSize = boost::apply_visitor(SegmentSize(), fSegments.at(fSegmentId)); + size_t segmentSize = std::visit([](auto& s) { return s.get_size(); }, fSegments.at(fSegmentId)); if (fullSize > segmentSize) { throw MessageBadAlloc(tools::ToString("Requested message size (", fullSize, ") exceeds segment size (", segmentSize, ")")); } - ptr = boost::apply_visitor(SegmentAllocate{fullSize}, fSegments.at(fSegmentId)); + ptr = std::visit([fullSize](auto& s) { return reinterpret_cast(s.allocate(fullSize)); }, fSegments.at(fSegmentId)); ShmHeader::Construct(ptr, alignment); } catch (boost::interprocess::bad_alloc& ba) { // LOG(warn) << "Shared memory full..."; if (fBadAllocMaxAttempts >= 0 && ++numAttempts >= fBadAllocMaxAttempts) { - throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)))); + throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, + ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", + ", free memory: ", std::visit([](auto& s) { return s.get_free_memory(); }, fSegments.at(fSegmentId)))); } if (numAttempts == 1 && fBadAllocMaxAttempts > 1) { - LOG(warn) << tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)), ". Will try ", (fBadAllocMaxAttempts > 1 ? (std::to_string(fBadAllocMaxAttempts - 1)) + " more times" : " until success"), ", in ", fBadAllocAttemptIntervalInMs, "ms intervals"); + LOG(warn) << tools::ToString("shmem: could not create a message of size ", size, + ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", + ", free memory: ", std::visit([](auto& s) { return s.get_free_memory(); }, fSegments.at(fSegmentId)), + ". Will try ", (fBadAllocMaxAttempts > 1 ? (std::to_string(fBadAllocMaxAttempts - 1)) + " more times" : " until success"), + ", in ", fBadAllocAttemptIntervalInMs, "ms intervals"); } std::this_thread::sleep_for(std::chrono::milliseconds(fBadAllocAttemptIntervalInMs)); if (Interrupted()) { - throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", boost::apply_visitor(SegmentFreeMemory(), fSegments.at(fSegmentId)))); + throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, + ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", + ", free memory: ", std::visit([](auto& s) { return s.get_free_memory(); }, fSegments.at(fSegmentId)))); } else { continue; } @@ -719,12 +729,12 @@ class Manager } #endif ShmHeader::Destruct(ptr); - boost::apply_visitor(SegmentDeallocate(ptr), fSegments.at(segmentId)); + std::visit([ptr](auto& s) { s.deallocate(ptr); }, fSegments.at(segmentId)); } char* ShrinkInPlace(size_t newSize, char* localPtr, uint16_t segmentId) { - return boost::apply_visitor(SegmentBufferShrink(newSize, localPtr), fSegments.at(segmentId)); + return std::visit(SegmentBufferShrink(newSize, localPtr), fSegments.at(segmentId)); } uint16_t GetSegmentId() const { return fSegmentId; } @@ -772,7 +782,7 @@ class Manager uint64_t fShmId64; std::string fShmId; uint16_t fSegmentId; - std::unordered_map> fSegments; // TODO: refactor to use Segment class + std::unordered_map> fSegments; // TODO: refactor to use Segment class boost::interprocess::managed_shared_memory fManagementSegment; // TODO: refactor to use ManagementSegment class VoidAlloc fShmVoidAlloc; boost::interprocess::interprocess_mutex* fShmMtx; diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 35b23c59..06207d04 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -30,6 +30,7 @@ #include #include #include +#include #include @@ -184,7 +185,7 @@ bool Monitor::PrintShm(const ShmId& shmId) VoidAlloc allocInstance(managementSegment.get_segment_manager()); Uint16SegmentInfoHashMap* shmSegments = managementSegment.find(unique_instance).first; - std::unordered_map> segments; + std::unordered_map> segments; Uint16RegionInfoHashMap* shmRegions = managementSegment.find(unique_instance).first; @@ -234,8 +235,8 @@ bool Monitor::PrintShm(const ShmId& shmId) << ", managed segments:\n"; for (const auto& s : segments) { - size_t free = boost::apply_visitor(SegmentFreeMemory(), s.second); - size_t total = boost::apply_visitor(SegmentSize(), s.second); + size_t free = std::visit([](auto& s){ return s.get_free_memory(); }, s.second); + size_t total = std::visit([](auto& s){ return s.get_size(); }, s.second); size_t used = total - free; std::string msgCount; diff --git a/fairmq/shmem/Segment.h b/fairmq/shmem/Segment.h index 1d5f2e6c..fc8eabf1 100644 --- a/fairmq/shmem/Segment.h +++ b/fairmq/shmem/Segment.h @@ -11,10 +11,9 @@ #include #include -#include - #include #include +#include namespace fair::mq::shmem { @@ -44,12 +43,12 @@ struct Segment Register(shmId, id, AllocationAlgorithm::rbtree_best_fit); } - size_t GetSize() const { return boost::apply_visitor(SegmentSize(), fSegment); } - void* GetData() { return boost::apply_visitor(SegmentAddress(), fSegment); } + size_t GetSize() const { return std::visit([](auto& s){ return s.get_size(); }, fSegment); } + void* GetData() { return std::visit([](auto& s){ return s.get_address(); }, fSegment); } - size_t GetFreeMemory() const { return boost::apply_visitor(SegmentFreeMemory(), fSegment); } + size_t GetFreeMemory() const { return std::visit([](auto& s){ return s.get_free_memory(); }, fSegment); } - void Zero() { boost::apply_visitor(SegmentMemoryZeroer(), fSegment); } + void Zero() { std::visit([](auto& s){ return s.zero_free_memory(); }, fSegment); } void Lock() { if (mlock(GetData(), GetSize()) == -1) { @@ -63,7 +62,7 @@ struct Segment } private: - boost::variant fSegment; + std::variant fSegment; static void Register(const std::string& shmId, uint16_t id, AllocationAlgorithm allocAlgo) {