/******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ #ifndef FAIR_MQ_SHMEM_COMMON_H_ #define FAIR_MQ_SHMEM_COMMON_H_ #include #include #include // std::equal_to #include #include #include #include #include #include #include #include #include #include #include #include #include namespace fair::mq::shmem { static constexpr uint64_t kManagementSegmentSize = 6553600; struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtime_error; }; using SimpleSeqFitSegment = boost::interprocess::basic_managed_shared_memory>, boost::interprocess::null_index>; // boost::interprocess::iset_index>; using RBTreeBestFitSegment = boost::interprocess::basic_managed_shared_memory>, boost::interprocess::null_index>; // boost::interprocess::iset_index>; inline std::string MakeShmName(const std::string& shmId, const std::string& type) { return std::string("fmq_" + shmId + "_" + type); } inline std::string MakeShmName(const std::string& shmId, const std::string& type, int index) { return std::string(MakeShmName(shmId, type) + "_" + std::to_string(index)); } struct RefCount { explicit RefCount(uint16_t c) : count(c) {} uint16_t Get() { return count.load(); } uint16_t Increment() { return count.fetch_add(1); } uint16_t Decrement() { return count.fetch_sub(1); } std::atomic count; }; // Number of nodes allocated at once when the allocator runs out of nodes. static constexpr size_t numNodesPerBlock = 4096; // Maximum number of totally free blocks that the adaptive node pool will hold. // The rest of the totally free blocks will be deallocated with the segment manager. static constexpr size_t maxFreeBlocks = 2; using RefCountPool = boost::interprocess::adaptive_pool; using SegmentManager = boost::interprocess::managed_shared_memory::segment_manager; using VoidAlloc = boost::interprocess::allocator; using CharAlloc = boost::interprocess::allocator; using Str = boost::interprocess::basic_string, CharAlloc>; using StrAlloc = boost::interprocess::allocator; using StrVector = boost::interprocess::vector; // ShmHeader stores user buffer alignment and the reference count in the following structure: // [HdrOffset(uint16_t)][Hdr alignment][Hdr][user buffer alignment][user buffer] // The alignment of Hdr depends on the alignment of std::atomic and is stored in the first entry struct ShmHeader { struct Hdr { uint16_t userOffset; std::atomic refCount; }; static Hdr* HdrPtr(char* ptr) { // [HdrOffset(uint16_t)][Hdr alignment][Hdr][user buffer alignment][user buffer] // ^ return reinterpret_cast(ptr + sizeof(uint16_t) + *(reinterpret_cast(ptr))); } static uint16_t HdrPartSize() // [HdrOffset(uint16_t)][Hdr alignment][Hdr] { // [HdrOffset(uint16_t)][Hdr alignment][Hdr][user buffer alignment][user buffer] // <---------------------------------------> return sizeof(uint16_t) + alignof(Hdr) + sizeof(Hdr); } static std::atomic& RefCountPtr(char* ptr) { // get the ref count ptr from the Hdr return HdrPtr(ptr)->refCount; } static uint16_t UserOffset(char* ptr) { return HdrPartSize() + HdrPtr(ptr)->userOffset; } static char* UserPtr(char* ptr) { // [HdrOffset(uint16_t)][Hdr alignment][Hdr][user buffer alignment][user buffer] // ^ return ptr + HdrPartSize() + HdrPtr(ptr)->userOffset; } static uint16_t RefCount(char* ptr) { return RefCountPtr(ptr).load(); } static uint16_t IncrementRefCount(char* ptr) { return RefCountPtr(ptr).fetch_add(1); } static uint16_t DecrementRefCount(char* ptr) { return RefCountPtr(ptr).fetch_sub(1); } static size_t FullSize(size_t size, size_t alignment) { // [HdrOffset(uint16_t)][Hdr alignment][Hdr][user buffer alignment][user buffer] // <---------------------------------------------------------------------------> return HdrPartSize() + alignment + size; } static void Construct(char* ptr, size_t alignment) { // place the Hdr in the aligned location, fill it and store its offset to HdrOffset // the address alignment should be at least 2 assert(reinterpret_cast(ptr) % 2 == 0); // offset to the beginning of the Hdr. store it in the beginning uint16_t hdrOffset = alignof(Hdr) - ((reinterpret_cast(ptr) + sizeof(uint16_t)) % alignof(Hdr)); memcpy(ptr, &hdrOffset, sizeof(hdrOffset)); // offset to the beginning of the user buffer, store in Hdr together with the ref count uint16_t userOffset = alignment - ((reinterpret_cast(ptr) + HdrPartSize()) % alignment); new(ptr + sizeof(uint16_t) + hdrOffset) Hdr{ userOffset, std::atomic(1) }; } static void Destruct(char* ptr) { RefCountPtr(ptr).~atomic(); } }; struct MetaHeader { size_t fSize; // size of the shm buffer size_t fHint; // user-defined value, given by the user on message creation and returned to the user on "buffer no longer needed"-callbacks boost::interprocess::managed_shared_memory::handle_t fHandle; // handle to shm buffer, convertible to shm buffer ptr mutable boost::interprocess::managed_shared_memory::handle_t fShared; // handle to the buffer storing the ref count for shared buffers uint16_t fRegionId; // id of the unmanaged region mutable uint16_t fSegmentId; // id of the managed segment bool fManaged; // true = managed segment, false = unmanaged region }; enum class AllocationAlgorithm : int { rbtree_best_fit, simple_seq_fit }; struct RegionInfo { static constexpr uint64_t DefaultRcSegmentSize = 10000000; RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, uint64_t rcSegmentSize, const VoidAlloc& alloc) : fPath(path, alloc) , fCreationFlags(flags) , fUserFlags(userFlags) , fSize(size) , fRCSegmentSize(rcSegmentSize) , fDestroyed(false) {} RegionInfo(const VoidAlloc& alloc) : RegionInfo("", 0, 0, 0, DefaultRcSegmentSize, alloc) {} RegionInfo(const char* path, int flags, uint64_t userFlags, uint64_t size, const VoidAlloc& alloc) : RegionInfo(path, flags, userFlags, size, DefaultRcSegmentSize, alloc) {} Str fPath; int fCreationFlags; uint64_t fUserFlags; uint64_t fSize; uint64_t fRCSegmentSize; bool fDestroyed; }; using Uint16RegionInfoPairAlloc = boost::interprocess::allocator, SegmentManager>; using Uint16RegionInfoMap = boost::interprocess::map, Uint16RegionInfoPairAlloc>; using Uint16RegionInfoHashMap = boost::unordered_map, std::equal_to, Uint16RegionInfoPairAlloc>; struct SegmentInfo { SegmentInfo(AllocationAlgorithm aa) : fAllocationAlgorithm(aa) {} AllocationAlgorithm fAllocationAlgorithm; }; struct SessionInfo { SessionInfo(const char* sessionName, int creatorId, const VoidAlloc& alloc) : fSessionName(sessionName, alloc) , fCreatorId(creatorId) {} Str fSessionName; int fCreatorId; }; using Uint16SegmentInfoPairAlloc = boost::interprocess::allocator, SegmentManager>; using Uint16SegmentInfoHashMap = boost::unordered_map, std::equal_to, Uint16SegmentInfoPairAlloc>; // using Uint16SegmentInfoMap = boost::interprocess::map, Uint16SegmentInfoPairAlloc>; struct DeviceCounter { DeviceCounter(unsigned int c) : fCount(c) {} std::atomic fCount; }; struct EventCounter { EventCounter(uint64_t c) : fCount(c) {} std::atomic fCount; }; struct Heartbeat { Heartbeat(uint64_t c) : fCount(c) {} std::atomic fCount; }; struct RegionCounter { RegionCounter(uint16_t c) : fCount(c) {} std::atomic fCount; }; #ifdef FAIRMQ_DEBUG_MODE struct MsgCounter { MsgCounter() : fCount(0) {} MsgCounter(unsigned int c) : fCount(c) {} std::atomic fCount; }; using Uint16MsgCounterPairAlloc = boost::interprocess::allocator, SegmentManager>; using Uint16MsgCounterHashMap = boost::unordered_map, std::equal_to, Uint16MsgCounterPairAlloc>; struct MsgDebug { MsgDebug() : fPid(0) , fSize(0) , fCreationTime(0) {} MsgDebug(pid_t pid, size_t size, const uint64_t creationTime) : fPid(pid) , fSize(size) , fCreationTime(creationTime) {} pid_t fPid; size_t fSize; uint64_t fCreationTime; }; using SizetMsgDebugPairAlloc = boost::interprocess::allocator, SegmentManager>; // using SizetMsgDebugHashMap = boost::unordered_map, std::equal_to, SizetMsgDebugPairAlloc>; using SizetMsgDebugMap = boost::interprocess::map, SizetMsgDebugPairAlloc>; using Uint16MsgDebugMapPairAlloc = boost::interprocess::allocator, SegmentManager>; using Uint16MsgDebugMapHashMap = boost::unordered_map, std::equal_to, Uint16MsgDebugMapPairAlloc>; #endif struct RegionBlock { RegionBlock() = default; RegionBlock(boost::interprocess::managed_shared_memory::handle_t handle, size_t size, size_t hint) : fHandle(handle) , fSize(size) , fHint(hint) {} boost::interprocess::managed_shared_memory::handle_t fHandle{}; size_t fSize = 0; size_t fHint = 0; }; // find id for unique shmem name: // a hash of user id + session id, truncated to 8 characters (to accommodate for name size limit on some systems (MacOS)). std::string makeShmIdStr(const std::string& sessionId, const std::string& userId); std::string makeShmIdStr(const std::string& sessionId); std::string makeShmIdStr(uint64_t val); uint64_t makeShmIdUint64(const std::string& sessionId); struct SegmentBufferShrink { SegmentBufferShrink(const size_t _new_size, char* _local_ptr) : new_size(_new_size) , local_ptr(_local_ptr) {} template char* operator()(S& s) const { boost::interprocess::managed_shared_memory::size_type shrunk_size = new_size; return s.template allocation_command(boost::interprocess::shrink_in_place, new_size + 128, shrunk_size, local_ptr); } const size_t new_size; mutable char* local_ptr; }; } // namespace fair::mq::shmem #endif /* FAIR_MQ_SHMEM_COMMON_H_ */